Security Guide
Essential security practices for database interactions, authentication, and data protection in CQL applications.
Security is paramount in production applications. This guide covers essential security practices for CQL applications, from SQL injection prevention to data encryption and access control.
Table of Contents
SQL Injection Prevention
Parameterized Queries (Built-in Protection)
CQL automatically protects against SQL injection through parameterized queries:
# Safe - hash-style conditions are parameterized by CQL
user = User.where(email: user_input).first?
# Safe - the "?" placeholder keeps date_input out of the SQL text
users = User.where("created_at > ?", date_input).all
# Safe - Query builder methods use parameters
User.where(id: [1, 2, 3]).all
# Dangerous - interpolation splices user_input straight into the SQL text,
# so a quote character in it can change the statement
schema.exec("SELECT * FROM users WHERE email = '#{user_input}'")  # DON'T DO THIS
# Safe - Raw SQL with parameters
schema.exec_query("SELECT * FROM users WHERE email = ?", [user_input])
Safe Dynamic Queries
# Safe dynamic filtering
# Builds a User query from a hash of untrusted filter values.
class UserFilter
  # Starts from `User.all` and narrows it by the recognised keys of *filters*.
  # Only "email" and "status" are consulted; a "status" outside the allow-list
  # is ignored rather than passed on to the database.
  def self.build_query(filters : Hash(String, String))
    scope = User.all

    email = filters["email"]?
    # Hash-style conditions are parameterized automatically.
    scope = scope.where(email: email) if email

    status = filters["status"]?
    # Whitelist allowed values
    if status && {"active", "inactive", "pending"}.includes?(status)
      scope = scope.where(status: status)
    end

    scope
  end
end
# Safe column ordering
class UserQuery
  ALLOWED_ORDER_COLUMNS = ["id", "name", "created_at", "email"]
  # Orders users by an allow-listed column.
  #
  # *column* must be listed in ALLOWED_ORDER_COLUMNS, otherwise ArgumentError
  # is raised. *direction* is compared case-insensitively; anything other than
  # "desc" sorts ascending.
  def self.ordered_by(column : String, direction : String = "asc")
    # Reject unknown columns before they can reach the ORDER BY clause.
    raise ArgumentError.new("Invalid order column: #{column}") unless ALLOWED_ORDER_COLUMNS.includes?(column)

    sort_direction = direction.downcase == "desc" ? "desc" : "asc"

    # Interpolation is acceptable here only because both parts were
    # validated against fixed lists above.
    User.order("#{column} #{sort_direction}")
  end
