# Data Migration Generator
Generate CQL data migrations for transforming and populating data independently of schema migrations.
## Synopsis
```bash
azu generate data_migration <name>
```

## Description
Data migrations are specialized scripts for transforming existing data, seeding databases, or performing one-time data operations. Unlike schema migrations that modify database structure, data migrations work with the data itself—updating records, migrating data between tables, importing external data, or performing complex transformations.
### Key Differences: Schema vs. Data Migrations

| Aspect | Schema Migrations | Data Migrations |
| --- | --- | --- |
| Purpose | Modify database structure | Transform existing data |
| Examples | Add columns, create tables | Update records, import data |
| Reversibility | Usually reversible | Often irreversible |
| When to Run | On deploy | On-demand or scheduled |
| Dependencies | Database schema | Application models |
| Location | `db/migrations/` | `src/db/data_migrations/` |
## Usage
### Basic Usage
Generate a data migration:
```bash
azu generate data_migration migrate_user_roles
```

This creates:
```
src/db/data_migrations/20240101120000_data_migrate_user_roles.cr
```

### Common Scenarios
#### Backfill New Column

```bash
azu generate data_migration backfill_user_slugs
```

#### Migrate Data Between Tables

```bash
azu generate data_migration migrate_posts_to_articles
```

#### Import External Data

```bash
azu generate data_migration import_legacy_users
```

#### Transform Existing Data

```bash
azu generate data_migration normalize_email_addresses
```

#### Archive Old Data

```bash
azu generate data_migration archive_old_orders
```

## Generated File Structure
### File Location
```
src/db/data_migrations/
└── 20240101120000_data_migrate_user_roles.cr
```

### File Content
```crystal
# Data migration: Migrate User Roles
# Created: 2024-01-01 12:00:00 UTC
#
# Purpose: [Describe what this data migration does]
#
# Usage:
#   crystal run src/db/data_migrations/20240101120000_data_migrate_user_roles.cr

require "../../models/**"
require "cql"

module DataMigrations
  class MigrateUserRoles
    # Perform the data migration
    def self.up
      puts "Starting data migration: MigrateUserRoles"

      # Your data migration logic here
      # Example:
      #   User.all.each do |user|
      #     # Transform or update user data
      #   end

      puts "Data migration completed successfully"
    end

    # Rollback the data migration (if possible)
    def self.down
      puts "Rolling back data migration: MigrateUserRoles"

      # Rollback logic here (if applicable)
      # Note: Many data migrations cannot be reversed

      puts "Data migration rolled back successfully"
    end

    # Helper method to run migration
    def self.run
      up
    end
  end
end

# Run the migration
DataMigrations::MigrateUserRoles.run
```

## Common Data Migration Patterns
### 1. Backfill Column Values
When you add a new column and need to populate it with calculated or default values:
```crystal
# src/db/data_migrations/20240101120000_data_backfill_user_slugs.cr
require "../../models/**"

module DataMigrations
  class BackfillUserSlugs
    def self.up
      puts "Backfilling user slugs..."

      count = 0

      User.where("slug IS NULL OR slug = ''").each do |user|
        slug = generate_slug(user.name, user.id)
        user.update(slug: slug)
        count += 1
        print "." if count % 100 == 0
      end

      puts "\nBackfilled #{count} user slugs"
    end

    private def self.generate_slug(name : String, id : Int64) : String
      base_slug = name.downcase.gsub(/[^a-z0-9]+/, "-")
      "#{base_slug}-#{id}"
    end

    def self.run
      up
    end
  end
end

DataMigrations::BackfillUserSlugs.run
```

### 2. Migrate Data Between Tables
When restructuring your schema and moving data:
```crystal
# src/db/data_migrations/20240101120000_data_migrate_posts_to_articles.cr
require "../../models/**"

module DataMigrations
  class MigratePostsToArticles
    def self.up
      puts "Migrating posts to articles..."

      migrated = 0
      skipped = 0

      Post.all.each do |post|
        # Check if already migrated
        if Article.exists?(legacy_post_id: post.id)
          skipped += 1
          next
        end

        # Create article from post
        article = Article.create!(
          title: post.title,
          content: post.body,
          author_id: post.user_id,
          published_at: post.created_at,
          status: map_status(post.status),
          legacy_post_id: post.id
        )

        # Migrate comments
        migrate_comments(post, article)

        # Migrate tags
        migrate_tags(post, article)

        migrated += 1
        print "." if migrated % 50 == 0
      end

      puts "\nMigrated: #{migrated}, Skipped: #{skipped}"
    end

    private def self.map_status(old_status : String) : String
      case old_status
      when "published" then "live"
      when "draft"     then "draft"
      when "archived"  then "archived"
      else                  "draft"
      end
    end

    private def self.migrate_comments(post : Post, article : Article)
      post.comments.each do |comment|
        Comment.create!(
          article_id: article.id,
          user_id: comment.user_id,
          content: comment.body,
          created_at: comment.created_at
        )
      end
    end

    private def self.migrate_tags(post : Post, article : Article)
      post.tags.each do |tag|
        article.add_tag(tag)
      end
    end

    def self.run
      up
    end
  end
end

DataMigrations::MigratePostsToArticles.run
```

### 3. Import External Data
When importing data from CSV, JSON, or external APIs:
```crystal
# src/db/data_migrations/20240101120000_data_import_legacy_users.cr
require "../../models/**"
require "csv"

module DataMigrations
  class ImportLegacyUsers
    def self.up
      puts "Importing legacy users..."

      csv_path = "db/data/legacy_users.csv"

      unless File.exists?(csv_path)
        puts "Error: #{csv_path} not found"
        return
      end

      imported = 0
      failed = 0

      csv = CSV.new(File.read(csv_path), headers: true)
      csv.each do
        row = csv.row
        begin
          user = User.create!(
            email: row["email"].downcase.strip,
            name: row["full_name"],
            password_hash: row["password_hash"],
            created_at: Time.parse_utc(row["created_at"], "%Y-%m-%d %H:%M:%S"),
            legacy_id: row["id"].to_i,
            confirmed_at: Time.utc # Auto-confirm imported users
          )

          # Import associated data
          import_user_profile(user, row)
          import_user_preferences(user, row)

          imported += 1
          print "." if imported % 100 == 0
        rescue ex
          puts "\nFailed to import user #{row["email"]}: #{ex.message}"
          failed += 1
        end
      end

      puts "\nImported: #{imported}, Failed: #{failed}"
    end

    private def self.import_user_profile(user : User, row : CSV::Row)
      UserProfile.create!(
        user_id: user.id,
        bio: row["bio"]?,
        location: row["location"]?,
        website: row["website"]?
      )
    end

    private def self.import_user_preferences(user : User, row : CSV::Row)
      UserPreferences.create!(
        user_id: user.id,
        email_notifications: row["email_notifications"] == "1",
        theme: row["theme"]? || "light"
      )
    end

    def self.run
      up
    end
  end
end

DataMigrations::ImportLegacyUsers.run
```

### 4. Transform and Clean Data
When fixing data inconsistencies or normalizing values:
```crystal
# src/db/data_migrations/20240101120000_data_normalize_email_addresses.cr
require "../../models/**"

module DataMigrations
  class NormalizeEmailAddresses
    def self.up
      puts "Normalizing email addresses..."

      normalized = 0
      duplicates = 0
      processed = 0

      User.all.each do |user|
        original_email = user.email
        normalized_email = normalize_email(original_email)

        if normalized_email != original_email
          # Check for duplicates before updating
          if User.exists?(email: normalized_email)
            puts "\nDuplicate email detected: #{normalized_email}"
            handle_duplicate(user)
            duplicates += 1
          else
            user.update(email: normalized_email)
            normalized += 1
          end
        end

        processed += 1
        print "." if processed % 100 == 0
      end

      puts "\nNormalized: #{normalized}, Duplicates: #{duplicates}"
    end

    private def self.normalize_email(email : String) : String
      email.downcase.strip
    end

    private def self.handle_duplicate(user : User)
      # Append user ID to make the email unique
      unique_email = "#{user.email.split("@").first}_#{user.id}@#{user.email.split("@").last}"
      user.update(email: unique_email)
      puts "  Renamed to: #{unique_email}"
    end

    def self.run
      up
    end
  end
end

DataMigrations::NormalizeEmailAddresses.run
```

### 5. Archive Old Data
When moving old records to archive tables:
```crystal
# src/db/data_migrations/20240101120000_data_archive_old_orders.cr
require "../../models/**"
require "json"

module DataMigrations
  class ArchiveOldOrders
    # Archive orders older than 2 years
    # (Crystal has no `2.years.ago`; shift the current time instead)
    ARCHIVE_THRESHOLD = Time.utc.shift(years: -2)

    def self.up
      puts "Archiving old orders..."
      puts "Archiving orders before: #{ARCHIVE_THRESHOLD}"

      archived = 0

      Order.where("created_at < ?", ARCHIVE_THRESHOLD).each do |order|
        # Create archive record
        ArchivedOrder.create!(
          original_id: order.id,
          customer_id: order.customer_id,
          total: order.total,
          status: order.status,
          items: order.items.to_json,
          created_at: order.created_at,
          archived_at: Time.utc
        )

        # Delete original order
        order.delete

        archived += 1
        print "." if archived % 100 == 0
      end

      puts "\nArchived #{archived} orders"
    end

    def self.down
      puts "Restoring archived orders..."

      restored = 0

      ArchivedOrder.where("archived_at > ?", ARCHIVE_THRESHOLD).each do |archived|
        # Restore original order
        Order.create!(
          id: archived.original_id,
          customer_id: archived.customer_id,
          total: archived.total,
          status: archived.status,
          created_at: archived.created_at
        )

        # Restore order items from JSON
        restore_order_items(archived)

        # Delete archive record
        archived.delete

        restored += 1
      end

      puts "Restored #{restored} orders"
    end

    private def self.restore_order_items(archived : ArchivedOrder)
      items = JSON.parse(archived.items)
      items.as_a.each do |item|
        OrderItem.create!(
          order_id: archived.original_id,
          product_id: item["product_id"].as_i64,
          quantity: item["quantity"].as_i,
          price: item["price"].as_f
        )
      end
    end

    def self.run
      up
    end
  end
end

DataMigrations::ArchiveOldOrders.run
```

## Running Data Migrations
### Manual Execution
Run a specific data migration:
```bash
crystal run src/db/data_migrations/20240101120000_data_migrate_user_roles.cr
```

### With Database Connection
Ensure the database connection is configured:
```bash
DATABASE_URL="postgresql://user:pass@localhost/db" \
crystal run src/db/data_migrations/20240101120000_data_migrate_user_roles.cr
```

### In Production
```bash
# Set production environment
CRYSTAL_ENV=production \
DATABASE_URL="$PRODUCTION_DATABASE_URL" \
crystal run src/db/data_migrations/20240101120000_data_migrate_user_roles.cr
```

### As Part of Deployment
Add to deployment script:
```bash
#!/bin/bash
# deploy.sh

# Run schema migrations
azu db:migrate

# Run specific data migrations
crystal run src/db/data_migrations/20240101120000_data_backfill_slugs.cr
crystal run src/db/data_migrations/20240102140000_data_normalize_emails.cr

# Restart application
systemctl restart myapp
```
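Before wiring a data migration into a deploy script, it can help to support a dry-run mode so you can preview changes against production data without writing anything. A minimal sketch, assuming the CQL-style model API used above; the `--dry-run` flag is a convention of this example, not something the generator emits:

```crystal
# Arguments after "--" reach the program, e.g.:
#   crystal run src/db/data_migrations/..._data_backfill_user_slugs.cr -- --dry-run
DRY_RUN = ARGV.includes?("--dry-run")

def self.up
  User.where("slug IS NULL").each do |user|
    slug = generate_slug(user.name, user.id)
    if DRY_RUN
      puts "Would set slug for user #{user.id}: #{slug}"
    else
      user.update(slug: slug)
    end
  end
end
```

## Best Practices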
### 1. Make Migrations Idempotent
Data migrations should be safe to run multiple times:
```crystal
def self.up
  User.where("slug IS NULL").each do |user|
    # Only update if slug is missing
    user.update(slug: generate_slug(user))
  end
end
```

### 2. Use Transactions
Wrap operations in transactions when possible:
```crystal
def self.up
  CQL::DB.transaction do
    # All operations
    # will roll back if any fails
  end
end
```
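For example, when each unit of work involves multiple writes, a transaction per record keeps them atomic. A sketch using the same `CQL::DB.transaction` block shown above; the `migrated` flag column is hypothetical:

```crystal
def self.up
  Post.all.each do |post|
    # Either both writes happen or neither does
    CQL::DB.transaction do
      Article.create!(title: post.title, content: post.body)
      post.update(migrated: true) # hypothetical tracking column
    end
  end
end
```

One transaction per record, rather than one around the whole run, keeps locks short-lived on large tables.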
### 3. Batch Processing

Process large datasets in batches:
```crystal
def self.up
  batch_size = 1000 # constants can't be declared inside a method, so use a local
  offset = 0

  loop do
    batch = User.limit(batch_size).offset(offset).to_a
    break if batch.empty?

    batch.each do |user|
      process_user(user)
    end

    offset += batch_size
    puts "Processed #{offset} users..."
  end
end
```
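Note that offset pagination slows down as the offset grows, because the database must scan and discard every skipped row on each page. A keyset (id-based) variant is usually faster on large tables. A sketch, assuming an auto-incrementing `id` column and the CQL-style query API used above (`order` is assumed to exist alongside `limit`/`offset`):

```crystal
def self.up
  batch_size = 1000
  last_id = 0_i64

  loop do
    # Seek past the last processed id instead of using OFFSET
    batch = User.where("id > ?", last_id).order(:id).limit(batch_size).to_a
    break if batch.empty?

    batch.each { |user| process_user(user) }

    last_id = batch.last.id
    puts "Processed through id #{last_id}..."
  end
end
```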
### 4. Progress Reporting

Provide feedback for long-running migrations:
```crystal
def self.up
  total = User.count
  processed = 0

  User.all.each do |user|
    process_user(user)
    processed += 1

    if processed % 100 == 0
      percentage = (processed.to_f / total * 100).round(2)
      puts "Progress: #{processed}/#{total} (#{percentage}%)"
    end
  end
end
```

### 5. Error Handling
Handle errors gracefully:
```crystal
def self.up
  failed = [] of Int64

  User.all.each do |user|
    begin
      process_user(user)
    rescue ex
      puts "Failed to process user #{user.id}: #{ex.message}"
      failed << user.id
    end
  end

  if failed.any?
    puts "\nFailed user IDs: #{failed.join(", ")}"
    puts "Total failures: #{failed.size}"
  end
end
```

### 6. Verification
Verify results after migration:
```crystal
def self.up
  # Perform migration
  User.where("slug IS NULL").each do |user|
    user.update(slug: generate_slug(user))
  end

  # Verify
  remaining = User.where("slug IS NULL OR slug = ''").count
  if remaining > 0
    puts "Warning: #{remaining} users still have no slug"
  else
    puts "✓ All users have slugs"
  end
end
```
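When a migration runs from a deploy script, exiting with a non-zero status on failure lets the script halt instead of continuing on top of bad data. A small extension of the verification step above:

```crystal
def self.up
  # ... perform migration ...

  remaining = User.where("slug IS NULL OR slug = ''").count
  if remaining > 0
    puts "Error: #{remaining} users still have no slug"
    exit 1 # non-zero exit signals failure to the calling script
  end

  puts "✓ All users have slugs"
end
```

## Testing Data Migrations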
Create tests for data migrations:
```crystal
# spec/data_migrations/backfill_user_slugs_spec.cr
require "../spec_helper"

describe DataMigrations::BackfillUserSlugs do
  describe ".up" do
    it "generates slugs for users without slugs" do
      user1 = create_user(name: "John Doe", slug: nil)
      user2 = create_user(name: "Jane Smith", slug: nil)
      user3 = create_user(name: "Bob Wilson", slug: "existing-slug")

      DataMigrations::BackfillUserSlugs.up

      user1.reload.slug.should_not be_nil
      user2.reload.slug.should_not be_nil
      user3.reload.slug.should eq("existing-slug")
    end

    it "generates unique slugs" do
      user1 = create_user(name: "Test User", slug: nil)
      user2 = create_user(name: "Test User", slug: nil)

      DataMigrations::BackfillUserSlugs.up

      user1.reload.slug.should_not eq(user2.reload.slug)
    end
  end
end
```
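Since data migrations should be idempotent (see Best Practices above), it is also worth asserting that a second run changes nothing. An additional example in the same spec style, reusing the hypothetical `create_user` helper:

```crystal
it "is idempotent" do
  user = create_user(name: "Test User", slug: nil)

  DataMigrations::BackfillUserSlugs.up
  first_slug = user.reload.slug

  # Running the migration again must not change existing slugs
  DataMigrations::BackfillUserSlugs.up
  user.reload.slug.should eq(first_slug)
end
```

## Tracking Data Migrations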
### Migration Log
Create a table to track data migrations:
```sql
CREATE TABLE data_migrations (
  id BIGSERIAL PRIMARY KEY,
  version VARCHAR(255) NOT NULL UNIQUE,
  name VARCHAR(255) NOT NULL,
  executed_at TIMESTAMP NOT NULL DEFAULT NOW()
);
```

### Record Execution
```crystal
def self.up
  # Check if already run
  if executed?
    puts "Migration already executed, skipping..."
    return
  end

  # Perform migration
  # ...

  # Record execution
  record_execution
end

private def self.executed? : Bool
  CQL::DB.scalar("SELECT COUNT(*) FROM data_migrations WHERE version = ?", version).as(Int64) > 0
end

private def self.record_execution
  CQL::DB.exec(
    "INSERT INTO data_migrations (version, name) VALUES (?, ?)",
    version,
    name
  )
end

private def self.version : String
  "20240101120000"
end

private def self.name : String
  "BackfillUserSlugs"
end
```

## Troubleshooting
### Migration Fails Midway
**Problem**: A large migration fails after processing many records.

**Solutions**:

- Implement checkpoints
- Use batching with offset tracking
- Make migrations resumable
```crystal
def self.up
  last_processed_id = get_last_checkpoint || 0

  User.where("id > ?", last_processed_id).each do |user|
    process_user(user)
    save_checkpoint(user.id)
  end
end
```
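The `get_last_checkpoint` and `save_checkpoint` helpers above are application-defined. One simple approach, sketched below, persists the checkpoint to a scratch file (the path is arbitrary); writing it every N records instead of every row keeps the overhead low:

```crystal
CHECKPOINT_FILE = "tmp/backfill_user_slugs.checkpoint"

# Returns the last processed id, or nil on a fresh run
private def self.get_last_checkpoint : Int64?
  return nil unless File.exists?(CHECKPOINT_FILE)
  File.read(CHECKPOINT_FILE).strip.to_i64?
end

private def self.save_checkpoint(id : Int64)
  File.write(CHECKPOINT_FILE, id.to_s)
end
```

### Out of Memory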
**Problem**: Processing too many records at once.

**Solutions**:

- Use `find_each` or batching
- Process in smaller chunks
- Clear object caches
```crystal
# Bad
users = User.all # Loads all into memory

# Good
User.find_each(batch_size: 100) do |user|
  process_user(user)
end
```

### Slow Performance
**Problem**: The migration takes too long.

**Solutions**:

- Add database indexes
- Use bulk operations
- Optimize queries
- Process in parallel (if safe; see the fiber sketch below)
```crystal
# Bulk update instead of individual saves
User.where(active: true).update_all(status: "active")
```
## Related Documentation

### See Also

- `azu generate migration` - Schema migrations
- `azu db:migrate` - Run schema migrations