Data Engineer Examples
Externalized from the agent definition per the few-shot-examples rule (#1587).
Data Engineer — Worked Examples
Externalized from the agent definition per the few-shot-examples rule (#1587).
Your Process
1. Warehouse Modeling — Star and Snowflake Schema
-- Star schema: fact_orders surrounded by dimension tables
-- Fact table: grain = one row per order line item
CREATE TABLE warehouse.fact_order_lines (
order_line_key BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
order_date_key INT NOT NULL REFERENCES warehouse.dim_date(date_key),
customer_key INT NOT NULL REFERENCES warehouse.dim_customer(customer_key),
product_key INT NOT NULL REFERENCES warehouse.dim_product(product_key),
fulfillment_key INT NOT NULL REFERENCES warehouse.dim_fulfillment(fulfillment_key),
-- Degenerate dimensions (stored on fact, no separate table warranted)
order_id VARCHAR(50) NOT NULL,
order_line_id VARCHAR(50) NOT NULL,
-- Measures
quantity INT NOT NULL,
unit_price_usd NUMERIC(10, 4) NOT NULL,
discount_amount_usd NUMERIC(10, 4) NOT NULL DEFAULT 0,
gross_revenue_usd NUMERIC(10, 4) NOT NULL,
net_revenue_usd NUMERIC(10, 4) NOT NULL,
-- Audit
inserted_at TIMESTAMP NOT NULL DEFAULT NOW(),
pipeline_run_id VARCHAR(100) NOT NULL
);
-- SCD Type 2 dimension: captures customer attribute history
CREATE TABLE warehouse.dim_customer (
customer_key INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
customer_id VARCHAR(50) NOT NULL, -- Natural key from source
email VARCHAR(255),
full_name VARCHAR(255),
country_code CHAR(2),
customer_tier VARCHAR(20), -- 'standard', 'premium', 'enterprise'
-- SCD Type 2 tracking
effective_from DATE NOT NULL,
effective_to DATE, -- NULL means current record
is_current BOOLEAN NOT NULL DEFAULT TRUE,
-- Audit
source_system VARCHAR(50) NOT NULL,
inserted_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE UNIQUE INDEX idx_dim_customer_current
ON warehouse.dim_customer(customer_id)
WHERE is_current = TRUE;
2. dbt Models with Tests and Documentation
-- models/staging/stg_orders.sql
-- Staging: rename, cast, and light cleaning only — no business logic here
{{ config(
materialized = 'view',
tags = ['staging', 'orders']
) }}
WITH source AS (
SELECT * FROM {{ source('raw_ecommerce', 'orders') }}
),
renamed AS (
SELECT
order_id::VARCHAR AS order_id,
customer_id::VARCHAR AS customer_id,
created_at::TIMESTAMP AS ordered_at,
updated_at::TIMESTAMP AS updated_at,
status::VARCHAR AS order_status,
subtotal_cents::BIGINT AS subtotal_cents,
discount_cents::BIGINT AS discount_cents,
total_cents::BIGINT AS total_cents,
currency_code::CHAR(3) AS currency_code,
COALESCE(shipping_country, '') AS shipping_country_code
FROM source
WHERE _fivetran_deleted = FALSE
)
SELECT * FROM renamed
-- models/marts/finance/fct_orders.sql
-- Mart: business-logic-rich model for finance reporting
{{ config(
materialized = 'incremental',
unique_key = 'order_id',
on_schema_change = 'append_new_columns',
tags = ['marts', 'finance', 'daily']
) }}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
),
order_items AS (
SELECT
order_id,
COUNT(*) AS line_item_count,
SUM(quantity) AS total_quantity,
SUM(unit_price_cents * quantity) AS gross_revenue_cents
FROM {{ ref('stg_order_items') }}
GROUP BY order_id
),
customers AS (
SELECT customer_id, customer_tier, country_code
FROM {{ ref('dim_customers') }}
WHERE is_current = TRUE
),
final AS (
SELECT
o.order_id,
o.customer_id,
o.ordered_at,
o.order_status,
c.customer_tier,
c.country_code,
oi.line_item_count,
oi.total_quantity,
ROUND(o.total_cents / fx.rate_to_usd / 100.0, 4) AS total_usd,
ROUND(o.discount_cents / fx.rate_to_usd / 100.0, 4) AS discount_usd,
o.updated_at
FROM orders o
LEFT JOIN order_items oi USING (order_id)
LEFT JOIN customers c USING (customer_id)
LEFT JOIN {{ ref('dim_fx_rates') }} fx
ON fx.currency_code = o.currency_code
AND fx.rate_date = o.ordered_at::DATE
)
SELECT * FROM final
# models/marts/finance/fct_orders.yml
version: 2
models:
- name: fct_orders
description: >
One row per order. Includes revenue figures normalized to USD,
customer tier at time of analysis, and item counts. Incremental
model refreshed daily.
meta:
owner: [email protected]
sla_hours: 6
tier: gold
columns:
- name: order_id
description: Natural key from the ecommerce platform
tests:
- unique
- not_null
- name: order_status
tests:
- accepted_values:
values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled', 'refunded']
- name: total_usd
description: Order total in USD, converted using daily exchange rate
tests:
- not_null
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
3. Apache Spark Jobs for Large-Scale Transformation
# spark/jobs/enrich_clickstream.py
# Enriches raw clickstream events with product and session metadata
# spark-submit --deploy-mode cluster jobs/enrich_clickstream.py --date 2026-02-27
from __future__ import annotations
import argparse
import logging
from datetime import date, timedelta
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
logger = logging.getLogger(__name__)
def build_spark_session(app_name: str) -> SparkSession:
return (
SparkSession.builder.appName(app_name)
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.sql.adaptive.skewJoin.enabled", "true")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
)
def read_raw_events(spark: SparkSession, s3_path: str, event_date: date) -> DataFrame:
return spark.read.parquet(f"{s3_path}/dt={event_date.isoformat()}")
def enrich_with_product_data(events: DataFrame, products: DataFrame) -> DataFrame:
"""Left join events to product catalog; broadcast small dimension table."""
products_broadcast = F.broadcast(
products.select("product_id", "category", "brand", "price_usd")
)
return events.join(products_broadcast, on="product_id", how="left")
def compute_session_features(df: DataFrame) -> DataFrame:
"""Compute per-session aggregates using window functions."""
session_window = Window.partitionBy("session_id").orderBy("event_ts")
session_agg = Window.partitionBy("session_id")
return df.withColumns({
"session_event_sequence": F.row_number().over(session_window),
"session_page_views": F.count("*").over(session_agg),
"session_duration_seconds": (
F.max("event_ts").over(session_agg).cast("long")
- F.min("event_ts").over(session_agg).cast("long")
),
"first_page_in_session": F.first("page_url").over(session_window),
})
def write_enriched_events(df: DataFrame, output_path: str, event_date: date) -> None:
output = f"{output_path}/dt={event_date.isoformat()}"
(
df.repartition(200)
.write.mode("overwrite")
.option("compression", "snappy")
.parquet(output)
)
logger.info("Wrote enriched events to %s", output)
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--date", default=str(date.today() - timedelta(days=1)))
parser.add_argument("--raw-events-path", default="s3://data-lake/raw/clickstream")
parser.add_argument("--product-catalog-path", default="s3://data-lake/dim/products")
parser.add_argument("--output-path", default="s3://data-lake/enriched/clickstream")
args = parser.parse_args()
event_date = date.fromisoformat(args.date)
spark = build_spark_session(f"enrich-clickstream-{event_date}")
events = read_raw_events(spark, args.raw_events_path, event_date)
products = spark.read.parquet(args.product_catalog_path)
enriched = (
events.transform(lambda df: enrich_with_product_data(df, products))
.transform(compute_session_features)
.filter(F.col("event_ts").isNotNull())
)
write_enriched_events(enriched, args.output_path, event_date)
spark.stop()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()
4. Airflow DAGs with Dependency Management and SLA Monitoring
# dags/ecommerce_daily_pipeline.py
# Full daily ELT pipeline: extract → validate → transform → quality check → catalog
from __future__ import annotations
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
DBT_CLOUD_JOB_ID = 12345
default_args = {
"owner": "data-team",
"retries": 2,
"retry_delay": timedelta(minutes=10),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(hours=1),
"email_on_failure": False,
"on_failure_callback": SlackNotifier(
slack_conn_id="slack_data_alerts",
text="Pipeline {{ dag.dag_id }} failed on {{ ds }}. Task: {{ task.task_id }}",
channel="#data-alerts",
),
}
with DAG(
dag_id="ecommerce_daily_pipeline",
start_date=datetime(2026, 1, 1),
schedule="0 5 * * *", # 5 AM UTC daily
catchup=False,
max_active_runs=1,
tags=["ecommerce", "daily", "tier-1"],
default_args=default_args,
sla_miss_callback=SlackNotifier(
slack_conn_id="slack_data_alerts",
text="SLA missed for {{ dag.dag_id }} on {{ ds }}",
channel="#data-sla",
),
doc_md="""
## Ecommerce Daily Pipeline
Ingests orders, customers, and products from the source database,
transforms via dbt, and publishes to the finance and marketing marts.
**SLA**: All mart tables available by 08:00 UTC.
**Owner**: [email protected]
**Runbook**: https://wiki.company.com/data/runbooks/ecommerce-daily
""",
) as dag:
extract_orders = GlueJobOperator(
task_id="extract_orders",
job_name="ecommerce-extract-orders",
script_args={"--date": "{{ ds }}", "--output-path": "s3://raw/orders/"},
aws_conn_id="aws_default",
wait_for_completion=True,
num_of_dpus=10,
sla=timedelta(hours=1),
)
extract_customers = GlueJobOperator(
task_id="extract_customers",
job_name="ecommerce-extract-customers",
script_args={"--date": "{{ ds }}"},
aws_conn_id="aws_default",
wait_for_completion=True,
num_of_dpus=4,
)
@task()
def validate_raw_data(ds: str) -> dict:
"""Check row counts and freshness before kicking off dbt."""
import boto3
athena = boto3.client("athena", region_name="us-east-1")
exec_id = athena.start_query_execution(
QueryString=f"SELECT COUNT(*) FROM raw.orders WHERE dt = '{ds}'",
ResultConfiguration={"OutputLocation": "s3://athena-results/"},
)["QueryExecutionId"]
count = _poll_athena_count(athena, exec_id)
if count < 100:
raise ValueError(f"Only {count} orders for {ds} — expected >= 100")
return {"order_count": count}
run_dbt = DbtCloudRunJobOperator(
task_id="run_dbt_transformations",
dbt_cloud_conn_id="dbt_cloud",
job_id=DBT_CLOUD_JOB_ID,
trigger_reason="Airflow scheduled run for {{ ds }}",
wait_for_termination=True,
additional_run_config={"threads_override": 8},
sla=timedelta(hours=2),
)
run_dq_checks = BashOperator(
task_id="run_data_quality_checks",
bash_command=(
"dbt test --select tag:finance tag:marketing"
" --profiles-dir /opt/airflow/dbt --target prod"
),
)
@task()
def publish_to_catalog(ds: str) -> None:
"""Push lineage and freshness metadata to Datahub."""
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter
emitter = DatahubRestEmitter("http://datahub-gms:8080")
dataset_urn = builder.make_dataset_urn("bigquery", "mycompany.finance.fct_orders")
freshness = builder.make_data_freshness_aspect(
last_updated=datetime.fromisoformat(ds)
)
emitter.emit_mce(builder.make_lineage_mce(dataset_urn, freshness))
validate = validate_raw_data()
publish = publish_to_catalog()
[extract_orders, extract_customers] >> validate >> run_dbt >> run_dq_checks >> publish
def _poll_athena_count(client, execution_id: str) -> int:
import time
while True:
state = client.get_query_execution(
QueryExecutionId=execution_id
)["QueryExecution"]["Status"]["State"]
if state == "SUCCEEDED":
break
if state in ("FAILED", "CANCELLED"):
raise RuntimeError(f"Athena query {execution_id} {state}")
time.sleep(5)
rows = client.get_query_results(QueryExecutionId=execution_id)["ResultSet"]["Rows"]
return int(rows[1]["Data"][0]["VarCharValue"])
5. Stream Processing with Kafka and Flink
# flink/jobs/order_events_processor.py
# Real-time order event enrichment with 1-minute tumbling window aggregation
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.datastream.connectors.kafka import (
KafkaSource, KafkaOffsetsInitializer, KafkaSink, KafkaRecordSerializationSchema,
)
from pyflink.common import WatermarkStrategy, Duration
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.window import TumblingEventTimeWindows, Time
import json
from datetime import datetime
def parse_order_event(raw: str) -> dict | None:
try:
event = json.loads(raw)
required = {"order_id", "customer_id", "event_type", "event_ts", "amount_cents"}
if not required.issubset(event.keys()):
return None
event["event_ts"] = datetime.fromisoformat(event["event_ts"])
return event
except (json.JSONDecodeError, ValueError):
return None
def main():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(30_000, CheckpointingMode.EXACTLY_ONCE)
env.get_checkpoint_config().set_checkpoint_storage_uri("s3://checkpoints/flink/orders/")
kafka_source = (
KafkaSource.builder()
.set_bootstrap_servers("kafka:9092")
.set_topics("order-events")
.set_group_id("flink-order-processor")
.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets())
.set_value_only_deserializer(SimpleStringSchema())
.build()
)
watermark_strategy = (
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(10))
.with_timestamp_assigner(
lambda event, _: int(event["event_ts"].timestamp() * 1000)
if isinstance(event, dict) else 0
)
)
stream = env.from_source(kafka_source, watermark_strategy, "Kafka Order Events")
parsed = (
stream.map(parse_order_event)
.filter(lambda e: e is not None)
)
# 1-minute tumbling window: revenue aggregates per customer
revenue_by_customer = (
parsed.filter(lambda e: e["event_type"] == "order_placed")
.key_by(lambda e: e["customer_id"])
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(lambda a, b: {
"customer_id": a["customer_id"],
"order_count": a.get("order_count", 1) + 1,
"total_amount_cents": a["amount_cents"] + b["amount_cents"],
"window_end": b["event_ts"].isoformat(),
})
)
sink = (
KafkaSink.builder()
.set_bootstrap_servers("kafka:9092")
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("order-revenue-aggregates")
.set_value_serialization_schema(SimpleStringSchema())
.build()
)
.build()
)
revenue_by_customer.map(json.dumps).sink_to(sink)
env.execute("Order Revenue Aggregator")
if __name__ == "__main__":
main()
6. Data Quality Framework
# data_quality/checks.py
# Composable data quality check library with severity levels
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Callable
import pandas as pd
logger = logging.getLogger(__name__)
@dataclass
class QualityCheck:
name: str
description: str
check_fn: Callable[[pd.DataFrame], bool]
severity: str = "error" # 'error' blocks pipeline; 'warning' logs only
@dataclass
class QualityReport:
table: str
passed: list[str] = field(default_factory=list)
failed_errors: list[str] = field(default_factory=list)
failed_warnings: list[str] = field(default_factory=list)
@property
def has_blocking_failures(self) -> bool:
return len(self.failed_errors) > 0
def run_quality_checks(df: pd.DataFrame, table: str, checks: list[QualityCheck]) -> QualityReport:
report = QualityReport(table=table)
for check in checks:
try:
passed = check.check_fn(df)
except Exception as exc:
passed = False
logger.error("Check %s raised: %s", check.name, exc)
if passed:
report.passed.append(check.name)
elif check.severity == "error":
report.failed_errors.append(check.name)
logger.error("FAIL [error]: %s — %s", check.name, check.description)
else:
report.failed_warnings.append(check.name)
logger.warning("FAIL [warn]: %s — %s", check.name, check.description)
return report
# Pre-built check library
def no_nulls(column: str) -> QualityCheck:
return QualityCheck(
name=f"no_nulls_{column}",
description=f"Column '{column}' must have no NULL values",
check_fn=lambda df: df[column].notna().all(),
)
def no_duplicate_pk(column: str) -> QualityCheck:
return QualityCheck(
name=f"no_duplicate_pk_{column}",
description=f"Column '{column}' must be unique",
check_fn=lambda df: not df[column].duplicated().any(),
)
def row_count_between(min_rows: int, max_rows: int) -> QualityCheck:
return QualityCheck(
name=f"row_count_{min_rows}_{max_rows}",
description=f"Row count must be between {min_rows} and {max_rows}",
check_fn=lambda df: min_rows <= len(df) <= max_rows,
)
def values_in_set(column: str, allowed: set) -> QualityCheck:
return QualityCheck(
name=f"values_in_set_{column}",
description=f"Column '{column}' must only contain: {allowed}",
check_fn=lambda df: df[column].dropna().isin(allowed).all(),
)
def freshness_within_hours(timestamp_column: str, max_hours: int) -> QualityCheck:
from datetime import datetime, timezone, timedelta
return QualityCheck(
name=f"freshness_{timestamp_column}",
description=f"Most recent '{timestamp_column}' must be within {max_hours} hours",
check_fn=lambda df: (
datetime.now(timezone.utc)
- pd.to_datetime(df[timestamp_column]).max().to_pydatetime().replace(tzinfo=timezone.utc)
) <= timedelta(hours=max_hours),
)
# Example: validate the orders mart
def validate_fct_orders(df: pd.DataFrame) -> QualityReport:
checks = [
no_nulls("order_id"),
no_nulls("customer_id"),
no_nulls("ordered_at"),
no_duplicate_pk("order_id"),
values_in_set(
"order_status",
{"pending", "confirmed", "shipped", "delivered", "cancelled", "refunded"},
),
row_count_between(min_rows=1_000, max_rows=10_000_000),
freshness_within_hours("ordered_at", max_hours=25),
QualityCheck(
name="non_negative_revenue",
description="total_usd must be >= 0",
check_fn=lambda df: (df["total_usd"] >= 0).all(),
),
]
return run_quality_checks(df, table="fct_orders", checks=checks)
7. Cost Optimization for Storage and Compute
-- Redshift: Identify queries with missing sort or distribution keys
SELECT
q.query,
ROUND(q.elapsed / 1e6, 1) AS elapsed_seconds,
q.rows,
svv.diststyle,
svv.sortkey1
FROM stl_query q
JOIN svv_table_info svv ON svv."table" = 'fact_order_lines'
WHERE q.elapsed > 60e6
ORDER BY q.elapsed DESC
LIMIT 20;
-- Tables with high unsorted percentage (candidates for VACUUM SORT)
SELECT
"schema" || '.' || "table" AS full_table_name,
pg_size_pretty(size * 1024 * 1024) AS size_on_disk,
pct_unsorted,
tbl_rows
FROM svv_table_info
WHERE pct_unsorted > 20
AND tbl_rows > 1_000_000
ORDER BY pct_unsorted DESC;
# cost_optimizer/s3_lifecycle.py
# Tier cold data to Glacier automatically; delete ephemeral query results
import boto3
s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
Bucket="data-lake-prod",
LifecycleConfiguration={
"Rules": [
{
"ID": "archive-raw-data",
"Status": "Enabled",
"Filter": {"Prefix": "raw/"},
"Transitions": [
{"Days": 30, "StorageClass": "STANDARD_IA"},
{"Days": 90, "StorageClass": "GLACIER_IR"},
{"Days": 365, "StorageClass": "DEEP_ARCHIVE"},
],
},
{
"ID": "delete-athena-results",
"Status": "Enabled",
"Filter": {"Prefix": "athena-results/"},
"Expiration": {"Days": 7},
},
]
},
)
Few-Shot Examples
Example 1: Debugging a Slow dbt Model
Prompt: "Our `fct_daily_revenue` dbt model takes 45 minutes. The warehouse team says it should run in under 5."
Response:
45 minutes for a revenue fact model points to three likely causes: full table scan on a large fact, a missing distribution key causing data movement between nodes, or an unbounded cross-join somewhere in the SQL.
Step 1 — Profile in Redshift:
SELECT step, label, is_diskbased, rows, bytes,
ROUND(maxtime / 1e6, 2) AS max_seconds
FROM svl_query_summary
WHERE query = (
SELECT query FROM stl_query
WHERE label LIKE '%fct_daily_revenue%'
ORDER BY starttime DESC LIMIT 1
)
ORDER BY maxtime DESC;
Step 2 — Check distribution style:
SELECT "table", diststyle, distkey, sortkey1
FROM svv_table_info
WHERE "table" IN ('fct_daily_revenue', 'stg_orders', 'dim_customers');
If `fct_daily_revenue` is `DISTSTYLE EVEN` and joins `stg_orders` on `customer_id`, every node ships rows to every other node. Fix by distributing both tables on `customer_id`:
{{ config(materialized='table', dist='customer_id', sort=['order_date', 'customer_id']) }}
This alone typically cuts runtime from 45 minutes to under 3.
Example 2: Incremental Pipeline Design
Prompt: "We have 500 million rows in our events table. How do we run daily dbt without processing all 500M rows every time?"
Response:
Use dbt's incremental materialization with a watermark filter. The model only processes rows updated since the last run:
{{ config(materialized='incremental', unique_key='event_id', on_schema_change='append_new_columns') }}
SELECT event_id, user_id, event_type, properties, occurred_at
FROM {{ ref('stg_events') }}
{% if is_incremental() %}
WHERE occurred_at > (SELECT COALESCE(MAX(occurred_at), '2020-01-01') FROM {{ this }})
{% endif %}
Three details matter: `unique_key` must match the grain of your model, `on_schema_change='append_new_columns'` prevents failures when upstream adds columns, and the watermark column must be indexed in the source.
For late-arriving data, extend the lookback by 2 hours:
WHERE occurred_at > (SELECT MAX(occurred_at) FROM {{ this }}) - INTERVAL '2 hours'
Example 3: Kafka Topic Sizing
Prompt: "We're adding real-time order events. How many partitions should our Kafka topic have?"
Response:
Partition count drives parallelism — you can never have more parallel consumers than partitions. At 5,000 events/second peak with average event size 2 KB: peak throughput = 10 MB/s. At 1 MB/s safe per partition, you need at least 10 partitions. Start at 20 to leave growth headroom; Kafka partitions cannot be decreased without recreating the topic.
kafka-topics.sh --create \
--bootstrap-server kafka:9092 \
--topic order-events \
--partitions 20 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config compression.type=lz4 \
--config min.insync.replicas=2
Use `order_id` as the message key so all events for a single order land on the same partition and arrive in order. Avoid `customer_id` as a key if customers vary widely in order volume — that creates hot partitions that nullify your parallelism.