end
Authentication & Authorization
Secure Password Handling
# User model with secure password
require "crypto/bcrypt/password"
struct User
  include CQL::ActiveRecord::Model(Int64)
  db_context schema: UserDB, table: :users
  property id : Int64?
  property email : String
  property password_hash : String
  property role : String = "user"
  property active : Bool = true
  property failed_login_attempts : Int32 = 0
  property locked_at : Time?
  property last_login_at : Time?
  # Virtual password attribute
  property password : String = ""
  # CQL validations
  validate :email, presence: true, match: /\A[\w+\-.]+@[a-z\d\-.]+\.[a-z]+\z/i
  validate :password, size: 12..128, message: "Password must be between 12 and 128 characters"
  before_save :hash_password
  def initialize(@email : String, @password : String = "", @role : String = "user")
  end
  # Verifies *password* against the stored bcrypt hash.
  #
  # Returns false immediately while the account is locked. A mismatch is
  # counted via `increment_failed_attempts`; a match clears the counter and
  # records the login time before returning true.
  def authenticate(password : String) : Bool
    return false if account_locked?

    unless Crypto::Bcrypt::Password.new(password_hash).verify(password)
      increment_failed_attempts
      return false
    end

    reset_failed_attempts
    update_last_login
    true
  end
  # True while a lock set within the last 30 minutes is still in effect.
  #
  # Always returns a real Bool: with no `locked_at` the account is simply not
  # locked (the previous `locked_at && ...` form evaluated to nil in that case,
  # which does not satisfy the declared `: Bool` return type).
  def account_locked? : Bool
    locked = locked_at
    return false if locked.nil?

    locked > 30.minutes.ago
  end
  # before_save hook: turns a pending plaintext password into a bcrypt hash.
  # Does nothing when no new password was assigned.
  private def hash_password
    return if password.empty?

    # Cost 12 when CRYSTAL_ENV is "production", cost 4 everywhere else.
    cost =
      if ENV["CRYSTAL_ENV"]? == "production"
        12
      else
        4
      end

    hashed = Crypto::Bcrypt::Password.create(password, cost: cost)
    self.password_hash = hashed.to_s
    # Drop the plaintext as soon as the hash exists.
    self.password = ""
  end
  # Counts one failed login and persists the record; the fifth consecutive
  # failure (and every later one) stamps `locked_at` with the current UTC time.
  private def increment_failed_attempts
    attempts = failed_login_attempts + 1
    self.failed_login_attempts = attempts
    self.locked_at = Time.utc if attempts >= 5
    save!
  end
  # Clears the failure counter and any lock after a successful login.
  # Skips the write entirely when there is nothing to clear.
  private def reset_failed_attempts
    return unless failed_login_attempts > 0

    update!(failed_login_attempts: 0, locked_at: nil)
  end
  # Stamps the record with the current UTC time as the last login;
  # called from #authenticate once the password has been verified.
  private def update_last_login
    update!(last_login_at: Time.utc)
  end
end
Role-Based Access Control
# Permission system
# Fine-grained capabilities that roles are composed of.
enum Permission
  ReadUsers
  WriteUsers
  DeleteUsers
  ReadReports
  AdminAccess
  # Used by ZeroTrustAccess.verify_access for reading other users' posts.
  # Appended last so the existing members keep their numeric values; roles
  # built from `Permission.values` receive it automatically.
  ReadAllPosts
end
# Role definitions
# Maps role names to the permissions they grant.
module Roles
  PERMISSIONS = {
    "user"        => [Permission::ReadUsers],
    "moderator"   => [Permission::ReadUsers, Permission::WriteUsers],
    "admin"       => [Permission::ReadUsers, Permission::WriteUsers, Permission::DeleteUsers, Permission::ReadReports],
    "super_admin" => Permission.values, # All permissions
  }

  # True when *role* is known and its permission list contains *permission*.
  # Unknown role names grant nothing.
  def self.has_permission?(role : String, permission : Permission) : Bool
    granted = PERMISSIONS[role]?
    return false if granted.nil?

    granted.includes?(permission)
  end
end
# Authorization mixin
# Mixin for types that expose a `role`; adds permission checks on top of Roles.
module Authorizable
  # True when the including object's role grants *permission*.
  def can?(permission : Permission) : Bool
    Roles.has_permission?(role, permission)
  end

  # Raises AuthorizationError unless *permission* is granted; returns nil otherwise.
  def authorize!(permission : Permission)
    return if can?(permission)

    raise AuthorizationError.new("Insufficient permissions: #{permission}")
  end
end
# Include in User model
# Authorization helpers for User.
struct User
  include Authorizable

  # True for the two administrative roles.
  def admin? : Bool
    {"admin", "super_admin"}.includes?(role)
  end

  # Whether this user may modify *other_user*.
  #
  # Users never manage themselves. Identity is decided by primary key rather
  # than struct equality: two copies of the same row can differ in other
  # fields (for example `last_login_at`), and would then compare unequal.
  def can_manage?(other_user : User) : Bool
    return false if id == other_user.id

    admin? && can?(Permission::WriteUsers)
  end
