Scaling Patterns

Comprehensive guide to scaling Azu applications from single instances to distributed systems.

Overview

Azu applications can scale both vertically (single instance) and horizontally (multiple instances). This guide covers patterns and strategies for building scalable applications that can handle increasing load.

Vertical Scaling

Single Instance Optimization

# Vertical scaling configuration
CONFIG.vertical_scaling = {
  # CPU optimization
  workers: System.cpu_count,
  thread_pool_size: System.cpu_count * 2,

  # Memory optimization
  heap_size: "2G",
  gc_interval: 100,

  # I/O optimization
  backlog: 2048,
  tcp_nodelay: true
}

Resource Optimization

# Resource-optimized endpoint
struct ResourceOptimizedEndpoint
  include Endpoint(ResourceRequest, ResourceResponse)

  def call : ResourceResponse
    # Use connection pooling
    connection = connection_pool.checkout

    begin
      result = process_with_connection(connection, @request)
      ResourceResponse.new(result)
    ensure
      connection_pool.checkin(connection)
    end
  end
end

Horizontal Scaling

Load Balancing

# Load balancer configuration
CONFIG.load_balancer = {
  algorithm: "round_robin", # or "least_connections", "ip_hash"
  health_check_interval: 30.seconds,
  health_check_path: "/health",
  session_sticky: true
}

# Health check endpoint
struct HealthCheckEndpoint
  include Endpoint(HealthRequest, HealthResponse)

  get "/health"

  def call : HealthResponse
    # Check application health
    health_status = {
      status: "healthy",
      timestamp: Time.utc,
      uptime: System.uptime,
      memory_usage: GC.stats.total_allocated,
      active_connections: connection_count
    }

    HealthResponse.new(health_status)
  end
end

Session Management

# Distributed session management
class DistributedSession
  include Session::Store

  def initialize(@redis_client : Redis::Client)
  end

  def get(session_id : String) : Session::Data?
    data = @redis_client.get("session:#{session_id}")
    Session::Data.from_json(data) if data
  end

  def set(session_id : String, data : Session::Data, ttl : Time::Span = 1.hour)
    @redis_client.setex("session:#{session_id}", ttl.total_seconds.to_i, data.to_json)
  end

  def delete(session_id : String)
    @redis_client.del("session:#{session_id}")
  end
end

Database Scaling

Read Replicas

# Read replica configuration
CONFIG.database = {
  primary: {
    url: "postgresql://primary:5432/app",
    pool_size: 10
  },
  replicas: [
    {
      url: "postgresql://replica1:5432/app",
      pool_size: 5
    },
    {
      url: "postgresql://replica2:5432/app",
      pool_size: 5
    }
  ]
}

# Read replica routing
class ReadReplicaRouter
  @replicas = [] of Database::Connection
  @current_replica = 0

  def get_read_connection : Database::Connection
    @current_replica = (@current_replica + 1) % @replicas.size
    @replicas[@current_replica]
  end
end

Database Sharding

# Database sharding strategy
class ShardedDatabase
  @shards = {} of Int32 => Database::Connection

  def initialize
    # Initialize shard connections
    CONFIG.database.shards.each do |shard_id, config|
      @shards[shard_id] = Database::Connection.new(config.url)
    end
  end

  def get_shard(user_id : Int32) : Database::Connection
    shard_id = user_id % @shards.size
    @shards[shard_id]
  end
end

# Sharded endpoint
struct ShardedEndpoint
  include Endpoint(ShardedRequest, ShardedResponse)

  def call : ShardedResponse
    user_id = @request.user_id
    shard = sharded_database.get_shard(user_id)

    # Use appropriate shard for user data
    user_data = shard.query_one("SELECT * FROM users WHERE id = ?", user_id)

    ShardedResponse.new(user_data)
  end
end

Caching Strategies

Distributed Caching

