CI/CD for Data Engineering

Automate testing, deployment, and quality assurance for data pipelines

CI/CD for dbt Projects

GitHub Actions Workflow

```yaml
# .github/workflows/dbt-ci.yml
name: dbt CI

on:
  pull_request:
    branches: [main]
  push:
    branches: [main]

jobs:
  dbt-test:
    runs-on: ubuntu-latest
    # Credentials at the job level so every dbt step can connect
    env:
      DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
      DBT_SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
      DBT_SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install dbt-snowflake==1.7.0
          dbt deps

      - name: Run dbt debug
        run: dbt debug --profiles-dir .

      - name: Run dbt compile
        run: dbt compile --profiles-dir .

      - name: Run dbt tests
        run: dbt test --profiles-dir .

      - name: Run dbt build (slim CI)
        if: github.event_name == 'pull_request'
        # --state must point at the production manifest (e.g. downloaded
        # from the last production run), not this job's own ./target
        run: dbt build --select state:modified+ --state ./prod-manifest/
```

Slim CI with dbt

Only test modified models and their downstream dependencies:

```bash
# Run only changed models
dbt build --select state:modified+

# Compare against the production manifest
dbt build --select state:modified+ --defer --state ./prod-manifest/
```

Benefits of Slim CI:
  • Faster PR validation (only test what changed)
  • Lower compute costs
  • Quicker feedback loops
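Conceptually, `state:modified+` compares each model's checksum in the PR manifest against the production manifest, then expands the selection to everything downstream. A minimal sketch of that idea (this is an illustrative helper, not dbt's actual implementation):

```python
# Conceptual sketch of "state:modified+": diff model checksums between
# two manifests, then expand to all downstream dependents.
def select_modified_plus(prod_checksums, pr_checksums, children):
    """prod/pr_checksums: {model: checksum}; children: {model: [downstream models]}."""
    modified = {
        model for model, checksum in pr_checksums.items()
        if prod_checksums.get(model) != checksum  # new or changed model
    }
    # Breadth-first walk to pick up everything downstream of a change.
    selected, queue = set(modified), list(modified)
    while queue:
        for child in children.get(queue.pop(), []):
            if child not in selected:
                selected.add(child)
                queue.append(child)
    return selected

prod = {"stg_orders": "a1", "fct_orders": "b2", "dim_customers": "c3"}
pr = {"stg_orders": "a9", "fct_orders": "b2", "dim_customers": "c3"}  # stg_orders edited
deps = {"stg_orders": ["fct_orders"], "fct_orders": []}
print(sorted(select_modified_plus(prod, pr, deps)))  # ['fct_orders', 'stg_orders']
```

Only `stg_orders` changed, but its downstream model `fct_orders` is pulled into the selection too, which is exactly why slim CI stays correct while testing less.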

Data Pipeline Testing Strategies

Unit Tests

Test individual transformations in isolation

```sql
-- tests/unit/test_clean_email.sql
WITH test_data AS (
    SELECT 'John@EXAMPLE.COM' AS email
    UNION ALL
    SELECT '  jane@test.com  ' AS email
),
expected AS (
    SELECT 'john@example.com' AS expected_email
    UNION ALL
    SELECT 'jane@test.com' AS expected_email
),
actual AS (
    SELECT LOWER(TRIM(email)) AS cleaned_email
    FROM test_data
)
SELECT *
FROM actual
WHERE cleaned_email NOT IN (SELECT expected_email FROM expected)
```
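When the transformation lives in Python rather than SQL, the same check becomes an ordinary pytest-style unit test. `clean_email` here is a hypothetical helper mirroring the `LOWER(TRIM(...))` logic above:

```python
# Python analogue of the SQL unit test above (clean_email is a
# hypothetical helper mirroring LOWER(TRIM(email))).
def clean_email(email: str) -> str:
    """Normalize an email: strip surrounding whitespace, lowercase."""
    return email.strip().lower()

def test_clean_email():
    cases = {
        "John@EXAMPLE.COM": "john@example.com",
        "  jane@test.com  ": "jane@test.com",
    }
    for raw, expected in cases.items():
        assert clean_email(raw) == expected

test_clean_email()
print("all cases pass")
```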

Integration Tests

Test end-to-end data flows

```python
# Python integration test with pytest
def test_orders_pipeline():
    # Load test data
    load_test_data('raw_orders')

    # Run dbt models
    run_dbt(['run', '--select', 'fct_orders'])

    # Assert results
    result = query_warehouse('SELECT COUNT(*) FROM fct_orders')
    assert result == expected_count

    # Check data quality
    quality_check = query_warehouse('''
        SELECT COUNT(*) FROM fct_orders WHERE order_total < 0
    ''')
    assert quality_check == 0
```
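The helpers above (`load_test_data`, `run_dbt`, `query_warehouse`) are assumed to exist in the project. A self-contained stand-in, using an in-memory SQLite database in place of the warehouse, shows the shape such a test takes end to end:

```python
# Minimal, self-contained version of the pattern above, with SQLite
# standing in for the warehouse (table contents are illustrative).
import sqlite3

conn = sqlite3.connect(":memory:")

def query_warehouse(sql: str):
    """Return the single scalar result of a query."""
    return conn.execute(sql).fetchone()[0]

# "Load test data" step: seed a tiny fct_orders table.
conn.execute("CREATE TABLE fct_orders (order_id INT, order_total REAL)")
conn.executemany("INSERT INTO fct_orders VALUES (?, ?)",
                 [(1, 19.99), (2, 5.00), (3, 42.50)])

# Assert the row count and a data-quality invariant, as in the test above.
assert query_warehouse("SELECT COUNT(*) FROM fct_orders") == 3
assert query_warehouse(
    "SELECT COUNT(*) FROM fct_orders WHERE order_total < 0") == 0
print("integration checks passed")
```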

Schema Tests

```yaml
version: 2

models:
  - name: fct_orders
    tests:
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')
      - dbt_utils.recency:
          datepart: day
          field: order_date
          interval: 1
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
          - relationships:
              to: ref('stg_orders')
              field: order_id
```

GitOps for Data Platforms

Infrastructure as Code with Terraform

```hcl
# main.tf - Snowflake resources
resource "snowflake_database" "analytics" {
  name    = "ANALYTICS"
  comment = "Analytics database managed by Terraform"
}

resource "snowflake_schema" "staging" {
  database = snowflake_database.analytics.name
  name     = "STAGING"
  comment  = "Staging schema for raw data"
}

resource "snowflake_warehouse" "dbt" {
  name           = "DBT_WH"
  warehouse_size = "SMALL"
  auto_suspend   = 60
  auto_resume    = true
}
```

GitOps Workflow

  1. All infrastructure defined in Git
  2. Changes go through PR review
  3. CI validates Terraform plans
  4. Merge to main triggers deployment
  5. Automated rollback on failures
```yaml
# .github/workflows/terraform.yml
name: Terraform

on:
  pull_request:
    paths:
      - 'terraform/**'
  # Also trigger on merge to main, so the apply step can actually run
  push:
    branches: [main]
    paths:
      - 'terraform/**'

jobs:
  terraform:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Terraform Init
        run: terraform init

      - name: Terraform Plan
        run: terraform plan -out=tfplan

      - name: Terraform Apply
        if: github.ref == 'refs/heads/main'
        run: terraform apply -auto-approve tfplan
```

Automated Quality Checks

Pre-commit Hooks

```yaml
# .pre-commit-config.yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files

  - repo: https://github.com/sqlfluff/sqlfluff
    rev: 2.3.5
    hooks:
      - id: sqlfluff-lint
        args: [--dialect, snowflake]
      - id: sqlfluff-fix

  - repo: https://github.com/psf/black
    rev: 23.12.0
    hooks:
      - id: black
```

Data Quality Monitoring

```sql
-- Create data quality monitoring table
CREATE TABLE dq_monitoring.test_results (
    test_id        VARCHAR,
    test_name      VARCHAR,
    table_name     VARCHAR,
    test_status    VARCHAR,
    rows_failed    INT,
    execution_time TIMESTAMP,
    severity       VARCHAR
);

-- Example quality check
INSERT INTO dq_monitoring.test_results
SELECT
    'null_check_001' AS test_id,
    'Check for null emails' AS test_name,
    'dim_customers' AS table_name,
    CASE WHEN COUNT(*) = 0 THEN 'PASS' ELSE 'FAIL' END AS test_status,
    COUNT(*) AS rows_failed,
    CURRENT_TIMESTAMP() AS execution_time,
    'HIGH' AS severity
FROM dim_customers
WHERE email IS NULL;
```
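Checks like this are usually driven from a small runner that executes a "count of failing rows" query and records PASS/FAIL. A minimal sketch, again with SQLite standing in for the warehouse (table contents and check names are illustrative):

```python
# Minimal quality-check runner: count failing rows, record PASS/FAIL.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_customers (customer_id INT, email TEXT)")
conn.executemany("INSERT INTO dim_customers VALUES (?, ?)",
                 [(1, "a@x.com"), (2, None), (3, "c@x.com")])

def run_check(test_id, test_name, table, failing_rows_sql, severity):
    """Run a 'count of failing rows' query and return a result record."""
    failed = conn.execute(failing_rows_sql).fetchone()[0]
    return {
        "test_id": test_id,
        "test_name": test_name,
        "table_name": table,
        "test_status": "PASS" if failed == 0 else "FAIL",
        "rows_failed": failed,
        "severity": severity,
    }

result = run_check("null_check_001", "Check for null emails", "dim_customers",
                   "SELECT COUNT(*) FROM dim_customers WHERE email IS NULL",
                   "HIGH")
print(result["test_status"], result["rows_failed"])  # FAIL 1
```

In production the returned record would be inserted into `dq_monitoring.test_results` rather than printed.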

Deployment Strategies

Blue-Green Deployment

Maintain two identical environments and switch traffic only once the new one is verified

```sql
-- Create blue and green schemas
CREATE SCHEMA analytics_blue;
CREATE SCHEMA analytics_green;

-- Deploy to green while blue stays live (run from the shell,
-- with a dbt target that writes to analytics_green):
--   dbt run --full-refresh
--   dbt test

-- Switch traffic by swapping the schemas (Snowflake)
ALTER SCHEMA analytics SWAP WITH analytics_green;
-- Green is now live; blue becomes staging
```
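The key property of the pattern is that the swap is gated on the candidate environment's tests. A small orchestration sketch, where `run_tests` and `execute_sql` are hypothetical callables (e.g. wrapping `dbt test` and a warehouse cursor):

```python
# Blue-green promotion gated on the candidate schema's test suite.
# run_tests and execute_sql are hypothetical injected callables.
def blue_green_deploy(run_tests, execute_sql, live="analytics",
                      candidate="analytics_green"):
    """Swap the candidate schema into place only if its tests pass."""
    if not run_tests(candidate):
        return "rolled back: tests failed on " + candidate
    execute_sql(f"ALTER SCHEMA {live} SWAP WITH {candidate}")
    return "promoted " + candidate

executed = []
status = blue_green_deploy(run_tests=lambda schema: True,
                           execute_sql=executed.append)
print(status)       # promoted analytics_green
print(executed[0])  # ALTER SCHEMA analytics SWAP WITH analytics_green
```

Because the dependencies are injected, the same function is trivially unit-testable with a failing `run_tests` to confirm no swap statement is issued.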

Canary Deployment

Gradually roll out changes to a subset of users or data

```sql
-- Process 10% of data with new logic
WITH sample AS (
    SELECT *
    FROM events
    WHERE MOD(ABS(HASH(user_id)), 10) = 0  -- 10% sample
)
SELECT * FROM sample;
```
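The `MOD(ABS(HASH(user_id)), 10) = 0` predicate works because a stable hash assigns each user to one of ten buckets, so roughly 10% of users land in the canary and every user always lands in the same bucket. The same idea in Python (using `md5` for determinism, since Python's built-in `hash()` is salted per process):

```python
# Deterministic canary bucketing: hash the user id into one of
# `buckets` buckets; bucket 0 is the canary (~10% of users).
import hashlib

def in_canary(user_id: str, buckets: int = 10) -> bool:
    digest = hashlib.md5(user_id.encode()).hexdigest()
    return int(digest, 16) % buckets == 0

users = [f"user_{i}" for i in range(1000)]
share = sum(in_canary(u) for u in users) / len(users)
print(f"canary share: {share:.0%}")  # roughly 10%
```

Determinism matters: a user must not flip between old and new logic across runs, or comparisons between the two cohorts become meaningless.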

Feature Flags for Data

```sql
-- Use a config table for feature flags
CREATE TABLE feature_flags (
    flag_name  VARCHAR,
    is_enabled BOOLEAN,
    updated_at TIMESTAMP
);
```

```sql
-- In a dbt model (Jinja)
{% set use_new_logic = run_query(
    "SELECT is_enabled FROM feature_flags WHERE flag_name = 'new_revenue_calc'"
).columns[0][0] %}

{% if use_new_logic %}
    {{ new_revenue_calculation() }}
{% else %}
    {{ old_revenue_calculation() }}
{% endif %}
```
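A detail worth handling in any flag lookup is the missing-row case: a typo'd or not-yet-seeded flag should fall back to a safe default rather than break the run. A sketch of that lookup in Python, with SQLite standing in for the warehouse:

```python
# Feature-flag lookup with a safe default when the flag row is missing.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE feature_flags (flag_name TEXT, is_enabled BOOLEAN)")
conn.execute("INSERT INTO feature_flags VALUES ('new_revenue_calc', 1)")

def flag_enabled(name: str, default: bool = False) -> bool:
    """Look up a flag; fall back to `default` so a missing row never breaks a run."""
    row = conn.execute(
        "SELECT is_enabled FROM feature_flags WHERE flag_name = ?", (name,)
    ).fetchone()
    return bool(row[0]) if row else default

print(flag_enabled("new_revenue_calc"))  # True
print(flag_enabled("unknown_flag"))      # False
```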

Complete CI/CD Pipeline Example

```yaml
name: Production Deployment

on:
  push:
    branches: [main]

jobs:
  deploy-production:
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v3

      - name: Install dbt
        run: pip install dbt-snowflake

      - name: Run dbt seed
        run: dbt seed --profiles-dir .

      - name: Run dbt run
        run: dbt run --profiles-dir .

      - name: Run dbt test
        run: dbt test --profiles-dir .

      - name: Generate dbt docs
        run: dbt docs generate --profiles-dir .

      - name: Deploy docs to S3
        run: aws s3 sync ./target s3://dbt-docs-bucket/

      - name: Run data quality checks
        run: python scripts/quality_checks.py

      - name: Send Slack notification
        if: always()
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "dbt deployment: ${{ job.status }}"
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
```