end
# Usage in controllers/services
class UserService
  # Applies *attributes* to the user with *user_id* on behalf of *current_user*.
  #
  # Raises AuthorizationError when the caller lacks WriteUsers or may not
  # manage the target user.
  # NOTE(review): *attributes* is passed to `update!` unfiltered - confirm
  # callers cannot smuggle fields such as `role` through it.
  def update_user(current_user : User, user_id : Int64, attributes : Hash)
    current_user.authorize!(Permission::WriteUsers)

    target = User.find!(user_id)
    # Additional checks
    raise AuthorizationError.new("Cannot modify this user") unless current_user.can_manage?(target)

    target.update!(attributes)
  end
end
Session Management
# Secure session model
struct UserSession
  include CQL::ActiveRecord::Model(String)  # UUID primary key
  db_context schema: UserDB, table: :user_sessions
  property id : String = UUID.random.to_s
  property user_id : Int64
  property ip_address : String
  property user_agent : String
  property expires_at : Time
  property last_activity : Time = Time.utc
  belongs_to :user, User, foreign_key: :user_id
  # Security configurations
  SESSION_LIFETIME = 24.hours
  ACTIVITY_TIMEOUT = 2.hours
  # Builds an unsaved session for *user_id*; the expiry is fixed at
  # construction time to SESSION_LIFETIME from now.
  def initialize(@user_id : Int64, @ip_address : String, @user_agent : String)
    @expires_at = SESSION_LIFETIME.from_now
  end
  # Purges the user's stale sessions, then creates, saves and returns a new one.
  def self.create_for_user(user : User, ip : String, user_agent : String)
    # Clean up old sessions
    cleanup_expired_sessions(user)

    session = new(user.id!, ip, user_agent)
    session.save!
    session
  end
  # A session is usable only while it is neither expired nor idle too long.
  def valid? : Bool
    !(expired? || inactive?)
  end
  # True once the current UTC time has passed `expires_at`.
  def expired? : Bool
    Time.utc > expires_at
  end
  # True when the last recorded activity is older than ACTIVITY_TIMEOUT.
  def inactive? : Bool
    ACTIVITY_TIMEOUT.ago > last_activity
  end
  # Persists the current UTC time as `last_activity`, which is the value
  # #inactive? compares against ACTIVITY_TIMEOUT.
  def touch_activity
    update!(last_activity: Time.utc)
  end
  # Ends the session by deleting its record (no soft-delete flag is kept).
  def invalidate!
    delete!
  end
  # Deletes every session of *user* that has expired or has been idle for
  # longer than ACTIVITY_TIMEOUT.
  private def self.cleanup_expired_sessions(user : User)
    owned = UserSession.where(user_id: user.id!)
    stale = owned.where("expires_at < ? OR last_activity < ?", Time.utc, ACTIVITY_TIMEOUT.ago)
    stale.delete_all
  end
end
Data Protection
Sensitive Data Encryption
# Encrypted attribute handling
require "base64"
require "crypto/subtle"
# Mixin providing the `encrypted_attribute` macro.
module EncryptedAttributes
  # Declares a virtual attribute *name* backed by a stored
  # `encrypted_<name>` String property.
  #
  # The setter stores `Encryption.encrypt(value.to_s)`. The getter returns
  # nil when nothing is stored or when decryption/conversion raises.
  # For the default *type* (String) the decrypted text is returned directly,
  # because String has no constructor taking a String; any other *type* must
  # provide `.new(String)`.
  macro encrypted_attribute(name, type = String)
    property encrypted_{{name.id}} : String = ""

    def {{name.id}}=(value : {{type}})
      self.encrypted_{{name.id}} = Encryption.encrypt(value.to_s)
    end

    def {{name.id}} : {{type}}?
      return nil if encrypted_{{name.id}}.empty?
      decrypted = Encryption.decrypt(encrypted_{{name.id}})
      {% if %w(String ::String).includes?(type.stringify) %}
        decrypted
      {% else %}
        {{type}}.new(decrypted)
      {% end %}
    rescue
      # Best effort: unreadable ciphertext is reported as "no value".
      nil
    end
  end