# Redis-based distributed cache
class DistributedCache
  @redis = Redis::Client.new(CONFIG.redis_url)

  def get(key : String) : String?
    @redis.get(key)
  end

  def set(key : String, value : String, ttl : Time::Span = 1.hour)
    @redis.setex(key, ttl.total_seconds.to_i, value)
  end

  def invalidate(pattern : String)
    keys = @redis.keys(pattern)
    @redis.del(keys) if keys.any?
  end
end

# Cached endpoint with distributed cache
struct DistributedCachedEndpoint
  include Endpoint(CacheRequest, CacheResponse)

  def call : CacheResponse
    cache_key = "user:#{@request.user_id}"

    if cached = distributed_cache.get(cache_key)
      return CacheResponse.from_cache(cached)
    end

    # Generate fresh data
    user_data = fetch_user_data(@request.user_id)

    # Cache in distributed store
    distributed_cache.set(cache_key, user_data.to_json)

    CacheResponse.new(user_data)
  end
end

Cache Invalidation

# Cache invalidation strategy
class CacheInvalidator
  @redis = Redis::Client.new(CONFIG.redis_url)

  def invalidate_user(user_id : Int32)
    # Invalidate user-specific cache
    @redis.del("user:#{user_id}")
    @redis.del("user_posts:#{user_id}")
    @redis.del("user_profile:#{user_id}")
  end

  def invalidate_pattern(pattern : String)
    keys = @redis.keys(pattern)
    @redis.del(keys) if keys.any?
  end
end

Message Queues

Background Job Processing

# Message queue integration
class JobProcessor
  @queue = Redis::Client.new(CONFIG.redis_url)

  def enqueue(job_type : String, data : Hash)
    job = {
      id: generate_job_id,
      type: job_type,
      data: data,
      created_at: Time.utc
    }

    @queue.lpush("jobs:#{job_type}", job.to_json)
  end

  def process_jobs
    spawn do
      loop do
        if job_data = @queue.brpop("jobs:email", timeout: 1)
          job = Job.from_json(job_data[1])
          process_job(job)
        end
      end
    end
  end
end

# Job processing endpoint
struct JobEndpoint
  include Endpoint(JobRequest, JobResponse)

  def call : JobResponse
    # Enqueue background job
    job_processor.enqueue("email", {
      to: @request.email,
      subject: @request.subject,
      body: @request.body
    })

    JobResponse.new(job_id: generate_job_id)
  end
end

Microservices Architecture

Service Discovery

# Service discovery with Consul
class ServiceRegistry
  @consul = Consul::Client.new(CONFIG.consul_url)

  def register_service(name : String, address : String, port : Int32)
    @consul.agent.service.register(
      name: name,
      address: address,
      port: port,
      check: {
        http: "http://#{address}:#{port}/health",
        interval: "30s"
      }
    )
  end

  def discover_service(name : String) : Array(ServiceInstance)
    services = @consul.catalog.service(name)
    services.map { |s| ServiceInstance.new(s.address, s.port) }
  end
end

API Gateway

# API gateway for microservices
struct ApiGatewayEndpoint
  include Endpoint(GatewayRequest, GatewayResponse)

  post "/api/*"

  def call : GatewayResponse
    # Route to appropriate microservice
    service_name = extract_service_name(@request.path)
    service_instances = service_registry.discover_service(service_name)

    # Load balance between instances
    instance = load_balancer.select(service_instances)

    # Forward request
    response = forward_request(instance, @request)

    GatewayResponse.new(response)
  end
end

Container Orchestration

Docker Configuration

# Dockerfile for Azu application
FROM crystallang/crystal:1.16-alpine

WORKDIR /app

# Install dependencies
COPY shard.yml shard.lock ./
RUN shards install

# Copy source code
COPY . .

# Build application
RUN crystal build --release src/app.cr

# Expose port
EXPOSE 3000

# Run application
CMD ["./app"]

Kubernetes Deployment

# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: azu-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: azu-app
  template:
    metadata:
      labels:
        app: azu-app
    spec:
      containers:
        - name: azu-app
          image: azu-app:latest
          ports:
            - containerPort: 3000
          env:
            - name: ENVIRONMENT
              value: "production"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 5
            periodSeconds: 5

Monitoring and Observability

Distributed Tracing

# Distributed tracing with OpenTelemetry
class TracingMiddleware
  include Handler

  def call(request, response)
    # Create trace span
    span = tracer.start_span("http_request", {
      "http.method" => request.method,
      "http.url" => request.path,
      "http.user_agent" => request.headers["User-Agent"]?
    })

    begin
      result = @next.call(request, response)

      # Add response metadata
      span.set_attribute("http.status_code", response.status_code)

      result
    rescue ex
      # Record error
      span.record_exception(ex)
      raise ex
    ensure
      span.end
    end
  end
end

Metrics Collection

# Prometheus metrics
class MetricsCollector
  @request_counter = Prometheus::Counter.new(
    name: "http_requests_total",
    help: "Total number of HTTP requests"
  )

  @request_duration = Prometheus::Histogram.new(
    name: "http_request_duration_seconds",
    help: "HTTP request duration in seconds"
  )

  def record_request(method : String, path : String, status : Int32, duration : Time::Span)
    @request_counter.increment(labels: {method: method, path: path, status: status.to_s})
    @request_duration.observe(duration.total_seconds, labels: {method: method, path: path})
  end
end

Auto-scaling

Horizontal Pod Autoscaler

# kubernetes/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: azu-app-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: azu-app
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

Custom Metrics

# Custom metrics for auto-scaling
class CustomMetrics
  @active_connections = Atomic(Int32).new(0)
  @request_queue_size = Atomic(Int32).new(0)

  def increment_connections
    @active_connections.add(1)
  end

  def decrement_connections
    @active_connections.sub(1)
  end

  def get_connection_count : Int32
    @active_connections.get
  end

  def get_queue_size : Int32
    @request_queue_size.get
  end
end

# Metrics endpoint for HPA
struct MetricsEndpoint
  include Endpoint(MetricsRequest, MetricsResponse)

  get "/metrics"

  def call : MetricsResponse
    metrics = {
      active_connections: custom_metrics.get_connection_count,
      request_queue_size: custom_metrics.get_queue_size,
      memory_usage: GC.stats.total_allocated,
      cpu_usage: get_cpu_usage
    }

    MetricsResponse.new(metrics)
  end
end

Best Practices

1. Start Simple

# Start with vertical scaling
CONFIG.initial_scaling = {
  # Optimize single instance first
  workers: System.cpu_count,
  memory_limit: "1G",

  # Add monitoring
  enable_metrics: true,
  enable_tracing: true
}

2. Monitor and Measure

# Comprehensive monitoring
class ScalingMonitor
  def should_scale_horizontally? : Bool
    cpu_usage > 70 && memory_usage > 80 && response_time > 500
  end

  def get_optimal_replica_count : Int32
    current_load = get_current_load
    target_load = 50 # 50% target utilization

    (current_load / target_load).ceil.to_i32
  end
end

3. Plan for Failure

# Circuit breaker pattern
class CircuitBreaker
  @failure_count = Atomic(Int32).new(0)
  @last_failure_time = Atomic(Time?).new(nil)
  @state = Atomic(Symbol).new(:closed)

  def call(&block)
    case @state.get
    when :open
      raise CircuitBreakerOpenError.new
    when :half_open
      # Allow limited requests
      if @failure_count.get < 3
        execute_with_fallback(&block)
      else
        @state.set(:open)
        raise CircuitBreakerOpenError.new
      end
    when :closed
      execute_with_fallback(&block)
    end
  end
end

Next Steps


Remember: Scale horizontally when you can't scale vertically anymore, and always monitor your scaling decisions.

Last updated

Was this helpful?