AI/ML for Data Engineering
Integrate artificial intelligence and machine learning into your data workflows
LLM Integration Patterns
Using LLMs for Data Quality
import openai
def check_data_anomalies(data_summary):
    """Ask an LLM to review a data summary for potential quality problems.

    Args:
        data_summary: Text describing the dataset (stats, samples, etc.);
            interpolated directly into the prompt.

    Returns:
        str: The model's free-text assessment.
    """
    review_prompt = f"""
Analyze this data summary and identify potential quality issues:
{data_summary}
Look for:
- Unexpected patterns
- Missing values
- Outliers
- Data inconsistencies
"""
    # Legacy (pre-1.0) openai SDK interface; requires openai.api_key to be set.
    completion = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a data quality expert."},
            {"role": "user", "content": review_prompt},
        ],
    )
    return completion.choices[0].message.content
SQL Generation with LLMs
def generate_sql(natural_language_query, schema):
    """Translate a natural-language question into a SQL query via an LLM.

    Args:
        natural_language_query: The question to answer, in plain English.
        schema: Text description of the available tables and columns.

    Returns:
        str: The model's response — expected to be a bare SQL statement.
    """
    translation_prompt = f"""
Given this database schema:
{schema}
Generate SQL for: {natural_language_query}
Return only the SQL query, no explanation.
"""
    completion = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[{"role": "user", "content": translation_prompt}],
        temperature=0,  # Deterministic output
    )
    return completion.choices[0].message.content
# Example usage: ask for monthly revenue against a two-table schema.
schema = """
customers (id, name, email, created_at)
orders (id, customer_id, amount, order_date)
"""
query = generate_sql(
    "Show me total revenue by month for 2026",
    schema,
)
RAG (Retrieval-Augmented Generation)
Building a Data Documentation RAG System
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA

# Step 1: pull in the documentation corpus (project-specific loader).
docs = load_dbt_documentation()  # Your docs

# Step 2: embed every document and store the vectors in Chroma.
embedding_model = OpenAIEmbeddings()
doc_store = Chroma.from_documents(docs, embedding_model)

# Step 3: wire retriever + LLM into a question-answering chain.
# "stuff" concatenates the k=3 retrieved docs into a single prompt.
qa_chain = RetrievalQA.from_chain_type(
    llm=OpenAI(temperature=0),
    chain_type="stuff",
    retriever=doc_store.as_retriever(search_kwargs={"k": 3}),
)

# Step 4: ask a question grounded in the retrieved documentation.
answer = qa_chain.run(
    "How do I calculate customer lifetime value in our dbt models?"
)
RAG for Data Lineage
# Example lineage metadata used when the caller supplies none.
_DEFAULT_LINEAGE_DOCS = [
    "fct_orders depends on stg_orders and dim_customers",
    "stg_orders sources from raw.orders table",
    "dim_customers is a Type 2 SCD table",
]


def query_data_lineage(question, lineage_docs=None, k=2):
    """RAG system for querying data lineage.

    Embeds lineage metadata, retrieves the snippets most similar to the
    question, and asks the LLM to answer using them as context.

    Args:
        question: Natural-language lineage question.
        lineage_docs: Optional list of lineage description strings; falls
            back to the built-in example metadata when omitted. (Previously
            hard-coded; now a backward-compatible parameter.)
        k: Number of snippets to retrieve as context (default 2, as before).

    Returns:
        str: The model's answer.
    """
    if lineage_docs is None:
        lineage_docs = _DEFAULT_LINEAGE_DOCS

    # NOTE(review): the vector store is rebuilt on every call — cache it
    # (or pass a prebuilt store) if this is invoked frequently.
    vectorstore = Chroma.from_texts(lineage_docs, OpenAIEmbeddings())

    # Retrieve the k most relevant lineage snippets for the question.
    docs = vectorstore.similarity_search(question, k=k)

    # Generate an answer with the retrieved snippets as system context.
    context = "\n".join([doc.page_content for doc in docs])
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"Context: {context}"},
            {"role": "user", "content": question},
        ],
    )
    return response.choices[0].message.content
