AI/ML for Data Engineering

Integrate artificial intelligence and machine learning into your data workflows

LLM Integration Patterns

Using LLMs for Data Quality

import openai


def check_data_anomalies(data_summary):
    """Ask an LLM to review a data summary for potential quality problems.

    Sends the summary to GPT-4 with a data-quality-expert system prompt and
    returns the model's free-text assessment.

    NOTE(review): this uses the legacy pre-1.0 ``openai.ChatCompletion``
    interface — confirm against the installed openai version.
    """
    analysis_prompt = f"""
    Analyze this data summary and identify potential quality issues:

    {data_summary}

    Look for:
    - Unexpected patterns
    - Missing values
    - Outliers
    - Data inconsistencies
    """

    completion = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a data quality expert."},
            {"role": "user", "content": analysis_prompt},
        ],
    )
    return completion.choices[0].message.content

SQL Generation with LLMs

def generate_sql(natural_language_query, schema):
    """Translate a natural-language question into a SQL query.

    The schema text is embedded in the prompt so the model can reference
    real table and column names. ``temperature=0`` keeps output
    deterministic for repeatable generation.
    """
    request_text = f"""
    Given this database schema:
    {schema}

    Generate SQL for: {natural_language_query}

    Return only the SQL query, no explanation.
    """

    completion = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[{"role": "user", "content": request_text}],
        temperature=0,  # Deterministic output
    )
    return completion.choices[0].message.content


# Example usage
schema = """
customers (id, name, email, created_at)
orders (id, customer_id, amount, order_date)
"""
query = generate_sql(
    "Show me total revenue by month for 2026",
    schema,
)

RAG (Retrieval-Augmented Generation)

Building a Data Documentation RAG System

from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA

# 1. Load your documentation (project-specific loader)
docs = load_dbt_documentation()  # Your docs

# 2. Embed the docs into an in-memory Chroma vector store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(docs, embeddings)

# 3. Wire a retrieval chain: top-3 similar chunks are "stuffed" into the
#    prompt of a temperature-0 (deterministic) LLM
qa_chain = RetrievalQA.from_chain_type(
    llm=OpenAI(temperature=0),
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 3}),
)

# 4. Ask questions against your own documentation
answer = qa_chain.run(
    "How do I calculate customer lifetime value in our dbt models?"
)

RAG for Data Lineage

def query_data_lineage(question):
    """Answer a lineage question with retrieval-augmented generation.

    Embeds a small set of lineage facts, retrieves the two most similar to
    the question, and supplies them to GPT-4 as system context.
    """
    # Lineage metadata to embed
    lineage_docs = [
        "fct_orders depends on stg_orders and dim_customers",
        "stg_orders sources from raw.orders table",
        "dim_customers is a Type 2 SCD table",
    ]

    # Build an in-memory vector store over the facts
    vectorstore = Chroma.from_texts(lineage_docs, OpenAIEmbeddings())

    # Retrieve the two facts closest to the question
    matches = vectorstore.similarity_search(question, k=2)
    context = "\n".join(doc.page_content for doc in matches)

    # Generate the answer grounded in the retrieved context
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": f"Context: {context}"},
            {"role": "user", "content": question},
        ],
    )
    return response.choices[0].message.content

Vector Databases for Analytics

Comparison of Vector Databases

Pinecone: Fully managed, fast, best for production
Weaviate: Open-source, GraphQL API, good for hybrid search
Chroma: Lightweight, easy to embed, great for prototyping
Qdrant: High performance, rich filtering, Rust-based
Milvus: Scalable, enterprise-ready, supports multiple indexes

Semantic Search on Data Catalog

import pinecone
from sentence_transformers import SentenceTransformer

# Initialize
# NOTE(review): pinecone.init()/pinecone.Index is the legacy client API —
# confirm against the installed pinecone SDK version.
pinecone.init(api_key="your-key")
index = pinecone.Index("data-catalog")
model = SentenceTransformer('all-MiniLM-L6-v2')

# Index table metadata: one vector per table, keyed by table name, with
# the description kept as retrievable metadata
tables = [
    {"name": "fct_orders", "description": "Daily order facts with revenue"},
    {"name": "dim_customers", "description": "Customer dimension with SCD2"},
]
for table in tables:
    vector = model.encode(table["description"])
    index.upsert([(
        table["name"],
        vector.tolist(),
        {"description": table["description"]},
    )])

# Semantic search: embed the question with the same model, then fetch the
# three nearest tables
query = "Where can I find customer purchase history?"
query_embedding = model.encode(query)
results = index.query(query_embedding.tolist(), top_k=3)
print("Relevant tables:", [r.id for r in results.matches])

ML Model Deployment in Data Pipelines

Deploying Models with MLflow

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Train model.
# Define the hyperparameter once and pass it to both the estimator and the
# logger, so the value recorded in MLflow is guaranteed to match the value
# the model was actually trained with (the original logged a hard-coded 100
# that was independent of the model's configuration).
n_estimators = 100
model = RandomForestClassifier(n_estimators=n_estimators)
model.fit(X_train, y_train)  # X_train / y_train are prepared upstream

# Log params, metrics, and the model itself in one MLflow run
with mlflow.start_run():
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_metric("accuracy", accuracy)  # accuracy computed upstream

    # Log model and register it so it can be promoted through stages
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="churn_predictor",
    )

# Load in production pipeline: resolve whatever version is currently in
# the "production" stage of the registry
model_uri = "models:/churn_predictor/production"
loaded_model = mlflow.sklearn.load_model(model_uri)