end
# Encryption service
# Encryption service used by EncryptedAttributes.
#
# WARNING: the bodies below are placeholders. Base64 is an encoding, not
# encryption, and ENCRYPTION_KEY is not used yet - swap in an authenticated
# cipher (e.g. AES-256-GCM) before storing real secrets.
module Encryption
  extend self
  # Use environment variable for encryption key
  ENCRYPTION_KEY = ENV["ENCRYPTION_KEY"]? || raise("ENCRYPTION_KEY required")

  # Returns the stored form of *data* as a single-line string.
  def encrypt(data : String) : String
    # Implementation would use AES-256-GCM or similar
    # This is a simplified example
    # strict_encode emits no line feeds, so the value stays on one line;
    # decode_string below still accepts values written by Base64.encode.
    Base64.strict_encode(data) # Replace with actual encryption
  end

  # Reverses #encrypt.
  def decrypt(encrypted_data : String) : String
    # Implementation would decrypt using the same algorithm
    Base64.decode_string(encrypted_data) # Replace with actual decryption
  end
end
# Usage in models
struct User
  include EncryptedAttributes
  property email : String
  encrypted_attribute social_security_number
  encrypted_attribute credit_card_number
  # These fields are automatically encrypted/decrypted
end
Personal Data Handling (GDPR/Privacy)
# Personal data tracking and management
module PersonalDataCompliance
  extend self
  # Define what constitutes personal data
  PERSONAL_DATA_FIELDS = {
    User => [:email, :name, :phone, :address],
    UserProfile => [:date_of_birth, :biography],
    Order => [:shipping_address, :billing_address]
  }
  # Collects the personal-data fields of every model listed in
  # PERSONAL_DATA_FIELDS for *user*, keyed by the model's class name.
  def export_user_data(user : User) : Hash(String, Array(Hash(String, String)))
    export = {} of String => Array(Hash(String, String))

    PERSONAL_DATA_FIELDS.each do |model_class, fields|
      rows = get_user_records(model_class, user)
      export[model_class.to_s] = rows.map { |row| extract_personal_fields(row, fields) }
    end

    export
  end
  # Overwrites *user*'s personal fields with placeholders and anonymizes
  # related profiles/orders when those associations exist, all inside a
  # single transaction.
  def anonymize_user_data(user : User)
    User.transaction do
      # Replace personal data with anonymized versions
      placeholder_email = "deleted_user_#{user.id}@example.com"
      user.update!(
        email: placeholder_email,
        name: "Deleted User",
        phone: nil,
        address: nil
      )

      # Anonymize related records
      if user.responds_to?(:profiles)
        user.profiles.each(&.anonymize!)
      end
      if user.responds_to?(:orders)
        user.orders.each(&.anonymize_addresses!)
      end
    end
  end
  # Removes *user* and all dependent rows inside a single transaction.
  def delete_user_data(user : User)
    User.transaction do
      # Dependents go first so foreign keys never point at a missing user.
      user.sessions.delete_all if user.responds_to?(:sessions)
      user.orders.delete_all if user.responds_to?(:orders)
      user.profiles.delete_all if user.responds_to?(:profiles)

      user.delete!
    end
  end