Vector Databases for Analytics
Comparison of Vector Databases
Pinecone: Fully managed, fast, best for production
Weaviate: Open-source, GraphQL API, good for hybrid search
Chroma: Lightweight, easy to embed, great for prototyping
Qdrant: High performance, rich filtering, Rust-based
Milvus: Scalable, enterprise-ready, supports multiple indexes
Semantic Search on Data Catalog
import pinecone
from sentence_transformers import SentenceTransformer

# Connect to the Pinecone index backing the data catalog.
# NOTE(review): "your-key" is a placeholder — load the key from config or
# an environment variable, never from source.
pinecone.init(api_key="your-key")
index = pinecone.Index("data-catalog")
encoder = SentenceTransformer('all-MiniLM-L6-v2')

# Embed each table's description and upsert it alongside its metadata.
catalog_tables = [
    {"name": "fct_orders", "description": "Daily order facts with revenue"},
    {"name": "dim_customers", "description": "Customer dimension with SCD2"},
]
for entry in catalog_tables:
    vector = encoder.encode(entry["description"])
    index.upsert([(
        entry["name"],
        vector.tolist(),
        {"description": entry["description"]},
    )])

# Semantic search: embed the question and find the nearest table vectors.
query = "Where can I find customer purchase history?"
query_embedding = encoder.encode(query)
results = index.query(query_embedding.tolist(), top_k=3)
print("Relevant tables:", [r.id for r in results.matches])
ML Model Deployment in Data Pipelines
Deploying Models with MLflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Fit the classifier (X_train / y_train are assumed to be in scope;
# accuracy is assumed to have been computed on a held-out set).
model = RandomForestClassifier()
model.fit(X_train, y_train)

# Record params, metrics, and the fitted model with MLflow.
with mlflow.start_run():
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("accuracy", accuracy)
    # Register the model so pipelines can resolve it by name/stage.
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="churn_predictor",
    )

# A production pipeline later resolves the "production" stage alias.
model_uri = "models:/churn_predictor/production"
loaded_model = mlflow.sklearn.load_model(model_uri)
# Use in Airflow DAG
def predict_churn(**context):
    """Airflow-callable task: load the production churn model, score the
    fresh data, and write predictions to the warehouse.

    NOTE(review): relies on `model_uri`, `new_data`, and
    `load_to_warehouse` being defined at module scope — confirm in the
    DAG module.
    """
    churn_model = mlflow.sklearn.load_model(model_uri)
    scores = churn_model.predict(new_data)
    load_to_warehouse(scores)
Real-time ML Predictions in dbt
-- models/ml/customer_churn_scores.sql
-- Scores every customer with the churn model via the ml_predict macro.
{{ config(materialized='table') }}
-- Feature inputs pulled from the customer metrics model.
WITH customer_features AS (
SELECT
customer_id,
days_since_last_order,
total_lifetime_value,
order_frequency
FROM {{ ref('fct_customer_metrics') }}
),
-- ml_predict is a project macro (defined elsewhere) that applies
-- churn_model_v1 to the listed feature columns.
predictions AS (
SELECT
customer_id,
{{ ml_predict(
'churn_model_v1',
['days_since_last_order', 'total_lifetime_value', 'order_frequency']
) }} AS churn_probability
FROM customer_features
)
SELECT * FROM predictions
ML Monitoring & Observability
Data Drift Detection
from evidently.dashboard import Dashboard
from evidently.tabs import DataDriftTab

# Build a drift report comparing recent data to a historical reference
# window (historical_data / recent_data / column_mapping are in scope).
drift_dashboard = Dashboard(tabs=[DataDriftTab()])
drift_dashboard.calculate(
    reference_data=historical_data,
    current_data=recent_data,
    column_mapping=column_mapping,
)

