Security Guide
Secure your Crystal applications - Essential security practices for database interactions, authentication, and data protection
Security is paramount in production applications. This guide covers essential security practices for CQL applications, from SQL injection prevention to data encryption and access control.
📋 Table of Contents
🛡️ SQL Injection Prevention
🎯 Parameterized Queries (Built-in Protection)
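CQL protects against SQL injection by sending values as bound query parameters. This protection applies only when user input is passed through the query builder or `?` placeholders, never when it is interpolated into a SQL string: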
# ✅ Safe - CQL automatically parameterizes hash conditions such as `email:`
user = User.where(email: user_input).first?
# ✅ Safe - the value is supplied separately from the SQL text via the `?` placeholder
users = User.where("created_at > ?", date_input).all
# ✅ Safe - Query builder methods use parameters
User.where(id: [1, 2, 3]).all
# ⚠️ Dangerous - Raw SQL with string interpolation: `user_input` becomes part of
# the SQL text itself, so a crafted value can change the statement.
schema.exec("SELECT * FROM users WHERE email = '#{user_input}'") # DON'T DO THIS
# ✅ Safe - Raw SQL with parameters: the value travels in the argument array
schema.exec_query("SELECT * FROM users WHERE email = ?", [user_input])
🔧 Safe Dynamic Queries
# ✅ Safe dynamic filtering
# Builds a `User` query from untrusted, string-keyed filter input.
class UserFilter
  # Returns a query narrowed by the recognised keys of *filters*.
  #
  # * `"email"`  - applied as a hash condition (parameterized automatically).
  # * `"status"` - applied only when the value is one of the whitelisted
  #   statuses; any other value is ignored.
  #
  # No other keys are read.
  def self.build_query(filters : Hash(String, String))
    scope = User.all

    email = filters["email"]?
    scope = scope.where(email: email) if email

    status = filters["status"]?
    # Whitelist allowed values: nothing outside this list reaches the query.
    if status && ["active", "inactive", "pending"].includes?(status)
      scope = scope.where(status: status)
    end

    scope
  end
end
# ✅ Safe column ordering
# Ordering helper that only ever interpolates vetted identifiers into SQL.
class UserQuery
  ALLOWED_ORDER_COLUMNS = ["id", "name", "created_at", "email"]

  # Orders users by *column* in *direction*.
  #
  # Raises `ArgumentError` when *column* is not in `ALLOWED_ORDER_COLUMNS`.
  # *direction* is matched case-insensitively; anything other than
  # "asc"/"desc" falls back to "asc".
  def self.ordered_by(column : String, direction : String = "asc")
    # Column names cannot be bound as parameters, so reject anything unknown.
    raise ArgumentError.new("Invalid order column: #{column}") unless ALLOWED_ORDER_COLUMNS.includes?(column)

    normalized = direction.downcase
    sort_direction = (normalized == "asc" || normalized == "desc") ? normalized : "asc"

    # Both interpolated pieces are now drawn from fixed whitelists.
    User.order("#{column} #{sort_direction}")
  end
end
🔐 Authentication & Authorization
🔑 Secure Password Handling
# User model with secure password
require "crypto/bcrypt/password"
# User model with bcrypt password hashing and temporary account lockout.
struct User
  include CQL::ActiveRecord::Model(Int64)
  db_context schema: UserDB, table: :users

  # Consecutive failed logins after which the account is locked.
  MAX_FAILED_ATTEMPTS = 5
  # How long a lock stays in effect, measured from `locked_at`.
  LOCKOUT_DURATION = 30.minutes

  property id : Int64?
  property email : String
  property password_hash : String
  property role : String = "user"
  property active : Bool = true
  property failed_login_attempts : Int32 = 0
  property locked_at : Time?
  property last_login_at : Time?

  # Virtual password attribute: holds the plaintext only until it is hashed.
  property password : String = ""

  # CQL validations
  validate :email, presence: true, match: /\A[\w+\-.]+@[a-z\d\-.]+\.[a-z]+\z/i
  # NOTE(review): `hash_password` resets `password` to "" after hashing, so a
  # later save of the same record would presumably fail this size check.
  # Confirm how CQL scopes validations and make this one conditional if needed.
  validate :password, size: 12..128, message: "Password must be between 12 and 128 characters"

  before_save :hash_password

  def initialize(@email : String, @password : String = "", @role : String = "user")
  end

  # Verifies *password* against the stored bcrypt hash.
  #
  # Returns false without checking the password while the account is locked.
  # A successful check resets the failure counter and records the login time;
  # a failed check increments the counter and may lock the account.
  def authenticate(password : String) : Bool
    return false if account_locked?

    if Crypto::Bcrypt::Password.new(password_hash).verify(password)
      reset_failed_attempts
      update_last_login
      true
    else
      increment_failed_attempts
      false
    end
  end

  # True while a lock set less than `LOCKOUT_DURATION` ago is in effect.
  #
  # Binding `locked_at` to a local lets the compiler narrow it to `Time`, so
  # the method always yields a `Bool` (never nil) and needs no `not_nil!`.
  def account_locked? : Bool
    if locked = locked_at
      locked > LOCKOUT_DURATION.ago
    else
      false
    end
  end

  # before_save hook: replaces a pending plaintext password with its hash.
  private def hash_password
    return if password.empty?

    # Use high cost factor for production
    cost = ENV["CRYSTAL_ENV"]? == "production" ? 12 : 4
    self.password_hash = Crypto::Bcrypt::Password.create(password, cost: cost).to_s
    self.password = "" # Clear plaintext password
  end

  # Counts a failed login and locks the account once the limit is reached.
  private def increment_failed_attempts
    self.failed_login_attempts += 1

    if failed_login_attempts >= MAX_FAILED_ATTEMPTS
      self.locked_at = Time.utc
    end

    save!
  end

  # Clears the failure counter and any lock; skips the write when already clean.
  private def reset_failed_attempts
    if failed_login_attempts > 0
      update!(failed_login_attempts: 0, locked_at: nil)
    end
  end

  private def update_last_login
    update!(last_login_at: Time.utc)
  end
end
🛡️ Role-Based Access Control
# Permission system
# Fine-grained permissions that roles are composed from.
#
# New members are appended so the integer values of existing members stay stable.
enum Permission
  ReadUsers
  WriteUsers
  DeleteUsers
  ReadReports
  AdminAccess
  # Read any user's posts; referenced by the zero-trust access check.
  ReadAllPosts
end
# Role definitions
# Maps role names to the permissions they grant.
module Roles
  PERMISSIONS = {
    "user"        => [Permission::ReadUsers],
    "moderator"   => [Permission::ReadUsers, Permission::WriteUsers],
    "admin"       => [Permission::ReadUsers, Permission::WriteUsers, Permission::DeleteUsers, Permission::ReadReports],
    "super_admin" => Permission.values, # All permissions
  }

  # True when *role* is known and its permission list contains *permission*.
  # Unknown roles have no permissions.
  def self.has_permission?(role : String, permission : Permission) : Bool
    granted = PERMISSIONS[role]?
    return false if granted.nil?

    granted.includes?(permission)
  end
end
# Authorization mixin
# Authorization mixin; the including type must expose a `role` String.
module Authorizable
  # Whether this object's role grants *permission*.
  def can?(permission : Permission) : Bool
    Roles.has_permission?(role, permission)
  end

  # Raises `AuthorizationError` unless `can?(permission)` holds.
  def authorize!(permission : Permission)
    return if can?(permission)

    raise AuthorizationError.new("Insufficient permissions: #{permission}")
  end
end
# Include in User model
# Reopens `User` to add role helpers on top of `Authorizable`.
struct User
  include Authorizable

  # True for the "admin" and "super_admin" roles.
  def admin? : Bool
    {"admin", "super_admin"}.includes?(role)
  end

  # An admin holding WriteUsers may manage any user except themselves.
  def can_manage?(other_user : User) : Bool
    self != other_user && admin? && can?(Permission::WriteUsers)
  end
end
# Usage in controllers/services
# Service-layer example: authorize first, then load, then check the target.
class UserService
  # Applies *attributes* to the user identified by *user_id* on behalf of
  # *current_user*.
  #
  # Raises `AuthorizationError` when *current_user* lacks WriteUsers or may not
  # manage the target user.
  def update_user(current_user : User, user_id : Int64, attributes : Hash)
    current_user.authorize!(Permission::WriteUsers)

    target = User.find!(user_id)

    # Additional checks: permission alone is not enough for this record.
    raise AuthorizationError.new("Cannot modify this user") unless current_user.can_manage?(target)

    target.update!(attributes)
  end
end
🔐 Session Management
# Secure session model
# Server-side session record with an absolute lifetime and an idle timeout.
struct UserSession
  include CQL::ActiveRecord::Model(String) # UUID primary key
  db_context schema: UserDB, table: :user_sessions

  property id : String = UUID.random.to_s
  property user_id : Int64
  property ip_address : String
  property user_agent : String
  property expires_at : Time
  property last_activity : Time = Time.utc

  belongs_to :user, User, foreign_key: :user_id

  # Security configurations
  SESSION_LIFETIME = 24.hours
  ACTIVITY_TIMEOUT =  2.hours

  # The expiry is fixed at construction time: now + SESSION_LIFETIME.
  def initialize(@user_id : Int64, @ip_address : String, @user_agent : String)
    @expires_at = SESSION_LIFETIME.from_now
  end

  # Purges the user's stale sessions, then persists and returns a new one.
  def self.create_for_user(user : User, ip : String, user_agent : String)
    cleanup_expired_sessions(user)
    new(user.id!, ip, user_agent).tap(&.save!)
  end

  # A session is usable only while it is neither expired nor idle too long.
  # NOTE(review): confirm this name does not shadow a `valid?` provided by the
  # model's validation layer.
  def valid? : Bool
    !(expired? || inactive?)
  end

  # True once the absolute expiry time has passed.
  def expired? : Bool
    Time.utc > expires_at
  end

  # True when the last recorded activity is older than ACTIVITY_TIMEOUT.
  def inactive? : Bool
    ACTIVITY_TIMEOUT.ago > last_activity
  end

  # Records activity now, pushing back the idle timeout.
  def touch_activity
    update!(last_activity: Time.utc)
  end

  # Ends the session by deleting its record.
  def invalidate!
    delete!
  end

  # Deletes this user's sessions that are past expiry or past the idle timeout.
  private def self.cleanup_expired_sessions(user : User)
    stale = UserSession.where(user_id: user.id!)
      .where("expires_at < ? OR last_activity < ?", Time.utc, ACTIVITY_TIMEOUT.ago)
    stale.delete_all
  end
end
🔒 Data Protection
🔐 Sensitive Data Encryption
# Encrypted attribute handling
require "crypto/subtle"
# Mixin providing the `encrypted_attribute` macro for transparent field encryption.
module EncryptedAttributes
  # Declares an attribute *name* whose value is stored encrypted.
  #
  # Generates:
  # * `encrypted_<name>` - String property holding the ciphertext ("" = unset)
  # * `<name>=`          - encrypts `value.to_s` via `Encryption.encrypt`
  # * `<name>`           - decrypts and converts back to *type*; returns nil
  #                        when unset or when decryption/conversion fails
  macro encrypted_attribute(name, type = String)
    property encrypted_{{name.id}} : String = ""

    def {{name.id}}=(value : {{type}})
      self.encrypted_{{name.id}} = Encryption.encrypt(value.to_s)
    end

    def {{name.id}} : {{type}}?
      return nil if encrypted_{{name.id}}.empty?

      decrypted = Encryption.decrypt(encrypted_{{name.id}})
      {% if type.stringify == "String" %}
        # String has no constructor taking a String, and the decrypted value
        # is already the right type.
        decrypted
      {% else %}
        # Other types are rebuilt from their `to_s` form, e.g. Int32.new("42").
        {{type}}.new(decrypted)
      {% end %}
    rescue
      # Deliberately best-effort: unreadable ciphertext reads as "no value".
      nil
    end
  end
end
# Encryption service
# Encryption service used by `encrypted_attribute`.
#
# WARNING: this is a placeholder. Base64 is a reversible encoding, not
# encryption, and ENCRYPTION_KEY is read but not yet used. Replace both
# methods with authenticated encryption (e.g. AES-256-GCM) before storing
# real data.
module Encryption
  extend self

  # Taken from the environment so the key never lives in source control;
  # raises when the variable is missing.
  ENCRYPTION_KEY = ENV.fetch("ENCRYPTION_KEY") { raise("ENCRYPTION_KEY required") }

  # Returns the stored representation of *data* (currently Base64 text).
  def encrypt(data : String) : String
    encoded = Base64.encode(data) # Replace with actual encryption
    encoded
  end

  # Inverse of `encrypt`.
  def decrypt(encrypted_data : String) : String
    decoded = Base64.decode_string(encrypted_data) # Replace with actual decryption
    decoded
  end
end
# Usage in models
# Reopens `User` to store sensitive fields through `encrypted_attribute`.
struct User
include EncryptedAttributes
property email : String
# Each declaration expands to an `encrypted_<name>` String property, a
# `<name>=` writer that stores `Encryption.encrypt(value.to_s)`, and a
# `<name>` reader that decrypts it (nil when unset or on failure).
encrypted_attribute social_security_number
encrypted_attribute credit_card_number
# These fields are automatically encrypted/decrypted
end
🔍 Personal Data Handling (GDPR/Privacy)
# Personal data tracking and management
# Export, anonymization and erasure of a user's personal data.
module PersonalDataCompliance
  extend self

  # Define what constitutes personal data: model => personal fields.
  PERSONAL_DATA_FIELDS = {
    User        => [:email, :name, :phone, :address],
    UserProfile => [:date_of_birth, :biography],
    Order       => [:shipping_address, :billing_address],
  }

  # Collects the personal fields of every record belonging to *user*,
  # keyed by model name.
  def export_user_data(user : User) : Hash(String, Array(Hash(String, String)))
    export = {} of String => Array(Hash(String, String))

    PERSONAL_DATA_FIELDS.each do |model_class, fields|
      export[model_class.to_s] = get_user_records(model_class, user).map { |record| extract_personal_fields(record, fields) }
    end

    export
  end

  # Overwrites the user's personal data with placeholders, in one transaction.
  def anonymize_user_data(user : User)
    User.transaction do
      # Replace personal data with anonymized versions
      user.update!(
        email: "deleted_user_#{user.id}@example.com",
        name: "Deleted User",
        phone: nil,
        address: nil
      )

      # Anonymize related records, for whichever associations exist
      if user.responds_to?(:profiles)
        user.profiles.each(&.anonymize!)
      end
      if user.responds_to?(:orders)
        user.orders.each(&.anonymize_addresses!)
      end
    end
  end

  # Deletes the user and their dependent records, in one transaction.
  def delete_user_data(user : User)
    User.transaction do
      # Delete all user data in correct order (foreign keys): dependents first
      user.sessions.delete_all if user.responds_to?(:sessions)
      user.orders.delete_all if user.responds_to?(:orders)
      user.profiles.delete_all if user.responds_to?(:profiles)

      user.delete!
    end
  end
end
🚫 Input Validation
✅ Comprehensive Validation
# Secure validation patterns
# User model demonstrating layered input validation.
struct User
  include CQL::ActiveRecord::Model(Int64)
  db_context schema: UserDB, table: :users

  property id : Int64?
  property name : String
  property email : String
  property age : Int32 = 0

  # Email validation with security considerations
  validate :email, presence: true, size: 1..254 # RFC 5321 limit
  validate :email, match: /\A[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\z/

  # Password security requirements
  validate :password, size: 12..128, message: "Password must be between 12 and 128 characters"

  # Prevent malicious content
  validate :name, size: 1..100
  validate :name, exclude: ["<script", "javascript:", "data:", "vbscript:"], message: "Contains prohibited content"

  # Custom security validations
  validate :no_sql_injection_patterns
  validate :rate_limit_creation

  def initialize(@name : String, @email : String, @age : Int32 = 0)
  end

  # Adds one :base error for each of email/name that matches a known
  # injection pattern. This is defence in depth only; parameterized queries
  # remain the actual protection.
  private def no_sql_injection_patterns
    suspicious_patterns = [
      /union\s+select/i,
      /insert\s+into/i,
      /delete\s+from/i,
      /drop\s+table/i,
      /--/,
      /\/\*/,
    ]

    [email, name].each do |field|
      next if field.nil?

      flagged = suspicious_patterns.any? { |pattern| field.matches?(pattern) }
      if flagged
        errors << CQL::ActiveRecord::Validations::Error.new(:base, "Suspicious content detected")
      end
    end
  end

  # For unsaved records, rejects an email that already has an account created
  # within the last hour.
  private def rate_limit_creation
    return unless id.nil? # new record

    recent_count = User.where("created_at > ?", 1.hour.ago).where(email: email).count
    return unless recent_count > 0

    errors << CQL::ActiveRecord::Validations::Error.new(:email, "Too many accounts created recently")
  end
end
🛡️ XSS Prevention
# HTML sanitization for user content
require "html"
# Helpers for neutralising user-supplied content before it is stored or rendered.
module ContentSecurity
  extend self

  # Whitelist of allowed HTML tags and attributes, for use by a real sanitizer.
  # `sanitize_html` below does not consult them yet.
  ALLOWED_TAGS       = %w[p br strong em ul ol li blockquote]
  ALLOWED_ATTRIBUTES = %w[class]

  # URL schemes that can carry executable content.
  DANGEROUS_SCHEMES = /(?:javascript|data|vbscript):/i

  # Escapes ALL HTML in *content*, so no tag survives as markup.
  #
  # In a real implementation, use a proper HTML sanitizer that keeps
  # `ALLOWED_TAGS` / `ALLOWED_ATTRIBUTES` and strips everything else.
  def sanitize_html(content : String) : String
    HTML.escape(content)
  end

  # Removes dangerous URL schemes and defuses `<script` openers.
  #
  # Schemes are stripped repeatedly until nothing changes: a single pass can
  # be defeated by nesting, e.g. "javajavascript:script:" collapses back to
  # "javascript:" after one removal.
  #
  # This is a coarse text filter (it also removes "data:" from ordinary
  # prose); output should still be HTML-escaped when rendered.
  def sanitize_for_display(content : String) : String
    cleaned = content
    loop do
      stripped = cleaned.gsub(DANGEROUS_SCHEMES, "")
      break if stripped == cleaned
      cleaned = stripped
    end

    # Replace the opening bracket with its entity so the tag renders as text.
    cleaned.gsub(/<script/i, "&lt;script")
  end
end
# Usage in models
# Blog post model whose title and content are sanitized before each save.
struct Post
include CQL::ActiveRecord::Model(Int64)
db_context schema: BlogDB, table: :posts
property id : Int64?
property title : String
property content : String
property user_id : Int64
# Registers `sanitize_content` as a before_save hook.
before_save :sanitize_content
def initialize(@title : String, @content : String, @user_id : Int64)
end
# Rewrites both text fields in place: the title goes through
# `ContentSecurity.sanitize_for_display`, the content through
# `ContentSecurity.sanitize_html`.
private def sanitize_content
self.title = ContentSecurity.sanitize_for_display(title)
self.content = ContentSecurity.sanitize_html(content)
end
end
🔑 Database Security
🔒 Connection Security
# Secure database configuration
# Secure database configuration and connection checks.
module DatabaseSecurity
  # TLS options appended to the connection URI.
  # NOTE(review): `sslmode=require` encrypts the connection but does not verify
  # the server certificate; consider `verify-full` with `sslrootcert` where the
  # driver supports it.
  SSL_PARAMS = "sslmode=require&sslcert=client-cert.pem&sslkey=client-key.pem"

  # Returns the production connection settings with TLS enforced.
  #
  # Raises `KeyError` when DATABASE_URL is not set.
  def self.production_config
    base_uri = ENV["DATABASE_URL"]
    # DATABASE_URL may already carry a query string; adding a second "?"
    # would corrupt the parameters, so continue it with "&" instead.
    separator = base_uri.includes?('?') ? '&' : '?'

    {
      # Use SSL/TLS for connections
      uri:     "#{base_uri}#{separator}#{SSL_PARAMS}",
      adapter: CQL::Adapter::Postgres,
    }
  end

  # Raises `SecurityError` unless the current PostgreSQL connection is encrypted.
  # Other adapters are not checked.
  def self.validate_connection_security(schema : CQL::Schema)
    # Check SSL is enabled (PostgreSQL example)
    if schema.adapter == CQL::Adapter::Postgres
      result = schema.exec_query("SHOW ssl", as: String)
      unless result == "on"
        raise SecurityError.new("SSL not enabled on database connection")
      end

      # Verify connection encryption: `SHOW ssl` only reports server support,
      # so also confirm that this backend negotiated a cipher.
      ssl_info = schema.exec_query("SELECT ssl_cipher FROM pg_stat_ssl WHERE pid = pg_backend_pid()", as: String?)
      if ssl_info.nil? || ssl_info.empty?
        raise SecurityError.new("Database connection is not encrypted")
      end
    end
  end
end
🔐 Database User Permissions
-- Database user setup with minimal permissions
-- Create application user with limited privileges
-- (placeholder password: generate a strong random value and keep it in a secrets manager)
CREATE USER app_user WITH PASSWORD 'secure_random_password';
-- Grant only necessary permissions
GRANT CONNECT ON DATABASE myapp_production TO app_user;
GRANT USAGE ON SCHEMA public TO app_user;
-- Table-specific permissions
GRANT SELECT, INSERT, UPDATE, DELETE ON users TO app_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON posts TO app_user;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO app_user;
-- Deny dangerous operations
-- NOTE: privileges a role inherits from PUBLIC are not removed by revoking
-- them from the role itself; on servers where PUBLIC may create objects in
-- "public", also run: REVOKE CREATE ON SCHEMA public FROM PUBLIC;
REVOKE CREATE ON SCHEMA public FROM app_user;
-- The SCHEMA keyword is required: without it the name is parsed as a table.
REVOKE ALL ON SCHEMA pg_catalog FROM app_user;
REVOKE ALL ON SCHEMA information_schema FROM app_user;
📊 Auditing & Monitoring
📝 Audit Logging
# Audit trail for sensitive operations
# One row per audited operation: who did what to which resource, and from where.
struct AuditLog
  include CQL::ActiveRecord::Model(Int64)
  db_context schema: AuditDB, table: :audit_logs

  property id : Int64?
  property user_id : Int64?
  property action : String
  property resource_type : String
  property resource_id : String
  property old_values : String = "{}"
  property new_values : String = "{}"
  property ip_address : String
  property user_agent : String
  property created_at : Time?

  def initialize(@action : String, @resource_type : String, @resource_id : String, @ip_address : String, @user_agent : String, @user_id : Int64? = nil)
  end

  # Persists an audit entry for *action* performed on *resource*.
  #
  # *user* may be nil for unauthenticated actions. *old_values* and
  # *new_values* are serialized with `to_json`; nil is stored as "{}".
  def self.log_action(user : User?, action : String, resource, ip : String, user_agent : String, old_values = nil, new_values = nil)
    entry = new(action, resource.class.to_s, resource.id.to_s, ip, user_agent, user.try(&.id))

    before_json = old_values.try(&.to_json)
    after_json = new_values.try(&.to_json)
    entry.old_values = before_json || "{}"
    entry.new_values = after_json || "{}"

    entry.save!
  end
end
# Auditable mixin for models
# Auditable mixin for models.
module Auditable
  # Installs lifecycle hooks that write an `AuditLog` entry on create, update
  # and delete, taking the actor and request details from `Current`.
  macro auditable
    after_create :log_creation
    after_update :log_update
    after_delete :log_deletion

    # New record: there is no previous state, only the new attributes.
    def log_creation
      AuditLog.log_action(Current.user, "create", self, Current.ip_address, Current.user_agent, nil, attributes)
    end

    # Updated record: logged only when something actually changed.
    def log_update
      return unless changed?

      AuditLog.log_action(Current.user, "update", self, Current.ip_address, Current.user_agent, changes_before, changes_after)
    end

    # Deleted record: keep the final attributes as the "old" state.
    def log_deletion
      AuditLog.log_action(Current.user, "delete", self, Current.ip_address, Current.user_agent, attributes, nil)
    end
  end
end
# Usage
# Reopens `User` to opt it into audit logging.
struct User
include Auditable
# Expands to the after_create / after_update / after_delete hooks and their
# log_creation / log_update / log_deletion methods.
auditable
end
🚨 Security Monitoring
# Security event monitoring
# Security event monitoring: records events and alerts on the severe ones.
module SecurityMonitor
  extend self

  # Event types that are stored with "high" severity and trigger an alert.
  # Keeping one list ensures the stored severity and the alerting decision
  # cannot disagree.
  HIGH_SEVERITY_EVENTS = ["privilege_escalation", "suspicious_query", "data_breach"]

  # Persists a `SecurityEvent` for *event_type* and notifies `SecurityAlerter`
  # when the event is high severity.
  #
  # *details* is stored as JSON; *user* is optional for anonymous events.
  def log_suspicious_activity(event_type : String, details : Hash(String, String), user : User? = nil)
    SecurityEvent.new(
      event_type: event_type,
      user_id: user.try(&.id),
      details: details.to_json,
      ip_address: Current.ip_address,
      severity: calculate_severity(event_type)
    ).save!

    # Alert if high severity
    if high_severity_event?(event_type)
      SecurityAlerter.notify(event_type, details)
    end
  end

  # Records a failed login attempt for *email* from *ip*.
  def track_failed_login(email : String, ip : String)
    log_suspicious_activity("failed_login", {
      "email"      => email,
      "ip"         => ip,
      "user_agent" => Current.user_agent,
    })
  end

  # Records that *user* attempted *attempted_action* beyond their role.
  def track_privilege_escalation(user : User, attempted_action : String)
    log_suspicious_activity("privilege_escalation", {
      "user_id"          => user.id.to_s,
      "attempted_action" => attempted_action,
      "current_role"     => user.role,
    }, user)
  end

  # Records a personal-data export of *record_count* records by *user*.
  def track_data_export(user : User, record_count : Int32)
    log_suspicious_activity("data_export", {
      "user_id"      => user.id.to_s,
      "record_count" => record_count.to_s,
      "export_type"  => "user_data",
    }, user)
  end

  # Maps an event type to "high", "medium" or "low". Unknown types are "low".
  private def calculate_severity(event_type : String) : String
    return "high" if high_severity_event?(event_type)

    case event_type
    when "data_export"
      "medium"
    else
      "low"
    end
  end

  private def high_severity_event?(event_type : String) : Bool
    HIGH_SEVERITY_EVENTS.includes?(event_type)
  end
end
✅ Security Checklist
🔐 Application Security
Authentication & Authorization
Strong password requirements (12+ characters, complexity)
Secure password hashing (bcrypt with high cost)
Account lockout after failed attempts
Role-based access control implemented
Session management with timeouts
Data Protection
Encrypt sensitive data at rest
Use HTTPS/TLS for all connections
Implement data anonymization/deletion
Handle personal data compliance (GDPR)
🗄️ Database Security
Connection Security
SSL/TLS enabled for database connections
Dedicated database user with minimal permissions
Connection pooling properly configured
Query timeouts implemented
Access Control
Database users have minimal required permissions
No shared database accounts
Regular credential rotation
Network access restrictions
📊 Monitoring & Auditing
Audit Logging
All sensitive operations logged
Audit logs tamper-proof
Regular audit log review
Long-term audit log retention
Security Monitoring
Failed login attempt tracking
Privilege escalation detection
Suspicious query pattern detection
Real-time security alerts
🔧 Development Security
Code Security
Security code reviews
Dependency vulnerability scanning
Secrets management (no hardcoded credentials)
Regular security testing
Environment Security
Separate environments (dev/staging/prod)
Production data not used in development
Environment variable security
Regular security updates
🚀 Advanced Security Patterns
🔐 Zero-Trust Data Access
# Implement zero-trust principle for data access
# Implement zero-trust principle for data access: every request is re-checked.
module ZeroTrustAccess
  # Decides whether *user* may perform *action* on *resource*.
  #
  # Checks run in order: account state, role permission for the action, then
  # resource-specific ownership rules. Any failure yields false.
  def self.verify_access(user : User, resource, action : String) : Bool
    # Always verify permissions, even for "trusted" users.
    # `active` is declared with `property`, which generates `active`, not `active?`.
    return false unless user.active
    return false if user.account_locked?

    # Check specific permission for action
    permission = map_action_to_permission(action)
    return false unless user.can?(permission)

    # Additional context-based checks
    case resource
    when User
      # Users can only access their own data unless admin
      resource.id == user.id || user.admin?
    when Post
      # Users can access their posts or public posts.
      # NOTE(review): relies on `Post#public?` and on `Permission::ReadAllPosts`
      # being declared - confirm both exist.
      resource.user_id == user.id || resource.public? || user.can?(Permission::ReadAllPosts)
    else
      user.admin? # Conservative default
    end
  end

  # Translates an action name to the permission it requires. Unrecognised
  # actions require AdminAccess, so they are denied to ordinary roles.
  private def self.map_action_to_permission(action : String) : Permission
    case action
    when "read"
      Permission::ReadUsers
    when "write", "update"
      Permission::WriteUsers
    when "delete"
      Permission::DeleteUsers
    else
      Permission::AdminAccess
    end
  end
end
🕵️ Threat Detection
# Advanced threat detection patterns
# Advanced threat detection patterns.
module ThreatDetection
  extend self

  # Inspects *user*'s recent activity before *action* is carried out.
  #
  # Returns false (after logging "rate_limit_exceeded") when the user has more
  # than 100 audit entries in the last minute. Otherwise returns true; an
  # unusual access pattern is logged but does not block the request.
  def analyze_request_pattern(user : User, action : String, ip : String)
    # Check for rate limiting violations
    window_query = AuditLog.where(user_id: user.id).where("created_at > ?", 1.minute.ago)
    recent_actions = window_query.count

    if recent_actions > 100 # Adjust threshold as needed
      rate_details = {
        "user_id"      => user.id.to_s,
        "action_count" => recent_actions.to_s,
        "timeframe"    => "1_minute",
      }
      SecurityMonitor.log_suspicious_activity("rate_limit_exceeded", rate_details, user)
      return false
    end

    # Check for unusual access patterns
    if unusual_access_pattern?(user, action, ip)
      pattern_details = {
        "user_id" => user.id.to_s,
        "action"  => action,
        "ip"      => ip,
      }
      SecurityMonitor.log_suspicious_activity("unusual_access_pattern", pattern_details, user)
    end

    true
  end

  # Extension point; currently never flags anything. Candidate checks:
  # IP from a different geographical location, action unusual for this user,
  # access at an unusual time.
  private def unusual_access_pattern?(user : User, action : String, ip : String) : Bool
    false # Implement based on your threat model
  end
end
🔐 Security is not a feature, it's a foundation - Implement security measures from the beginning of your project, not as an afterthought. Regular security reviews and updates are essential for maintaining protection.
Next Steps:
Performance Guide - Secure performance patterns
Testing Guide - Test your security measures
Best Practices - Secure development practices
Last updated
Was this helpful?