WebSockets
Azu provides powerful WebSocket support for building real-time applications. With type-safe channels, automatic connection management, and efficient message handling, you can create interactive, responsive user experiences.
What are WebSockets?
WebSockets in Azu provide:
Real-time Communication: Bidirectional communication between client and server
Type Safety: Type-safe message handling and broadcasting
Connection Management: Automatic connection and disconnection handling
Event Broadcasting: Notify all connected clients of changes
Message Validation: Validate incoming WebSocket messages
Basic WebSocket Channel
class ChatChannel < Azu::Channel
CONNECTIONS = Set(HTTP::WebSocket).new
ws "/chat"
def on_connect
CONNECTIONS << socket.not_nil!
send_to_client({
type: "connected",
message: "Connected to chat",
timestamp: Time.utc.to_rfc3339
})
Log.info { "User connected. Total connections: #{CONNECTIONS.size}" }
end
def on_message(message : String)
begin
data = JSON.parse(message)
handle_message(data)
rescue JSON::ParseException
send_error("Invalid JSON format")
end
end
def on_close(code, message)
CONNECTIONS.delete(socket)
Log.info { "User disconnected. Total connections: #{CONNECTIONS.size}" }
end
private def handle_message(data : JSON::Any)
case data["type"]?.try(&.as_s)
when "ping"
send_to_client({type: "pong", timestamp: Time.utc.to_rfc3339})
when "message"
broadcast_message(data["content"].as_s, data["user"].as_s)
when "typing"
broadcast_typing(data["user"].as_s, data["is_typing"].as_bool)
else
send_error("Unknown message type")
end
end
private def broadcast_message(content : String, user : String)
message = {
type: "message",
content: content,
user: user,
timestamp: Time.utc.to_rfc3339
}
broadcast_to_all(message)
end
private def broadcast_typing(user : String, is_typing : Bool)
message = {
type: "typing",
user: user,
is_typing: is_typing,
timestamp: Time.utc.to_rfc3339
}
broadcast_to_all(message)
end
private def send_to_client(data)
socket.not_nil!.send(data.to_json)
end
private def send_error(message : String)
send_to_client({type: "error", message: message})
end
private def broadcast_to_all(message)
CONNECTIONS.each do |socket|
spawn socket.send(message.to_json)
end
end
endChannel Registration
Register WebSocket channels in your application:
module MyApp
include Azu
configure do |config|
# WebSocket configuration
config.websocket.ping_interval = 30.seconds
config.websocket.ping_timeout = 10.seconds
config.websocket.max_message_size = 1024 * 1024 # 1MB
end
# Register channels
router do
# WebSocket routes
ws "/chat", ChatChannel
ws "/notifications", NotificationChannel
ws "/game/:room_id", GameChannel
end
endMessage Types
Handle different types of WebSocket messages:
Text Messages
def on_message(message : String)
# Handle text message
send_to_client("Echo: #{message}")
endJSON Messages
def on_message(message : String)
begin
data = JSON.parse(message)
handle_json_message(data)
rescue JSON::ParseException
send_error("Invalid JSON format")
end
end
private def handle_json_message(data : JSON::Any)
case data["type"]?.try(&.as_s)
when "ping"
send_pong
when "chat"
handle_chat_message(data)
when "typing"
handle_typing(data)
else
send_error("Unknown message type")
end
endBinary Messages
def on_binary_message(message : Bytes)
# Handle binary message
send_binary_response(message)
end
private def send_binary_response(data : Bytes)
socket.not_nil!.send(data)
endConnection Management
Connection Lifecycle
class ConnectionChannel < Azu::Channel
ws "/connection"
def on_connect
# Connection established
Log.info { "WebSocket connection established" }
# Send welcome message
send_to_client({
type: "welcome",
message: "Connected successfully",
timestamp: Time.utc.to_rfc3339
})
end
def on_close(code, message)
# Connection closed
Log.info { "WebSocket connection closed: #{code} - #{message}" }
# Clean up resources
cleanup_connection
end
private def cleanup_connection
# Remove from active connections
# Clean up user session
# Notify other users
end
endConnection Validation
class AuthenticatedChannel < Azu::Channel
ws "/authenticated"
def on_connect
# Validate authentication
unless authenticated?
close_connection(1008, "Authentication required")
return
end
# Proceed with connection
send_to_client({type: "authenticated", user: current_user.to_json})
end
private def authenticated? : Bool
# Check authentication token
token = context.request.headers["Authorization"]?
return false unless token
# Validate token
validate_token(token)
end
private def current_user
# Get current user from token
decode_token(get_auth_token)
end
endBroadcasting
Broadcast to All Connections
class BroadcastChannel < Azu::Channel
CONNECTIONS = Set(HTTP::WebSocket).new
ws "/broadcast"
def on_connect
CONNECTIONS << socket.not_nil!
end
def on_close(code, message)
CONNECTIONS.delete(socket)
end
# Broadcast to all connected clients
def self.broadcast_to_all(message)
CONNECTIONS.each do |socket|
spawn socket.send(message.to_json)
end
end
# Broadcast to specific users
def self.broadcast_to_users(user_ids : Array(Int64), message)
user_ids.each do |user_id|
if socket = get_socket_for_user(user_id)
socket.send(message.to_json)
end
end
end
endRoom-based Broadcasting
class RoomChannel < Azu::Channel
ROOMS = {} of String => Set(HTTP::WebSocket)
ws "/room/:room_id"
def on_connect
room_id = params["room_id"]
ROOMS[room_id] ||= Set(HTTP::WebSocket).new
ROOMS[room_id] << socket.not_nil!
# Notify room of new user
broadcast_to_room(room_id, {
type: "user_joined",
user: current_user.to_json,
timestamp: Time.utc.to_rfc3339
})
end
def on_close(code, message)
room_id = params["room_id"]
ROOMS[room_id]?.delete(socket)
# Notify room of user leaving
broadcast_to_room(room_id, {
type: "user_left",
user: current_user.to_json,
timestamp: Time.utc.to_rfc3339
})
end
def on_message(message : String)
room_id = params["room_id"]
data = JSON.parse(message)
# Broadcast message to room
broadcast_to_room(room_id, {
type: "message",
content: data["content"].as_s,
user: current_user.to_json,
timestamp: Time.utc.to_rfc3339
})
end
private def broadcast_to_room(room_id : String, message)
ROOMS[room_id]?.each do |socket|
spawn socket.send(message.to_json)
end
end
endReal-time Features
Live Notifications
class NotificationChannel < Azu::Channel
USER_CONNECTIONS = {} of Int64 => HTTP::WebSocket
ws "/notifications"
def on_connect
user_id = current_user.id
USER_CONNECTIONS[user_id] = socket.not_nil!
# Send pending notifications
send_pending_notifications(user_id)
end
def on_close(code, message)
user_id = current_user.id
USER_CONNECTIONS.delete(user_id)
end
# Send notification to specific user
def self.notify_user(user_id : Int64, notification)
if socket = USER_CONNECTIONS[user_id]?
socket.send(notification.to_json)
else
# Store for later delivery
store_notification(user_id, notification)
end
end
# Broadcast system notification
def self.broadcast_system_notification(notification)
USER_CONNECTIONS.each do |user_id, socket|
spawn socket.send(notification.to_json)
end
end
endLive Updates
class LiveUpdateChannel < Azu::Channel
ws "/live_updates"
def on_connect
# Subscribe to specific updates
subscribe_to_updates
end
def on_message(message : String)
data = JSON.parse(message)
case data["type"]?.try(&.as_s)
when "subscribe"
subscribe_to_resource(data["resource"].as_s, data["id"].as_i64)
when "unsubscribe"
unsubscribe_from_resource(data["resource"].as_s, data["id"].as_i64)
end
end
private def subscribe_to_updates
# Subscribe to user-specific updates
user_id = current_user.id
subscribe_to_resource("user", user_id)
end
private def subscribe_to_resource(resource : String, id : Int64)
case resource
when "user"
subscribe_to_user_updates(id)
when "post"
subscribe_to_post_updates(id)
when "comment"
subscribe_to_comment_updates(id)
end
end
endError Handling
WebSocket Error Handling
class ErrorHandlingChannel < Azu::Channel
ws "/error_handling"
def on_message(message : String)
begin
data = JSON.parse(message)
handle_message(data)
rescue JSON::ParseException
send_error("Invalid JSON format")
rescue e
Log.error(exception: e) { "Error handling message" }
send_error("Internal server error")
end
end
private def handle_message(data : JSON::Any)
# Message handling logic
end
private def send_error(message : String)
send_to_client({
type: "error",
message: message,
timestamp: Time.utc.to_rfc3339
})
end
endConnection Error Handling
class RobustChannel < Azu::Channel
ws "/robust"
def on_connect
begin
# Connection setup
setup_connection
rescue e
Log.error(exception: e) { "Error setting up connection" }
close_connection(1011, "Connection setup failed")
end
end
def on_close(code, message)
begin
# Cleanup
cleanup_connection
rescue e
Log.error(exception: e) { "Error during cleanup" }
end
end
private def setup_connection
# Connection setup logic
end
private def cleanup_connection
# Cleanup logic
end
endTesting WebSockets
Test your WebSocket channels:
require "spec"
describe ChatChannel do
it "handles connection" do
channel = ChatChannel.new
context = create_websocket_context
channel.on_connect
# Assert connection handling
channel.connections.size.should eq(1)
end
it "handles messages" do
channel = ChatChannel.new
context = create_websocket_context
channel.on_connect
channel.on_message('{"type": "ping"}')
# Assert message handling
# Check for pong response
end
it "handles disconnection" do
channel = ChatChannel.new
context = create_websocket_context
channel.on_connect
channel.on_close(1000, "Normal closure")
# Assert disconnection handling
channel.connections.size.should eq(0)
end
endPerformance Considerations
Connection Pooling
class PooledChannel < Azu::Channel
ws "/pooled"
def initialize
@connection_pool = ConnectionPool.new(max_size: 100)
end
def on_connect
connection = @connection_pool.acquire
# Use connection
end
def on_close(code, message)
@connection_pool.release(connection)
end
endMessage Batching
class BatchedChannel < Azu::Channel
ws "/batched"
def initialize
@message_queue = [] of String
@batch_size = 10
@batch_timeout = 100.milliseconds
end
def on_message(message : String)
@message_queue << message
if @message_queue.size >= @batch_size
process_batch
else
schedule_batch_processing
end
end
private def process_batch
messages = @message_queue.dup
@message_queue.clear
# Process batch of messages
process_messages(messages)
end
endSecurity Considerations
Authentication
class SecureChannel < Azu::Channel
ws "/secure"
def on_connect
unless authenticated?
close_connection(1008, "Authentication required")
return
end
# Proceed with authenticated connection
end
private def authenticated? : Bool
# Validate authentication token
token = get_auth_token
validate_token(token)
end
endRate Limiting
class RateLimitedChannel < Azu::Channel
ws "/rate_limited"
def initialize
@message_counts = {} of String => Int32
@rate_limit = 100 # messages per minute
end
def on_message(message : String)
client_id = get_client_id
# Check rate limit
if rate_limited?(client_id)
send_error("Rate limit exceeded")
return
end
# Process message
handle_message(message)
end
private def rate_limited?(client_id : String) : Bool
count = @message_counts[client_id] || 0
count >= @rate_limit
end
endBest Practices
1. Handle Connection Lifecycle
class LifecycleChannel < Azu::Channel
ws "/lifecycle"
def on_connect
# Setup connection
setup_connection
end
def on_close(code, message)
# Cleanup connection
cleanup_connection
end
private def setup_connection
# Connection setup logic
end
private def cleanup_connection
# Cleanup logic
end
end2. Validate Messages
class ValidatedChannel < Azu::Channel
ws "/validated"
def on_message(message : String)
begin
data = JSON.parse(message)
validate_message(data)
handle_message(data)
rescue JSON::ParseException
send_error("Invalid JSON format")
rescue ValidationError
send_error("Invalid message format")
end
end
private def validate_message(data : JSON::Any)
# Validate message structure
raise ValidationError.new("Missing required fields") unless data["type"]?
end
end3. Use Type Safety
class TypeSafeChannel < Azu::Channel
ws "/type_safe"
def on_message(message : String)
data = JSON.parse(message)
case data["type"]?.try(&.as_s)
when "ping"
handle_ping(data)
when "message"
handle_message(data)
else
send_error("Unknown message type")
end
end
private def handle_ping(data : JSON::Any)
send_to_client({type: "pong", timestamp: Time.utc.to_rfc3339})
end
private def handle_message(data : JSON::Any)
content = data["content"]?.try(&.as_s) || ""
user = data["user"]?.try(&.as_s) || "anonymous"
broadcast_message(content, user)
end
endNext Steps
Now that you understand WebSockets:
Components - Build interactive UI components
Templates - Create real-time templates
Caching - Implement WebSocket caching
Testing - Test your WebSocket channels
Performance - Optimize WebSocket performance
WebSockets in Azu provide a powerful foundation for building real-time applications. With type safety, connection management, and efficient broadcasting, they enable interactive, responsive user experiences.
Last updated
Was this helpful?
