Security Guide

Secure your Crystal applications - Essential security practices for database interactions, authentication, and data protection

Security is paramount in production applications. This guide covers essential security practices for CQL applications, from SQL injection prevention to data encryption and access control.

📋 Table of Contents


🛡️ SQL Injection Prevention

🎯 Parameterized Queries (Built-in Protection)

CQL automatically protects against SQL injection through parameterized queries:

# ✅ Safe - CQL automatically parameterizes
user = User.where(email: user_input).first?
users = User.where("created_at > ?", date_input).all

# ✅ Safe - Query builder methods use parameters
User.where(id: [1, 2, 3]).all

# ⚠️ Dangerous - Raw SQL with string interpolation
schema.exec("SELECT * FROM users WHERE email = '#{user_input}'")  # DON'T DO THIS

# ✅ Safe - Raw SQL with parameters
schema.exec_query("SELECT * FROM users WHERE email = ?", [user_input])

🔧 Safe Dynamic Queries

# ✅ Safe dynamic filtering
class UserFilter
  def self.build_query(filters : Hash(String, String))
    query = User.all

    if email = filters["email"]?
      query = query.where(email: email)  # Parameterized automatically
    end

    if status = filters["status"]?
      # Whitelist allowed values
      allowed_statuses = ["active", "inactive", "pending"]
      if allowed_statuses.includes?(status)
        query = query.where(status: status)
      end
    end

    query
  end
end

# ✅ Safe column ordering
class UserQuery
  ALLOWED_ORDER_COLUMNS = ["id", "name", "created_at", "email"]

  def self.ordered_by(column : String, direction : String = "asc")
    # Validate column name (prevent injection)
    unless ALLOWED_ORDER_COLUMNS.includes?(column)
      raise ArgumentError.new("Invalid order column: #{column}")
    end

    # Validate direction
    direction = direction.downcase
    unless ["asc", "desc"].includes?(direction)
      direction = "asc"
    end

    # Use string interpolation for column names (not user data)
    User.order("#{column} #{direction}")
  end
end

🔐 Authentication & Authorization

🔑 Secure Password Handling

# User model with secure password
require "crypto/bcrypt/password"

struct User
  include CQL::ActiveRecord::Model(Int64)
  db_context schema: UserDB, table: :users

  property id : Int64?
  property email : String
  property password_hash : String
  property role : String = "user"
  property active : Bool = true
  property failed_login_attempts : Int32 = 0
  property locked_at : Time?
  property last_login_at : Time?

  # Virtual password attribute
  property password : String = ""

  # CQL validations
  validate :email, presence: true, match: /\A[\w+\-.]+@[a-z\d\-.]+\.[a-z]+\z/i
  validate :password, size: 12..128, message: "Password must be between 12 and 128 characters"

  before_save :hash_password

  def initialize(@email : String, @password : String = "", @role : String = "user")
  end

  def authenticate(password : String) : Bool
    return false if account_locked?

    if Crypto::Bcrypt::Password.new(password_hash).verify(password)
      reset_failed_attempts
      update_last_login
      true
    else
      increment_failed_attempts
      false
    end
  end

  def account_locked? : Bool
    locked_at && locked_at.not_nil! > 30.minutes.ago
  end

  private def hash_password
    return if password.empty?

    # Use high cost factor for production
    cost = ENV["CRYSTAL_ENV"]? == "production" ? 12 : 4
    self.password_hash = Crypto::Bcrypt::Password.create(password, cost: cost).to_s
    self.password = ""  # Clear plaintext password
  end

  private def increment_failed_attempts
    self.failed_login_attempts += 1

    # Lock account after 5 failed attempts
    if failed_login_attempts >= 5
      self.locked_at = Time.utc
    end

    save!
  end

  private def reset_failed_attempts
    if failed_login_attempts > 0
      update!(failed_login_attempts: 0, locked_at: nil)
    end
  end

  private def update_last_login
    update!(last_login_at: Time.utc)
  end
end

🛡️ Role-Based Access Control

# Permission system
enum Permission
  ReadUsers
  WriteUsers
  DeleteUsers
  ReadReports
  AdminAccess
