Channels
Azu provides first-class WebSocket support through channels, which handle persistent connections, message routing, and broadcasting. Channels enable real-time bidirectional communication between clients and servers.
Overview
WebSocket channels provide:
Persistent connections for real-time communication
Message routing and event handling
Connection management and lifecycle events
Broadcasting to multiple clients
Type-safe message handling with Crystal's type system
Architecture
Basic Channel Structure
Note: WebSocket channels use a message-based protocol for room identification. Clients should send a
joinmessage with the room ID after connecting, rather than using path parameters.
Simple Chat Channel
class ChatChannel < Azu::Channel
# Define the WebSocket route
ws "/chat"
# Connection tracking - maps room_id to set of sockets
@@connections = Hash(String, Set(HTTP::WebSocket)).new { |h, k| h[k] = Set(HTTP::WebSocket).new }
# Track which room each socket belongs to
@@socket_rooms = Hash(HTTP::WebSocket, String).new
def on_connect
# Send welcome message - client must send a "join" message with room_id
send_message({
type: "system",
message: "Connected. Send a 'join' message with room_id to join a room.",
timestamp: Time.utc.to_rfc3339
})
end
def on_message(message : String)
begin
data = JSON.parse(message)
handle_message_type(data)
rescue JSON::ParseException
send_error("Invalid JSON format")
rescue ex
Log.error(exception: ex) { "Error processing message" }
send_error("Internal server error")
end
end
def on_close(code : HTTP::WebSocket::CloseCode?, message : String?)
# Find and remove from room
if room_id = @@socket_rooms[socket]?
@@connections[room_id].delete(socket)
@@socket_rooms.delete(socket)
# Notify remaining users
if @@connections[room_id].any?
broadcast_to_room(room_id, {
type: "user_left",
message: "A user left the room",
count: @@connections[room_id].size
})
end
Log.info { "User disconnected from room #{room_id}. Remaining: #{@@connections[room_id].size}" }
end
end
private def handle_message_type(data)
case data["type"]?.try(&.as_s)
when "join"
handle_join(data)
when "chat_message"
handle_chat_message(data)
when "typing_start"
handle_typing_indicator(data, true)
when "typing_stop"
handle_typing_indicator(data, false)
when "ping"
send_message({type: "pong", timestamp: Time.utc.to_rfc3339})
else
send_error("Unknown message type: #{data["type"]?}")
end
end
private def handle_join(data)
room_id = data["room_id"]?.try(&.as_s)
return send_error("room_id is required") unless room_id
# Track socket's room
@@socket_rooms[socket] = room_id
@@connections[room_id] << socket
# Send confirmation
send_message({
type: "joined",
room_id: room_id,
message: "Welcome to room #{room_id}",
timestamp: Time.utc.to_rfc3339
})
# Notify others
broadcast_to_room(room_id, {
type: "user_joined",
message: "A user joined the room",
count: @@connections[room_id].size
}, exclude: socket)
Log.info { "User joined room #{room_id}. Total: #{@@connections[room_id].size}" }
end
private def handle_chat_message(data)
room_id = @@socket_rooms[socket]?
return send_error("Not in a room. Send a 'join' message first.") unless room_id
message = data["message"]?.try(&.as_s)
return send_error("Message is required") unless message
# Validate message
return send_error("Message too long") if message.size > 1000
return send_error("Message cannot be empty") if message.strip.empty?
# Broadcast to all users in room
broadcast_to_room(room_id, {
type: "chat_message",
message: message,
timestamp: Time.utc.to_rfc3339,
user_id: current_user_id
})
end
private def handle_typing_indicator(data, is_typing : Bool)
room_id = @@socket_rooms[socket]?
return send_error("Not in a room") unless room_id
broadcast_to_room(room_id, {
type: is_typing ? "user_typing" : "user_stopped_typing",
user_id: current_user_id,
timestamp: Time.utc.to_rfc3339
}, exclude: socket)
end
private def broadcast_to_room(room_id : String, data, exclude : HTTP::WebSocket? = nil)
message = data.to_json
@@connections[room_id].each do |connection|
next if connection == exclude
spawn { connection.send(message) }
end
end
private def send_message(data)
socket.send(data.to_json)
end
private def send_error(message : String)
send_message({
type: "error",
message: message,
timestamp: Time.utc.to_rfc3339
})
end
private def current_user_id
# Extract user ID from authentication token or session
"user_#{socket.object_id}"
end
endChannel Lifecycle
Connection Events
Message Handling
Type-Safe Message Processing
Broadcasting
Room-Based Broadcasting
User-Based Broadcasting
Authentication and Authorization
Token-Based Authentication
Note: For authentication, pass the token as a query parameter in the WebSocket URL (e.g.,
ws://host/secure?token=xxx). Resource access is managed via messages after connection.
Error Handling
Comprehensive Error Handling
Performance Optimization
Connection Pooling
Message Batching
Testing Channels
Channel Testing
Best Practices
1. Connection Management
Implement proper connection cleanup
Use connection pools for external services
Monitor connection health
Handle reconnection gracefully
2. Message Handling
Validate all incoming messages
Use type-safe message structures
Implement proper error handling
Log important events
3. Performance
Use connection pooling
Implement message batching
Monitor memory usage
Optimize broadcasting
4. Security
Authenticate all connections
Validate message content
Implement rate limiting
Use secure WebSocket connections
Next Steps
Live Components - Build real-time UI components
Spark System - Client-side real-time updates
Channel Examples - Working examples
Ready to build real-time features? Start with the basic channel examples above, then explore Live Components for interactive UI components.
Last updated
Was this helpful?