# Use in Airflow DAG
def predict_churn(**context):
    """Airflow task: score fresh data with the production churn model."""
    model = mlflow.sklearn.load_model(model_uri)
    predictions = model.predict(new_data)  # new_data supplied by the pipeline
    load_to_warehouse(predictions)

Real-time ML Predictions in dbt

-- models/ml/customer_churn_scores.sql
-- Scores every customer with a churn probability by applying the
-- 'churn_model_v1' model (via the ml_predict macro) to features sourced
-- from fct_customer_metrics. Materialized as a table so downstream
-- consumers read precomputed scores rather than re-running inference.
{{ config(materialized='table') }}

-- Feature inputs consumed by the model
WITH customer_features AS (
    SELECT
        customer_id,
        days_since_last_order,
        total_lifetime_value,
        order_frequency
    FROM {{ ref('fct_customer_metrics') }}
),

-- One churn probability per customer; the macro receives the model name
-- and the ordered list of feature columns
predictions AS (
    SELECT
        customer_id,
        {{ ml_predict(
            'churn_model_v1',
            ['days_since_last_order', 'total_lifetime_value', 'order_frequency']
        ) }} AS churn_probability
    FROM customer_features
)

SELECT * FROM predictions

ML Monitoring & Observability

Data Drift Detection

from evidently.dashboard import Dashboard
from evidently.tabs import DataDriftTab

# Compare current data to reference
drift_dashboard = Dashboard(tabs=[DataDriftTab()])
drift_dashboard.calculate(
    reference_data=historical_data,
    current_data=recent_data,
    column_mapping=column_mapping,
)

# Alert (and kick off retraining) when the drift score crosses 0.3.
# NOTE(review): get_drift_score() — confirm against the installed
# evidently version; the Dashboard API has changed across releases.
if drift_dashboard.get_drift_score() > 0.3:
    send_alert("Data drift detected!")
    trigger_model_retraining()

Model Performance Tracking

-- Raw prediction log: one row per individual model prediction.
-- actual_value is backfilled once ground truth arrives, which is what
-- makes the accuracy view below possible.
CREATE TABLE ml_monitoring.predictions (
    prediction_id VARCHAR,
    model_name VARCHAR,
    model_version VARCHAR,
    prediction_value FLOAT,
    prediction_timestamp TIMESTAMP,
    actual_value FLOAT, -- Filled later
    feature_values VARIANT
);

-- Track prediction accuracy over time: daily MAE and prediction/actual
-- correlation per model and version, restricted to rows whose ground
-- truth has been backfilled.
CREATE VIEW ml_monitoring.model_performance AS
SELECT
    model_name,
    model_version,
    DATE_TRUNC('day', prediction_timestamp) as date,
    COUNT(*) as prediction_count,
    AVG(ABS(prediction_value - actual_value)) as mae,
    CORR(prediction_value, actual_value) as correlation
FROM ml_monitoring.predictions
WHERE actual_value IS NOT NULL
GROUP BY 1, 2, 3;

Feature Store Integration

from datetime import timedelta  # ttl below requires this; missing in the original

from feast import FeatureStore

store = FeatureStore(repo_path=".")


# Define features
# NOTE(review): `feature_view`, `Feature`, `ValueType`, and
# `snowflake_source` are used here without being imported or defined in
# this snippet — they must come from elsewhere. Confirm the exact
# decorator signature against the installed Feast version.
@feature_view(
    entities=["customer"],
    ttl=timedelta(days=1),  # features considered fresh for one day
    batch_source=snowflake_source,
)
def customer_features():
    return [
        Feature("total_orders", ValueType.INT64),
        Feature("avg_order_value", ValueType.FLOAT),
        Feature("days_since_last_order", ValueType.INT64),
    ]


# Get features for inference: fetch online (low-latency) feature values
# for two customers and materialize them as a DataFrame
entity_rows = [{"customer_id": 123}, {"customer_id": 456}]
features = store.get_online_features(
    features=[
        "customer_features:total_orders",
        "customer_features:avg_order_value",
    ],
    entity_rows=entity_rows,
).to_df()

AI-Powered Data Engineering

Automated Schema Inference

def infer_schema_with_llm(sample_data):
    """Ask an LLM to propose a warehouse schema for sampled data.

    Sends the first ten rows (rendered as text, so `sample_data` is
    expected to support `.head()`/`.to_string()`, e.g. a DataFrame) to
    GPT-4 and returns its suggested CREATE TABLE statement as text.
    """
    schema_prompt = f"""
    Given this sample data:
    {sample_data.head(10).to_string()}

    Suggest:
    1. Optimal data types for each column
    2. Primary keys
    3. Foreign key relationships
    4. Partitioning strategy

    Format as SQL CREATE TABLE statement.
    """

    completion = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[{"role": "user", "content": schema_prompt}],
    )
    return completion.choices[0].message.content

AI-Powered Data Transformation

def suggest_transformations(data_profile):
    """Ask the LLM for dbt transformations tailored to a data profile.

    `data_profile` is a mapping read for its 'null_rate', 'duplicates',
    and 'types' keys; the reply is the model's suggested dbt SQL.
    """
    cleanup_prompt = f"""
    Data profile:
    - Null rate: {data_profile['null_rate']}
    - Duplicates: {data_profile['duplicates']}
    - Data types: {data_profile['types']}

    Suggest dbt model transformations to:
    1. Handle nulls
    2. Remove duplicates
    3. Standardize formats

    Provide dbt SQL code.
    """
    return get_llm_response(cleanup_prompt)