end
Input Validation
Comprehensive Validation
# Secure validation patterns
struct User
  include CQL::ActiveRecord::Model(Int64)
  db_context schema: UserDB, table: :users
  property id : Int64?
  property name : String
  property email : String
  property age : Int32 = 0
  # Email validation with security considerations
  validate :email, presence: true, size: 1..254  # RFC 5321 limit
  validate :email, match: /\A[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\z/
  # Password security requirements
  validate :password, size: 12..128, message: "Password must be between 12 and 128 characters"
  # Prevent malicious content
  validate :name, size: 1..100
  validate :name, exclude: ["<script", "javascript:", "data:", "vbscript:"], message: "Contains prohibited content"
  # Custom security validations
  validate :no_sql_injection_patterns
  validate :rate_limit_creation
  def initialize(@name : String, @email : String, @age : Int32 = 0)
  end
  # Custom validation: flags `email` or `name` when either contains a
  # fragment that looks like SQL. At most one error is added per field.
  # This is defence in depth only; parameterized queries remain the
  # actual protection.
  private def no_sql_injection_patterns
    suspicious_patterns = [
      /union\s+select/i,
      /insert\s+into/i,
      /delete\s+from/i,
      /drop\s+table/i,
      /--/,
      /\/\*/,
    ]

    [email, name].each do |field|
      next if field.nil?
      next unless suspicious_patterns.any? { |pattern| field.matches?(pattern) }

      errors << CQL::ActiveRecord::Validations::Error.new(:base, "Suspicious content detected")
    end
  end
  # Custom validation for new records: rejects the record when another user
  # with the same email was created within the last hour.
  private def rate_limit_creation
    return unless id.nil? # only new records are rate limited

    one_hour_ago = 1.hour.ago
    recent_count = User.where("created_at > ?", one_hour_ago)
                       .where(email: email)
                       .count
    return unless recent_count > 0

    errors << CQL::ActiveRecord::Validations::Error.new(:email, "Too many accounts created recently")
  end
end
XSS Prevention
# HTML sanitization for user content
require "html"
# Helpers for neutralising user-supplied markup before it is stored or shown.
module ContentSecurity
  extend self
  # Whitelist of allowed HTML tags
  # (not consulted by the methods below, which escape/strip unconditionally)
  ALLOWED_TAGS = %w[p br strong em ul ol li blockquote]
  ALLOWED_ATTRIBUTES = %w[class]

  # Escapes every HTML special character in *content*, so no tag survives.
  # In a real implementation, use a proper HTML sanitizer if the tags in
  # ALLOWED_TAGS are meant to pass through.
  def sanitize_html(content : String) : String
    HTML.escape(content)
  end

  # Strips `javascript:`, `data:` and `vbscript:` (case-insensitive) from
  # *content* and escapes the opening bracket of `<script`.
  #
  # Stripping repeats until nothing changes: a single pass would turn
  # "javajavascript:script:" back into "javascript:".
  def sanitize_for_display(content : String) : String
    dangerous_schemes = /javascript:|data:|vbscript:/i

    cleaned = content
    loop do
      stripped = cleaned.gsub(dangerous_schemes, "")
      break if stripped == cleaned
      cleaned = stripped
    end

    # Replace with the entity; replacing "<script" by itself protects nothing.
    cleaned.gsub(/<script/i, "&lt;script")
  end
end
# Usage in models
struct Post
  include CQL::ActiveRecord::Model(Int64)
  db_context schema: BlogDB, table: :posts
  property id : Int64?
  property title : String
  property content : String
  property user_id : Int64
  before_save :sanitize_content
  def initialize(@title : String, @content : String, @user_id : Int64)
  end
  # before_save hook: strips dangerous URL schemes from the title and
  # HTML-escapes the body before the post is written.
  # NOTE(review): if before_save fires on every save, an already escaped
  # body is escaped again ("&amp;" becomes "&amp;amp;") - confirm, or escape
  # at render time instead.
  private def sanitize_content
    self.title = ContentSecurity.sanitize_for_display(title)
    self.content = ContentSecurity.sanitize_html(content)
  end
end
Database Security
Connection Security
# Secure database configuration
module DatabaseSecurity
  # Connection settings for production: DATABASE_URL with TLS parameters
  # appended. Raises KeyError when DATABASE_URL is not set.
  #
  # The TLS parameters are joined with "&" when DATABASE_URL already carries
  # a query string, and with "?" otherwise, so the result is always a
  # well-formed URI.
  # NOTE(review): `sslmode=require` encrypts but does not verify the server
  # certificate; consider `verify-full` if a CA bundle is available.
  def self.production_config
    base_uri = ENV["DATABASE_URL"]
    separator = base_uri.includes?('?') ? "&" : "?"

    {
      # Use SSL/TLS for connections
      uri:     "#{base_uri}#{separator}sslmode=require&sslcert=client-cert.pem&sslkey=client-key.pem",
      adapter: CQL::Adapter::Postgres,
    }
  end
  # Raises SecurityError unless the PostgreSQL connection behind *schema*
  # reports SSL as enabled and an SSL cipher in use. Other adapters are
  # not checked.
  def self.validate_connection_security(schema : CQL::Schema)
    return unless schema.adapter == CQL::Adapter::Postgres

    # Check SSL is enabled (PostgreSQL example)
    ssl_setting = schema.exec_query("SHOW ssl", as: String)
    raise SecurityError.new("SSL not enabled on database connection") unless ssl_setting == "on"

    # Verify connection encryption
    cipher = schema.exec_query("SELECT ssl_cipher FROM pg_stat_ssl WHERE pid = pg_backend_pid()", as: String?)
    if cipher.nil? || cipher.empty?
      raise SecurityError.new("Database connection is not encrypted")
    end
  end
end
Database User Permissions
-- Database user setup with minimal permissions
-- Create application user with limited privileges
-- (the password below is a placeholder; substitute a generated secret)
CREATE USER app_user WITH PASSWORD 'secure_random_password';
-- Grant only necessary permissions
GRANT CONNECT ON DATABASE myapp_production TO app_user;
GRANT USAGE ON SCHEMA public TO app_user;
-- Table-specific permissions
GRANT SELECT, INSERT, UPDATE, DELETE ON users TO app_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON posts TO app_user;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO app_user;
-- Deny dangerous operations
-- CREATE on schema public is held by PUBLIC on PostgreSQL versions before 15,
-- so it has to be revoked there; revoking it from app_user alone changes nothing.
REVOKE CREATE ON SCHEMA public FROM PUBLIC;
REVOKE CREATE ON SCHEMA public FROM app_user;
-- Without the SCHEMA keyword the name is parsed as a table and the statement fails.
REVOKE ALL ON SCHEMA pg_catalog FROM app_user;
REVOKE ALL ON information_schema FROM app_user;
Auditing & Monitoring
Audit Logging
# Audit trail for sensitive operations
# One row per audited operation: who acted, on which resource, and the
# JSON-encoded values before and after.
struct AuditLog
  include CQL::ActiveRecord::Model(Int64)
  db_context schema: AuditDB, table: :audit_logs

  property id : Int64?
  property user_id : Int64?
  property action : String
  property resource_type : String
  property resource_id : String
  # JSON documents; "{}" stands for "nothing recorded".
  property old_values : String = "{}"
  property new_values : String = "{}"
  property ip_address : String
  property user_agent : String
  property created_at : Time?

  def initialize(@action : String, @resource_type : String, @resource_id : String, @ip_address : String, @user_agent : String, @user_id : Int64? = nil)
  end

  # Builds and saves an entry describing *action* on *resource*.
  #
  # *user* may be nil for anonymous actions. *old_values* / *new_values*
  # are serialized with `to_json`; nil is stored as "{}".
  def self.log_action(user : User?, action : String, resource, ip : String, user_agent : String, old_values = nil, new_values = nil)
    entry = new(
      action: action,
      resource_type: resource.class.to_s,
      resource_id: resource.id.to_s,
      ip_address: ip,
      user_agent: user_agent,
      user_id: user.try(&.id)
    )

    entry.old_values = old_values.nil? ? "{}" : old_values.to_json
    entry.new_values = new_values.nil? ? "{}" : new_values.to_json
    entry.save!
  end
end
# Auditable mixin for models
# Mixin that wires a model's lifecycle callbacks to AuditLog.
module Auditable
  # Registers after_create / after_update / after_delete callbacks and
  # defines the three methods they call. Each reads the acting user, IP and
  # user agent from `Current`.
  # NOTE(review): `attributes` is logged as-is; if the model has secret
  # columns (e.g. a password hash) they end up in the audit log - confirm
  # and filter if needed.
  macro auditable
    after_create :log_creation
    after_update :log_update
    after_delete :log_deletion
    # Logs a "create" entry with no old values and the full attribute set as new values.
    def log_creation
      AuditLog.log_action(
        Current.user,
        "create",
        self,
        Current.ip_address,
        Current.user_agent,
        nil,
        attributes
      )
    end
    # Logs an "update" entry with before/after values, but only when
    # `changed?` reports a change.
    # NOTE(review): this runs after the update - verify `changed?` is still
    # true at that point.
    def log_update
      if changed?
        AuditLog.log_action(
          Current.user,
          "update",
          self,
          Current.ip_address,
          Current.user_agent,
          changes_before,
          changes_after
        )
      end
    end
    # Logs a "delete" entry with the full attribute set as old values and no new values.
    def log_deletion
      AuditLog.log_action(
        Current.user,
        "delete",
        self,
        Current.ip_address,
        Current.user_agent,
        attributes,
        nil
      )
    end
  end
end
# Usage
struct User
  include Auditable
  auditable
end
Security Monitoring
# Security event monitoring
module SecurityMonitor
  extend self
  # Persists a SecurityEvent for *event_type* and notifies SecurityAlerter
  # when the event type counts as high severity.
  #
  # *details* is stored as JSON; *user* is optional.
  def log_suspicious_activity(event_type : String, details : Hash(String, String), user : User? = nil)
    event = SecurityEvent.new(
      event_type: event_type,
      user_id: user.try(&.id),
      details: details.to_json,
      ip_address: Current.ip_address,
      severity: calculate_severity(event_type)
    )
    event.save!

    # Alert if high severity
    SecurityAlerter.notify(event_type, details) if high_severity_event?(event_type)
  end
  # Records a "failed_login" event for *email* coming from *ip*; the user
  # agent is taken from `Current`. No user is attached to the event.
  def track_failed_login(email : String, ip : String)
    details = {
      "email"      => email,
      "ip"         => ip,
      "user_agent" => Current.user_agent,
    }
    log_suspicious_activity("failed_login", details)
  end
  # Records a "privilege_escalation" event: *user* tried *attempted_action*
  # while holding their current role.
  def track_privilege_escalation(user : User, attempted_action : String)
    details = {
      "user_id"          => user.id.to_s,
      "attempted_action" => attempted_action,
      "current_role"     => user.role,
    }
    log_suspicious_activity("privilege_escalation", details, user)
  end
  # Records a "data_export" event noting how many records *user* exported.
  def track_data_export(user : User, record_count : Int32)
    details = {
      "user_id"      => user.id.to_s,
      "record_count" => record_count.to_s,
      "export_type"  => "user_data",
    }
    log_suspicious_activity("data_export", details, user)
  end
  # Maps an event type to the severity label stored on the SecurityEvent.
  #
  # The "high" set matches the event types `high_severity_event?` alerts on,
  # so an event that triggers an alert is never stored as "low".
  # Unknown event types default to "low".
  private def calculate_severity(event_type : String) : String
    case event_type
    when "privilege_escalation", "suspicious_query", "data_breach"
      "high"
    when "data_export"
      "medium"
    else
      "low"
    end
  end
  # True for the event types that must trigger an immediate alert.
  private def high_severity_event?(event_type : String) : Bool
    case event_type
    when "privilege_escalation", "suspicious_query", "data_breach"
      true
    else
      false
    end
  end
end
Security Checklist
Application Security
- Authentication & Authorization - Strong password requirements (12+ characters, complexity) 
- Secure password hashing (bcrypt with high cost) 
- Account lockout after failed attempts 
- Role-based access control implemented 
- Session management with timeouts 
 
- Data Protection - Encrypt sensitive data at rest 
- Use HTTPS/TLS for all connections 
- Implement data anonymization/deletion 
- Handle personal data compliance (GDPR) 
 
Database Security
- Connection Security - SSL/TLS enabled for database connections 
- Dedicated database user with minimal permissions 
- Connection pooling properly configured 
- Query timeouts implemented 
 
- Access Control - Database users have minimal required permissions 
- No shared database accounts 
- Regular credential rotation 
- Network access restrictions 
 
Monitoring & Auditing
- Audit Logging - All sensitive operations logged 
- Audit logs tamper-proof 
- Regular audit log review 
- Long-term audit log retention 
 
- Security Monitoring - Failed login attempt tracking 
- Privilege escalation detection 
- Suspicious query pattern detection 
- Real-time security alerts 
 
Development Security
- Code Security - Security code reviews 
- Dependency vulnerability scanning 
- Secrets management (no hardcoded credentials) 
- Regular security testing 
 
- Environment Security - Separate environments (dev/staging/prod) 
- Production data not used in development 
- Environment variable security 
- Regular security updates 
 
Advanced Security Patterns
Zero-Trust Data Access
# Implement zero-trust principle for data access
module ZeroTrustAccess
  # Decides whether *user* may perform *action* on *resource*.
  #
  # Every call re-checks, in order: the account is active, it is not locked,
  # the role grants the permission mapped from *action*, and finally a
  # per-resource ownership rule. Resource types without a rule are
  # admin-only.
  def self.verify_access(user : User, resource, action : String) : Bool
    # Always verify permissions, even for "trusted" users.
    # `property active : Bool` on User generates the getter `active`
    # (without a `?` suffix).
    return false unless user.active
    return false if user.account_locked?

    # Check specific permission for action
    permission = map_action_to_permission(action)
    return false unless user.can?(permission)

    # Additional context-based checks
    case resource
    when User
      # Users can only access their own data unless admin
      resource.id == user.id || user.admin?
    when Post
      # Users can access their posts or public posts
      resource.user_id == user.id || resource.public? || user.can?(Permission::ReadAllPosts)
    else
      user.admin? # Conservative default
    end
  end
  # Translates an action name into the permission it requires.
  # Unrecognised actions require AdminAccess.
  private def self.map_action_to_permission(action : String) : Permission
    case action
    when "read"            then Permission::ReadUsers
    when "write", "update" then Permission::WriteUsers
    when "delete"          then Permission::DeleteUsers
    else                        Permission::AdminAccess
    end
  end
end
Threat Detection
# Advanced threat detection patterns
module ThreatDetection
  extend self
  # Screens a request by *user* and reports whether it may proceed.
  #
  # Returns false (after logging "rate_limit_exceeded") when the user has
  # more than 100 audit-log entries in the last minute. An unusual access
  # pattern is logged but does not block the request.
  def analyze_request_pattern(user : User, action : String, ip : String)
    # Check for rate limiting violations
    window_start = 1.minute.ago
    recent_actions = AuditLog.where(user_id: user.id)
                             .where("created_at > ?", window_start)
                             .count

    if recent_actions > 100 # Adjust threshold as needed
      rate_details = {
        "user_id"      => user.id.to_s,
        "action_count" => recent_actions.to_s,
        "timeframe"    => "1_minute",
      }
      SecurityMonitor.log_suspicious_activity("rate_limit_exceeded", rate_details, user)
      return false
    end

    # Check for unusual access patterns
    if unusual_access_pattern?(user, action, ip)
      pattern_details = {
        "user_id" => user.id.to_s,
        "action"  => action,
        "ip"      => ip,
      }
      SecurityMonitor.log_suspicious_activity("unusual_access_pattern", pattern_details, user)
    end

    true
  end
  # Placeholder heuristic: currently always returns false, so
  # analyze_request_pattern never logs "unusual_access_pattern".
  # The comments below list the checks an implementation is expected to add.
  private def unusual_access_pattern?(user : User, action : String, ip : String) : Bool
    # Check if IP is from different geographical location
    # Check if action is unusual for this user
    # Check if access time is unusual
    false  # Implement based on your threat model
  end
end
Security is not a feature; it's a foundation. Implement security measures from the beginning of your project, not as an afterthought. Regular security reviews and updates are essential for maintaining protection.
Next Steps:
- Performance Guide → Secure performance patterns
- Testing Guide → Test your security measures
- Best Practices → Secure development practices
Last updated
Was this helpful?