# Escalate when the drift score crosses the 0.3 alerting threshold.
# NOTE(review): confirm get_drift_score() exists in the pinned evidently
# version; newer releases expose drift results via report metrics instead.
if drift_dashboard.get_drift_score() > 0.3:
    send_alert("Data drift detected!")
    trigger_model_retraining()
Model Performance Tracking
-- Log every prediction so accuracy can be measured once ground truth
-- arrives (actual_value is backfilled after the outcome is known).
CREATE TABLE ml_monitoring.predictions (
prediction_id VARCHAR,
model_name VARCHAR,
model_version VARCHAR,
prediction_value FLOAT,
prediction_timestamp TIMESTAMP,
actual_value FLOAT, -- Filled later
feature_values VARIANT
);
-- Track prediction accuracy over time
-- Daily MAE and prediction/actual correlation per model and version,
-- computed only over rows whose actuals have been backfilled.
CREATE VIEW ml_monitoring.model_performance AS
SELECT
model_name,
model_version,
DATE_TRUNC('day', prediction_timestamp) as date,
COUNT(*) as prediction_count,
AVG(ABS(prediction_value - actual_value)) as mae,
CORR(prediction_value, actual_value) as correlation
FROM ml_monitoring.predictions
WHERE actual_value IS NOT NULL
GROUP BY 1, 2, 3;
Feature Store Integration
from datetime import timedelta  # fix: timedelta was used below without being imported

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Declare the per-customer feature view served from the Snowflake source.
# NOTE(review): `feature_view`, `Feature`, `ValueType`, and
# `snowflake_source` must also be imported/defined for this snippet to
# run; these names follow an older Feast API — confirm against the
# pinned feast version (newer releases use FeatureView/Field/schema).
@feature_view(
    entities=["customer"],
    ttl=timedelta(days=1),  # features are considered fresh for one day
    batch_source=snowflake_source,
)
def customer_features():
    return [
        Feature("total_orders", ValueType.INT64),
        Feature("avg_order_value", ValueType.FLOAT),
        Feature("days_since_last_order", ValueType.INT64),
    ]

# Online lookup at inference time for two customer entities.
entity_rows = [{"customer_id": 123}, {"customer_id": 456}]
features = store.get_online_features(
    features=[
        "customer_features:total_orders",
        "customer_features:avg_order_value",
    ],
    entity_rows=entity_rows,
).to_df()
AI-Powered Data Engineering
Automated Schema Inference
def infer_schema_with_llm(sample_data):
    """Use an LLM to suggest an optimal schema for sampled data.

    Args:
        sample_data: Tabular object exposing .head()/.to_string()
            (presumably a pandas DataFrame — confirm at call sites);
            its first 10 rows are shown to the model.

    Returns:
        str: The model's response — expected to be a CREATE TABLE statement.
    """
    schema_prompt = f"""
Given this sample data:
{sample_data.head(10).to_string()}
Suggest:
1. Optimal data types for each column
2. Primary keys
3. Foreign key relationships
4. Partitioning strategy
Format as SQL CREATE TABLE statement.
"""
    completion = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[{"role": "user", "content": schema_prompt}],
    )
    return completion.choices[0].message.content
AI-Powered Data Transformation
def suggest_transformations(data_profile):
    """Suggest dbt transformations based on a data profile.

    Args:
        data_profile: Mapping with 'null_rate', 'duplicates', and 'types'
            keys describing the profiled dataset.

    Returns:
        LLM response text containing suggested dbt SQL.
    """
    profile_prompt = f"""
Data profile:
- Null rate: {data_profile['null_rate']}
- Duplicates: {data_profile['duplicates']}
- Data types: {data_profile['types']}
Suggest dbt model transformations to:
1. Handle nulls
2. Remove duplicates
3. Standardize formats
Provide dbt SQL code.
"""
    # Delegates to the shared LLM helper defined elsewhere in the project.
    return get_llm_response(profile_prompt)