CI/CD for Data Engineering
Automate testing, deployment, and quality assurance for data pipelines
CI/CD for dbt Projects
GitHub Actions Workflow
# .github/workflows/dbt-ci.yml
name: dbt CI

on:
  pull_request:
    branches: [main]
  push:
    branches: [main]

jobs:
  dbt-test:
    runs-on: ubuntu-latest
    env:
      # Credentials are needed by every dbt step, so set them at the job level
      DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
      DBT_SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
      DBT_SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install dbt-snowflake==1.7.0
          dbt deps
      - name: Run dbt debug
        run: dbt debug --profiles-dir .
      - name: Run dbt compile
        run: dbt compile --profiles-dir .
      - name: Run dbt tests
        run: dbt test --profiles-dir .
      - name: Run dbt build (slim CI)
        if: github.event_name == 'pull_request'
        run: |
          # Diff against the production manifest, fetched beforehand (e.g. from
          # artifact storage). Do not point --state at ./target, which only
          # holds the artifacts compiled in this job.
          dbt build --select state:modified+ --state ./prod-manifest
Slim CI with dbt
Only test modified models and their downstream dependencies:
# Run only changed models
dbt build --select state:modified+
# Compare against production manifest
dbt build --select state:modified+ --defer --state ./prod-manifest/
Benefits of Slim CI:
- Faster PR validation (only test what changed)
- Lower compute costs
- Quicker feedback loops
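A CI job usually wraps these commands in a small script that first checks the production manifest was downloaded. A minimal Python sketch, assuming the manifest directory name and helper functions are hypothetical:

```python
import subprocess
from pathlib import Path

def build_slim_ci_command(manifest_dir: str) -> list[str]:
    """Assemble the dbt invocation for slim CI: only modified models and
    everything downstream of them, diffed against the production manifest."""
    return [
        "dbt", "build",
        "--select", "state:modified+",  # changed models + downstream deps
        "--state", manifest_dir,        # dir containing production manifest.json
    ]

def run_slim_ci(manifest_dir: str = "prod-manifest") -> int:
    # Fail fast if the production manifest was not fetched first
    if not (Path(manifest_dir) / "manifest.json").exists():
        raise FileNotFoundError(
            f"no manifest.json in {manifest_dir}; download it from prod first"
        )
    return subprocess.run(build_slim_ci_command(manifest_dir)).returncode
```

The guard matters in practice: with a missing manifest, dbt's state comparison fails with a less obvious error midway through the job.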
Data Pipeline Testing Strategies
Unit Tests
Test individual transformations in isolation
-- tests/unit/test_clean_email.sql
WITH test_data AS (
    SELECT 'John@EXAMPLE.COM' AS email
    UNION ALL
    SELECT ' jane@test.com ' AS email
),
expected AS (
    SELECT 'john@example.com' AS expected_email
    UNION ALL
    SELECT 'jane@test.com' AS expected_email
),
actual AS (
    SELECT LOWER(TRIM(email)) AS cleaned_email
    FROM test_data
)
-- A dbt test passes when the query returns zero rows
SELECT * FROM actual
WHERE cleaned_email NOT IN (SELECT expected_email FROM expected)
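The same normalization can also be unit-tested in plain Python before it ships as SQL; `clean_email` below is a hypothetical helper mirroring `LOWER(TRIM(email))`:

```python
def clean_email(email: str) -> str:
    """Mirror LOWER(TRIM(email)): strip surrounding whitespace, lowercase."""
    return email.strip().lower()

def test_clean_email():
    cases = {
        "John@EXAMPLE.COM": "john@example.com",
        " jane@test.com ": "jane@test.com",
    }
    for raw, expected in cases.items():
        assert clean_email(raw) == expected
```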
Integration Tests
Test end-to-end data flows
# Python integration test with pytest
def test_orders_pipeline():
    # Load test data
    load_test_data('raw_orders')

    # Run dbt models
    run_dbt(['run', '--select', 'fct_orders'])

    # Assert results
    result = query_warehouse('SELECT COUNT(*) FROM fct_orders')
    assert result == expected_count

    # Check data quality
    quality_check = query_warehouse('''
        SELECT COUNT(*) FROM fct_orders
        WHERE order_total < 0
    ''')
    assert quality_check == 0
Schema Tests
version: 2
models:
  - name: fct_orders
    tests:
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')
      - dbt_utils.recency:
          datepart: day
          field: order_date
          interval: 1
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
          - relationships:
              to: ref('stg_orders')
              field: order_id
GitOps for Data Platforms
Infrastructure as Code with Terraform
# main.tf - Snowflake resources
resource "snowflake_database" "analytics" {
  name    = "ANALYTICS"
  comment = "Analytics database managed by Terraform"
}

resource "snowflake_schema" "staging" {
  database = snowflake_database.analytics.name
  name     = "STAGING"
  comment  = "Staging schema for raw data"
}

resource "snowflake_warehouse" "dbt" {
  name           = "DBT_WH"
  warehouse_size = "SMALL"
  auto_suspend   = 60
  auto_resume    = true
}
GitOps Workflow
- All infrastructure defined in Git
- Changes go through PR review
- CI validates Terraform plans
- Merge to main triggers deployment
- Automated rollback on failures
# .github/workflows/terraform.yml
name: Terraform

on:
  pull_request:
    paths:
      - 'terraform/**'
  # Without a push trigger, the apply step below could never run:
  # on pull requests, github.ref is refs/pull/..., not refs/heads/main
  push:
    branches: [main]
    paths:
      - 'terraform/**'

jobs:
  terraform:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: hashicorp/setup-terraform@v3
      - name: Terraform Init
        run: terraform init
      - name: Terraform Plan
        run: terraform plan -out=tfplan
      - name: Terraform Apply
        # PRs stop at the plan step; only merges to main apply
        if: github.ref == 'refs/heads/main'
        run: terraform apply -auto-approve tfplan
Automated Quality Checks
Pre-commit Hooks
# .pre-commit-config.yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
  - repo: https://github.com/sqlfluff/sqlfluff
    rev: 2.3.5
    hooks:
      - id: sqlfluff-lint
        args: [--dialect, snowflake]
      - id: sqlfluff-fix
        args: [--dialect, snowflake]
  - repo: https://github.com/psf/black
    rev: 23.12.0
    hooks:
      - id: black
Data Quality Monitoring
-- Create data quality monitoring table
CREATE TABLE dq_monitoring.test_results (
    test_id        VARCHAR,
    test_name      VARCHAR,
    table_name     VARCHAR,
    test_status    VARCHAR,
    rows_failed    INT,
    execution_time TIMESTAMP,
    severity       VARCHAR
);

-- Example quality check
INSERT INTO dq_monitoring.test_results
SELECT
    'null_check_001' AS test_id,
    'Check for null emails' AS test_name,
    'dim_customers' AS table_name,
    CASE
        WHEN COUNT(*) = 0 THEN 'PASS'
        ELSE 'FAIL'
    END AS test_status,
    COUNT(*) AS rows_failed,
    CURRENT_TIMESTAMP() AS execution_time,
    'HIGH' AS severity
FROM dim_customers
WHERE email IS NULL;
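A check like this can also be driven from a script (in the spirit of the `scripts/quality_checks.py` referenced later). A minimal sketch, where `sqlite3` stands in for the warehouse connection and the table layout follows the example schema above:

```python
import sqlite3
from datetime import datetime, timezone

def run_null_check(conn, table, column, test_id, severity="HIGH"):
    """Count NULLs in table.column and log a PASS/FAIL row to test_results."""
    rows_failed = conn.execute(
        f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL"
    ).fetchone()[0]
    status = "PASS" if rows_failed == 0 else "FAIL"
    conn.execute(
        "INSERT INTO test_results VALUES (?, ?, ?, ?, ?, ?, ?)",
        (test_id, f"Check for null {column}", table, status,
         rows_failed, datetime.now(timezone.utc).isoformat(), severity),
    )
    return status

# In-memory stand-in for the warehouse
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE test_results (
    test_id TEXT, test_name TEXT, table_name TEXT, test_status TEXT,
    rows_failed INT, execution_time TEXT, severity TEXT)""")
conn.execute("CREATE TABLE dim_customers (email TEXT)")
conn.executemany("INSERT INTO dim_customers VALUES (?)",
                 [("a@example.com",), (None,)])

status = run_null_check(conn, "dim_customers", "email", "null_check_001")
```

Recording every run, pass or fail, gives the monitoring table a history to alert and trend on.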
Deployment Strategies
Blue-Green Deployment
Maintain two identical environments and switch traffic once the new one passes validation
-- Create blue and green schemas; consumers query analytics_blue (live)
CREATE SCHEMA analytics_blue;
CREATE SCHEMA analytics_green;

-- Deploy to green while blue is live (shell, with the dbt target
-- schema pointed at analytics_green in the profile):
--   dbt run --full-refresh --target green
--   dbt test --target green

-- Switch traffic: swap the two schemas atomically (Snowflake)
ALTER SCHEMA analytics_blue SWAP WITH analytics_green;
-- Green's contents are now live under the blue name; the old build
-- becomes the staging copy for the next cycle
Canary Deployment
Gradually roll out changes to a subset of users or data
-- Process 10% of data with new logic
WITH sample AS (
    SELECT * FROM events
    WHERE MOD(ABS(HASH(user_id)), 10) = 0  -- 10% sample
)
SELECT * FROM sample;
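The same routing can be done outside the warehouse. A sketch using `zlib.crc32`, which (unlike Python's built-in `hash()`, salted per process) is stable across runs, so each user always lands in the same bucket:

```python
import zlib

def in_canary(user_id: str, percent: int = 10) -> bool:
    """Deterministically route ~percent% of users to the new logic,
    analogous to MOD(ABS(HASH(user_id)), 10) = 0 in SQL."""
    return zlib.crc32(user_id.encode()) % 100 < percent
```

Stability is the key property: a user who saw the new logic yesterday must not flip back today, or the canary comparison becomes meaningless.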
Feature Flags for Data
-- Use config table for feature flags
CREATE TABLE feature_flags (
    flag_name  VARCHAR,
    is_enabled BOOLEAN,
    updated_at TIMESTAMP
);

-- In dbt model: guard run_query with `execute` so the model still parses
-- when dbt compiles without running queries
{% if execute %}
    {% set use_new_logic = run_query(
        "SELECT is_enabled FROM feature_flags WHERE flag_name = 'new_revenue_calc'"
    ).columns[0].values()[0] %}
{% else %}
    {% set use_new_logic = false %}
{% endif %}

{% if use_new_logic %}
    {{ new_revenue_calculation() }}
{% else %}
    {{ old_revenue_calculation() }}
{% endif %}
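The same flag table can gate logic outside dbt, e.g. in an orchestration script. A sketch where `sqlite3` stands in for the warehouse connection and the table matches the DDL above:

```python
import sqlite3

def flag_enabled(conn, flag_name: str, default: bool = False) -> bool:
    """Look up a flag in feature_flags; fall back to `default` if absent."""
    row = conn.execute(
        "SELECT is_enabled FROM feature_flags WHERE flag_name = ?",
        (flag_name,),
    ).fetchone()
    return bool(row[0]) if row else default

# Demo against an in-memory database with the schema from above
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE feature_flags (flag_name TEXT, is_enabled BOOLEAN, updated_at TEXT)"
)
conn.execute("INSERT INTO feature_flags VALUES ('new_revenue_calc', 1, '2024-01-01')")
```

Defaulting missing flags to `False` keeps a mistyped or deleted flag name on the old, known-good path.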
Complete CI/CD Pipeline Example
name: Production Deployment

on:
  push:
    branches: [main]

jobs:
  deploy-production:
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v3
      - name: Install dbt
        run: pip install dbt-snowflake
      - name: Run dbt seed
        run: dbt seed --profiles-dir .
      - name: Run dbt run
        run: dbt run --profiles-dir .
      - name: Run dbt test
        run: dbt test --profiles-dir .
      - name: Generate dbt docs
        run: dbt docs generate --profiles-dir .
      - name: Deploy docs to S3
        run: |
          aws s3 sync ./target s3://dbt-docs-bucket/
      - name: Run data quality checks
        run: python scripts/quality_checks.py
      - name: Send Slack notification
        if: always()
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "dbt deployment: ${{ job.status }}"
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}