end

# Role definitions
module Roles
  PERMISSIONS = {
    "user" => [Permission::ReadUsers],
    "moderator" => [Permission::ReadUsers, Permission::WriteUsers],
    "admin" => [Permission::ReadUsers, Permission::WriteUsers, Permission::DeleteUsers, Permission::ReadReports],
    "super_admin" => Permission.values  # All permissions
  }

  def self.has_permission?(role : String, permission : Permission) : Bool
    PERMISSIONS[role]?.try(&.includes?(permission)) || false
  end
end

# Authorization mixin
module Authorizable
  def can?(permission : Permission) : Bool
    Roles.has_permission?(role, permission)
  end

  def authorize!(permission : Permission)
    unless can?(permission)
      raise AuthorizationError.new("Insufficient permissions: #{permission}")
    end
  end
end

# Include in User model
struct User
  include Authorizable

  def admin? : Bool
    role == "admin" || role == "super_admin"
  end

  def can_manage?(other_user : User) : Bool
    return false if self == other_user
    admin? && can?(Permission::WriteUsers)
  end
end

# Usage in controllers/services
class UserService
  def update_user(current_user : User, user_id : Int64, attributes : Hash)
    current_user.authorize!(Permission::WriteUsers)

    user = User.find!(user_id)

    # Additional checks
    unless current_user.can_manage?(user)
      raise AuthorizationError.new("Cannot modify this user")
    end

    user.update!(attributes)
  end
end

🔐 Session Management

# Secure session model
struct UserSession
  include CQL::ActiveRecord::Model(String)  # UUID primary key
  db_context schema: UserDB, table: :user_sessions

  property id : String = UUID.random.to_s
  property user_id : Int64
  property ip_address : String
  property user_agent : String
  property expires_at : Time
  property last_activity : Time = Time.utc

  belongs_to :user, User, foreign_key: :user_id

  # Security configurations
  SESSION_LIFETIME = 24.hours
  ACTIVITY_TIMEOUT = 2.hours

  def initialize(@user_id : Int64, @ip_address : String, @user_agent : String)
    @expires_at = SESSION_LIFETIME.from_now
  end

  def self.create_for_user(user : User, ip : String, user_agent : String)
    # Clean up old sessions
    cleanup_expired_sessions(user)

    new(user.id!, ip, user_agent).tap(&.save!)
  end

  def valid? : Bool
    !expired? && !inactive?
  end

  def expired? : Bool
    expires_at < Time.utc
  end

  def inactive? : Bool
    last_activity < ACTIVITY_TIMEOUT.ago
  end

  def touch_activity
    update!(last_activity: Time.utc)
  end

  def invalidate!
    delete!
  end

  private def self.cleanup_expired_sessions(user : User)
    UserSession.where(user_id: user.id!)
               .where("expires_at < ? OR last_activity < ?", Time.utc, ACTIVITY_TIMEOUT.ago)
               .delete_all
  end
end

🔒 Data Protection

🔐 Sensitive Data Encryption

# Encrypted attribute handling
require "crypto/subtle"

module EncryptedAttributes
  macro encrypted_attribute(name, type = String)
    property encrypted_{{name.id}} : String = ""

    def {{name.id}}=(value : {{type}})
      self.encrypted_{{name.id}} = Encryption.encrypt(value.to_s)
    end

    def {{name.id}} : {{type}}?
      return nil if encrypted_{{name.id}}.empty?
      decrypted = Encryption.decrypt(encrypted_{{name.id}})
      {{type}}.new(decrypted)
    rescue
      nil
    end
  end
end

# Encryption service
module Encryption
  extend self

  # Use environment variable for encryption key
  ENCRYPTION_KEY = ENV["ENCRYPTION_KEY"]? || raise("ENCRYPTION_KEY required")

  def encrypt(data : String) : String
    # Implementation would use AES-256-GCM or similar
    # This is a simplified example
    Base64.encode(data)  # Replace with actual encryption
  end

  def decrypt(encrypted_data : String) : String
    # Implementation would decrypt using the same algorithm
    Base64.decode_string(encrypted_data)  # Replace with actual decryption
  end
