Channels

Azu provides first-class WebSocket support through channels, which handle persistent connections, message routing, and broadcasting. Channels enable real-time bidirectional communication between clients and servers.

Overview

WebSocket channels provide:

  • Persistent connections for real-time communication

  • Message routing and event handling

  • Connection management and lifecycle events

  • Broadcasting to multiple clients

  • Type-safe message handling with Crystal's type system

Architecture

spinner

Basic Channel Structure

Note: WebSocket channels use a message-based protocol for room identification. Clients should send a join message with the room ID after connecting, rather than using path parameters.

Simple Chat Channel

class ChatChannel < Azu::Channel
  # Define the WebSocket route
  ws "/chat"

  # Connection tracking - maps room_id to set of sockets
  @@connections = Hash(String, Set(HTTP::WebSocket)).new { |h, k| h[k] = Set(HTTP::WebSocket).new }
  # Track which room each socket belongs to
  @@socket_rooms = Hash(HTTP::WebSocket, String).new

  def on_connect
    # Send welcome message - client must send a "join" message with room_id
    send_message({
      type: "system",
      message: "Connected. Send a 'join' message with room_id to join a room.",
      timestamp: Time.utc.to_rfc3339
    })
  end

  def on_message(message : String)
    begin
      data = JSON.parse(message)
      handle_message_type(data)
    rescue JSON::ParseException
      send_error("Invalid JSON format")
    rescue ex
      Log.error(exception: ex) { "Error processing message" }
      send_error("Internal server error")
    end
  end

  def on_close(code : HTTP::WebSocket::CloseCode?, message : String?)
    # Find and remove from room
    if room_id = @@socket_rooms[socket]?
      @@connections[room_id].delete(socket)
      @@socket_rooms.delete(socket)

      # Notify remaining users
      if @@connections[room_id].any?
        broadcast_to_room(room_id, {
          type: "user_left",
          message: "A user left the room",
          count: @@connections[room_id].size
        })
      end

      Log.info { "User disconnected from room #{room_id}. Remaining: #{@@connections[room_id].size}" }
    end
  end

  private def handle_message_type(data)
    case data["type"]?.try(&.as_s)
    when "join"
      handle_join(data)
    when "chat_message"
      handle_chat_message(data)
    when "typing_start"
      handle_typing_indicator(data, true)
    when "typing_stop"
      handle_typing_indicator(data, false)
    when "ping"
      send_message({type: "pong", timestamp: Time.utc.to_rfc3339})
    else
      send_error("Unknown message type: #{data["type"]?}")
    end
  end

  private def handle_join(data)
    room_id = data["room_id"]?.try(&.as_s)
    return send_error("room_id is required") unless room_id

    # Track socket's room
    @@socket_rooms[socket] = room_id
    @@connections[room_id] << socket

    # Send confirmation
    send_message({
      type: "joined",
      room_id: room_id,
      message: "Welcome to room #{room_id}",
      timestamp: Time.utc.to_rfc3339
    })

    # Notify others
    broadcast_to_room(room_id, {
      type: "user_joined",
      message: "A user joined the room",
      count: @@connections[room_id].size
    }, exclude: socket)

    Log.info { "User joined room #{room_id}. Total: #{@@connections[room_id].size}" }
  end

  private def handle_chat_message(data)
    room_id = @@socket_rooms[socket]?
    return send_error("Not in a room. Send a 'join' message first.") unless room_id

    message = data["message"]?.try(&.as_s)
    return send_error("Message is required") unless message

    # Validate message
    return send_error("Message too long") if message.size > 1000
    return send_error("Message cannot be empty") if message.strip.empty?

    # Broadcast to all users in room
    broadcast_to_room(room_id, {
      type: "chat_message",
      message: message,
      timestamp: Time.utc.to_rfc3339,
      user_id: current_user_id
    })
  end

  private def handle_typing_indicator(data, is_typing : Bool)
    room_id = @@socket_rooms[socket]?
    return send_error("Not in a room") unless room_id

    broadcast_to_room(room_id, {
      type: is_typing ? "user_typing" : "user_stopped_typing",
      user_id: current_user_id,
      timestamp: Time.utc.to_rfc3339
    }, exclude: socket)
  end

  private def broadcast_to_room(room_id : String, data, exclude : HTTP::WebSocket? = nil)
    message = data.to_json
    @@connections[room_id].each do |connection|
      next if connection == exclude
      spawn { connection.send(message) }
    end
  end

  private def send_message(data)
    socket.send(data.to_json)
  end

  private def send_error(message : String)
    send_message({
      type: "error",
      message: message,
      timestamp: Time.utc.to_rfc3339
    })
  end

  private def current_user_id
    # Extract user ID from authentication token or session
    "user_#{socket.object_id}"
  end
end

Channel Lifecycle

Connection Events

Message Handling

Type-Safe Message Processing

Broadcasting

Room-Based Broadcasting

User-Based Broadcasting

Authentication and Authorization

Token-Based Authentication

Note: For authentication, pass the token as a query parameter in the WebSocket URL (e.g., ws://host/secure?token=xxx). Resource access is managed via messages after connection.

Error Handling

Comprehensive Error Handling

Performance Optimization

Connection Pooling

Message Batching

Testing Channels

Channel Testing

Best Practices

1. Connection Management

  • Implement proper connection cleanup

  • Use connection pools for external services

  • Monitor connection health

  • Handle reconnection gracefully

2. Message Handling

  • Validate all incoming messages

  • Use type-safe message structures

  • Implement proper error handling

  • Log important events

3. Performance

  • Use connection pooling

  • Implement message batching

  • Monitor memory usage

  • Optimize broadcasting

4. Security

  • Authenticate all connections

  • Validate message content

  • Implement rate limiting

  • Use secure WebSocket connections

Next Steps


Ready to build real-time features? Start with the basic channel examples above, then explore Live Components for interactive UI components.

Last updated

Was this helpful?