end

# Usage in models
struct User
  include EncryptedAttributes

  property email : String
  encrypted_attribute social_security_number
  encrypted_attribute credit_card_number

  # These fields are automatically encrypted/decrypted
end

🔍 Personal Data Handling (GDPR/Privacy)

# Personal data tracking and management
module PersonalDataCompliance
  extend self

  # Define what constitutes personal data
  PERSONAL_DATA_FIELDS = {
    User => [:email, :name, :phone, :address],
    UserProfile => [:date_of_birth, :biography],
    Order => [:shipping_address, :billing_address]
  }

  def export_user_data(user : User) : Hash(String, Array(Hash(String, String)))
    data = {} of String => Array(Hash(String, String))

    PERSONAL_DATA_FIELDS.each do |model_class, fields|
      records = get_user_records(model_class, user)
      data[model_class.to_s] = records.map do |record|
        extract_personal_fields(record, fields)
      end
    end

    data
  end

  def anonymize_user_data(user : User)
    User.transaction do
      # Replace personal data with anonymized versions
      user.update!(
        email: "deleted_user_#{user.id}@example.com",
        name: "Deleted User",
        phone: nil,
        address: nil
      )

      # Anonymize related records
      user.profiles.each(&.anonymize!) if user.responds_to?(:profiles)
      user.orders.each(&.anonymize_addresses!) if user.responds_to?(:orders)
    end
  end

  def delete_user_data(user : User)
    User.transaction do
      # Delete all user data in correct order (foreign keys)
      if user.responds_to?(:sessions)
        user.sessions.delete_all
      end
      if user.responds_to?(:orders)
        user.orders.delete_all
      end
      if user.responds_to?(:profiles)
        user.profiles.delete_all
      end
      user.delete!
    end
  end
end

🚫 Input Validation

✅ Comprehensive Validation

# Secure validation patterns
struct User
  include CQL::ActiveRecord::Model(Int64)
  db_context schema: UserDB, table: :users

  property id : Int64?
  property name : String
  property email : String
  property age : Int32 = 0

  # Email validation with security considerations
  validate :email, presence: true, size: 1..254  # RFC 5321 limit
  validate :email, match: /\A[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\z/

  # Password security requirements
  validate :password, size: 12..128, message: "Password must be between 12 and 128 characters"

  # Prevent malicious content
  validate :name, size: 1..100
  validate :name, exclude: ["<script", "javascript:", "data:", "vbscript:"], message: "Contains prohibited content"

  # Custom security validations
  validate :no_sql_injection_patterns
  validate :rate_limit_creation

  def initialize(@name : String, @email : String, @age : Int32 = 0)
  end

  private def no_sql_injection_patterns
    suspicious_patterns = [
      /union\s+select/i,
      /insert\s+into/i,
      /delete\s+from/i,
      /drop\s+table/i,
      /--/,
      /\/\*/
    ]

    [email, name].each do |field|
      next if field.nil?

      suspicious_patterns.each do |pattern|
        if field.matches?(pattern)
          errors << CQL::ActiveRecord::Validations::Error.new(:base, "Suspicious content detected")
          break
        end
      end
    end
  end

  private def rate_limit_creation
    if id.nil? # new record
      recent_count = User.where("created_at > ?", 1.hour.ago)
                        .where(email: email)
                        .count

      if recent_count > 0
        errors << CQL::ActiveRecord::Validations::Error.new(:email, "Too many accounts created recently")
      end
    end
  end
end

🛡️ XSS Prevention

# HTML sanitization for user content
require "html"

module ContentSecurity
  extend self

  # Whitelist of allowed HTML tags
  ALLOWED_TAGS = %w[p br strong em ul ol li blockquote]
  ALLOWED_ATTRIBUTES = %w[class]

  def sanitize_html(content : String) : String
    # Strip all HTML except allowed tags
    # In a real implementation, use a proper HTML sanitizer
    HTML.escape(content)
  end

  def sanitize_for_display(content : String) : String
    # Remove potential XSS vectors
    content.gsub(/javascript:/i, "")
           .gsub(/data:/i, "")
           .gsub(/vbscript:/i, "")
           .gsub(/<script/i, "&lt;script")
  end
end

# Usage in models
struct Post
  include CQL::ActiveRecord::Model(Int64)
  db_context schema: BlogDB, table: :posts

  property id : Int64?
  property title : String
  property content : String
  property user_id : Int64

  before_save :sanitize_content

  def initialize(@title : String, @content : String, @user_id : Int64)
  end

  private def sanitize_content
    self.title = ContentSecurity.sanitize_for_display(title)
    self.content = ContentSecurity.sanitize_html(content)
  end
end

🔑 Database Security

🔒 Connection Security

# Secure database configuration
module DatabaseSecurity
  def self.production_config
    {
      # Use SSL/TLS for connections
      uri: "#{ENV["DATABASE_URL"]}?sslmode=require&sslcert=client-cert.pem&sslkey=client-key.pem",
      adapter: CQL::Adapter::Postgres
    }
  end

  def self.validate_connection_security(schema : CQL::Schema)
    # Check SSL is enabled (PostgreSQL example)
    if schema.adapter == CQL::Adapter::Postgres
      result = schema.exec_query("SHOW ssl", as: String)
      unless result == "on"
        raise SecurityError.new("SSL not enabled on database connection")
      end

      # Verify connection encryption
      ssl_info = schema.exec_query("SELECT ssl_cipher FROM pg_stat_ssl WHERE pid = pg_backend_pid()", as: String?)
      if ssl_info.nil? || ssl_info.empty?
        raise SecurityError.new("Database connection is not encrypted")
      end
    end
  end
end

🔐 Database User Permissions

-- Database user setup with minimal permissions
-- Create application user with limited privileges
CREATE USER app_user WITH PASSWORD 'secure_random_password';

-- Grant only necessary permissions
GRANT CONNECT ON DATABASE myapp_production TO app_user;
GRANT USAGE ON SCHEMA public TO app_user;

-- Table-specific permissions
GRANT SELECT, INSERT, UPDATE, DELETE ON users TO app_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON posts TO app_user;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO app_user;

-- Deny dangerous operations
REVOKE CREATE ON SCHEMA public FROM app_user;
REVOKE ALL ON pg_catalog FROM app_user;
REVOKE ALL ON information_schema FROM app_user;

📊 Auditing & Monitoring

📝 Audit Logging

# Audit trail for sensitive operations
struct AuditLog
  include CQL::ActiveRecord::Model(Int64)
  db_context schema: AuditDB, table: :audit_logs

  property id : Int64?
  property user_id : Int64?
  property action : String
  property resource_type : String
  property resource_id : String
  property old_values : String = "{}"
  property new_values : String = "{}"
  property ip_address : String
  property user_agent : String
  property created_at : Time?

  def initialize(@action : String, @resource_type : String, @resource_id : String, @ip_address : String, @user_agent : String, @user_id : Int64? = nil)
  end

  def self.log_action(user : User?, action : String, resource, ip : String, user_agent : String, old_values = nil, new_values = nil)
    audit_log = new(
      action: action,
      resource_type: resource.class.to_s,
      resource_id: resource.id.to_s,
      ip_address: ip,
      user_agent: user_agent,
      user_id: user.try(&.id)
    )

    audit_log.old_values = old_values.try(&.to_json) || "{}"
    audit_log.new_values = new_values.try(&.to_json) || "{}"
    audit_log.save!
  end
end

# Auditable mixin for models
module Auditable
  macro auditable
    after_create :log_creation
    after_update :log_update
    after_delete :log_deletion

    def log_creation
      AuditLog.log_action(
        Current.user,
        "create",
        self,
        Current.ip_address,
        Current.user_agent,
        nil,
        attributes
      )
    end

    def log_update
      if changed?
        AuditLog.log_action(
          Current.user,
          "update",
          self,
          Current.ip_address,
          Current.user_agent,
          changes_before,
          changes_after
        )
      end
    end

    def log_deletion
      AuditLog.log_action(
        Current.user,
        "delete",
        self,
        Current.ip_address,
        Current.user_agent,
        attributes,
        nil
      )
    end
  end
end

# Usage
struct User
  include Auditable
  auditable
end

🚨 Security Monitoring

# Security event monitoring
module SecurityMonitor
  extend self

  def log_suspicious_activity(event_type : String, details : Hash(String, String), user : User? = nil)
    SecurityEvent.new(
      event_type: event_type,
      user_id: user.try(&.id),
      details: details.to_json,
      ip_address: Current.ip_address,
      severity: calculate_severity(event_type)
    ).save!

    # Alert if high severity
    if high_severity_event?(event_type)
      SecurityAlerter.notify(event_type, details)
    end
  end

  def track_failed_login(email : String, ip : String)
    log_suspicious_activity("failed_login", {
      "email" => email,
      "ip" => ip,
      "user_agent" => Current.user_agent
    })
  end

  def track_privilege_escalation(user : User, attempted_action : String)
    log_suspicious_activity("privilege_escalation", {
      "user_id" => user.id.to_s,
      "attempted_action" => attempted_action,
      "current_role" => user.role
    }, user)
  end

  def track_data_export(user : User, record_count : Int32)
    log_suspicious_activity("data_export", {
      "user_id" => user.id.to_s,
      "record_count" => record_count.to_s,
      "export_type" => "user_data"
    }, user)
  end

  private def calculate_severity(event_type : String) : String
    case event_type
    when "failed_login"
      "low"
    when "privilege_escalation"
      "high"
    when "data_export"
      "medium"
    else
      "low"
    end
  end

  private def high_severity_event?(event_type : String) : Bool
    ["privilege_escalation", "suspicious_query", "data_breach"].includes?(event_type)
  end
end

✅ Security Checklist

🔐 Application Security

🗄️ Database Security

📊 Monitoring & Auditing

🔧 Development Security


🚀 Advanced Security Patterns

🔐 Zero-Trust Data Access

# Implement zero-trust principle for data access
module ZeroTrustAccess
  def self.verify_access(user : User, resource, action : String) : Bool
    # Always verify permissions, even for "trusted" users
    return false unless user.active?
    return false if user.account_locked?

    # Check specific permission for action
    permission = map_action_to_permission(action)
    return false unless user.can?(permission)

    # Additional context-based checks
    case resource
    when User
      # Users can only access their own data unless admin
      resource.id == user.id || user.admin?
    when Post
      # Users can access their posts or public posts
      resource.user_id == user.id || resource.public? || user.can?(Permission::ReadAllPosts)
    else
      user.admin?  # Conservative default
    end
  end

  private def self.map_action_to_permission(action : String) : Permission
    case action
    when "read"
      Permission::ReadUsers
    when "write", "update"
      Permission::WriteUsers
    when "delete"
      Permission::DeleteUsers
    else
      Permission::AdminAccess
    end
  end
end

🕵️ Threat Detection

# Advanced threat detection patterns
module ThreatDetection
  extend self

  def analyze_request_pattern(user : User, action : String, ip : String)
    # Check for rate limiting violations
    recent_actions = AuditLog.where(user_id: user.id)
                           .where("created_at > ?", 1.minute.ago)
                           .count

    if recent_actions > 100  # Adjust threshold as needed
      SecurityMonitor.log_suspicious_activity("rate_limit_exceeded", {
        "user_id" => user.id.to_s,
        "action_count" => recent_actions.to_s,
        "timeframe" => "1_minute"
      }, user)
      return false
    end

    # Check for unusual access patterns
    if unusual_access_pattern?(user, action, ip)
      SecurityMonitor.log_suspicious_activity("unusual_access_pattern", {
        "user_id" => user.id.to_s,
        "action" => action,
        "ip" => ip
      }, user)
    end

    true
  end

  private def unusual_access_pattern?(user : User, action : String, ip : String) : Bool
    # Check if IP is from different geographical location
    # Check if action is unusual for this user
    # Check if access time is unusual
    false  # Implement based on your threat model
  end
end

🔐 Security is not a feature, it's a foundation - Implement security measures from the beginning of your project, not as an afterthought. Regular security reviews and updates are essential for maintaining protection.

Next Steps:

Last updated

Was this helpful?