Batch Pipelines & ELT
Once data lands in the warehouse, it needs to be transformed from raw source records into clean, analytics-ready tables. The industry has shifted from ETL (transform before loading) to ELT (load first, transform inside the warehouse) — letting SQL handle transformation at scale and tools like dbt manage the dependency graph between models.
How It Works
dbt project — raw → staging → intermediate → marts
Each layer reads only from the layer above it, which keeps a clean separation between raw data, light cleaning, business logic, and final presentation.
The shift from ETL to ELT was not a philosophical preference — it followed from a change in economics. When compute and storage were coupled on expensive machines, transforming data outside the warehouse before loading was cheaper. Once cloud warehouses separated compute from storage and made compute elastic, loading raw data first and transforming inside the warehouse became faster, cheaper, and more maintainable. The pipeline architecture reflects the cost structure of the infrastructure.
ETL vs ELT
The original pattern was ETL: Extract from source, Transform in a processing layer (Spark, a custom Python job), then Load into the destination. Transformation happened outside the warehouse because warehouses were expensive and compute was not elastic — you paid by the storage capacity, not the query.
Modern cloud warehouses flipped this calculus. Compute is cheap, elastic, and scales independently of storage. It is faster and cheaper to load raw data into the warehouse first, then transform it with SQL running inside the warehouse, than to move data through an external transformation layer. This is ELT: Extract, Load, then Transform.
The benefits of ELT are practical:
- SQL is the native language of analysts, not Python. Transformations written in SQL can be read and modified by the whole team.
- Data lineage is explicit — each transformation is a named SQL model with documented dependencies.
- Warehouses optimize SQL execution across terabytes far better than custom Python loops.
- Raw data is always preserved — transformations are non-destructive reads that produce new tables.
The three-layer model
Raw (or source): data as-landed by the ingestion layer, untouched. Naming convention: raw.orders, raw.stripe_events. No transformations, ever. This layer is the ground truth and the recovery point if a downstream transformation has a bug.
Staging (stg_*): one model per source table, doing only light cleaning:
- Cast types (string timestamps to proper timestamps, string booleans to booleans)
- Rename columns to consistent naming conventions (camelCase → snake_case, cust_id → customer_id)
- Filter obvious garbage (test accounts, internal traffic)
- Add simple derived fields that are definitional, not business logic (is_paid = amount > 0)
No joins, no aggregations in staging. One source table in, one staging table out.
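The staging rules above can be sketched with Python's built-in sqlite3 module standing in for the warehouse. The table raw_orders, its columns, and the sample rows are hypothetical and chosen only to exercise each rule; in a dbt project the SELECT would live in a stg_orders model rather than a CREATE VIEW statement.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical raw table as landed by ingestion: all strings, source naming
    CREATE TABLE raw_orders (orderId TEXT, cust_id TEXT, amount TEXT, isTest TEXT);
    INSERT INTO raw_orders VALUES
        ('1', 'c-10', '25.00', 'false'),
        ('2', 'c-11', '0',     'false'),
        ('3', 'c-99', '12.50', 'true');

    -- Staging: one source table in, one staging relation out, no joins
    CREATE VIEW stg_orders AS
    SELECT
        CAST(orderId AS INTEGER)  AS order_id,     -- cast and rename
        cust_id                   AS customer_id,  -- rename
        CAST(amount AS REAL)      AS amount,       -- cast
        CAST(amount AS REAL) > 0  AS is_paid       -- definitional derived field
    FROM raw_orders
    WHERE isTest = 'false';                        -- drop test accounts
""")

rows = conn.execute(
    "SELECT order_id, customer_id, amount, is_paid FROM stg_orders ORDER BY order_id"
).fetchall()
print(rows)
```

SQLite has no boolean type, so is_paid comes back as 1 or 0; a real warehouse would return a proper boolean.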
Intermediate (int_*): joins, aggregations, and complex logic that doesn't belong in marts but is too complex to repeat in every mart model. Session reconstruction, event attribution, slowly changing dimension handling.
Marts (fct_*, dim_*): the final star schema tables consumed by BI tools, data scientists, and reverse ETL. Stable column names, documented, tested. Changes to marts are breaking changes for consumers — treat them like a public API.
The layer naming convention (stg_*, int_*, fct_*/dim_*) encodes a strict directional flow: each layer reads only from the layer above, never from a layer at the same level or below. The direction matters because circular dependencies make automated execution ordering impossible. dbt detects DAG cycles at compile time precisely because ref() makes the dependency graph explicit. The layering itself is a convention the team maintains; what dbt guarantees is that the graph is acyclic. An explicit graph also means that a fix to a staging model reaches every downstream intermediate model and mart the next time they are built, without manually tracing what needs to be re-run.
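To make the ordering argument concrete, here is a minimal sketch using Python's standard graphlib module. It is not how dbt is implemented; the model names and the deps mapping are hypothetical, and each entry lists the models a given model would ref().

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical project: model -> set of models it reads via ref()
deps = {
    "stg_orders": set(),
    "stg_customers": set(),
    "int_customer_orders": {"stg_orders", "stg_customers"},
    "fct_orders": {"int_customer_orders"},
    "dim_customers": {"int_customer_orders"},
}


def execution_order(graph):
    """Return one valid run order; raises CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


def downstream_of(graph, model):
    """Every model that depends on `model`, directly or transitively."""
    found, frontier = set(), [model]
    while frontier:
        current = frontier.pop()
        for node, parents in graph.items():
            if current in parents and node not in found:
                found.add(node)
                frontier.append(node)
    return found


order = execution_order(deps)
affected = downstream_of(deps, "stg_orders")

# A staging model that reads from a mart closes a loop, so no order exists.
cyclic = {**deps, "stg_orders": {"fct_orders"}}
try:
    execution_order(cyclic)
    cycle_detected = False
except CycleError:
    cycle_detected = True
```

The same explicit graph answers both questions: in what order to build, and which models to rebuild after a staging fix.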
Incremental materialization
For large fact tables (billions of rows), rebuilding the full model on every run is too slow. dbt's incremental model pattern runs a full build on first execution, then on subsequent runs executes only the incremental logic:
-- models/fct_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}
SELECT ...
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE created_at >= (SELECT MAX(created_at) FROM {{ this }})
{% endif %}
On incremental runs, dbt uses unique_key to match incoming rows against existing rows and merges them: new rows are inserted and changed rows are updated. On a large table this can reduce, for example, a 4-hour full refresh to a 2-minute incremental run.
The tradeoff: incremental models can drift. If a bug in upstream data is discovered and corrected historically, the incremental model won't backfill the fix unless you run dbt run --full-refresh. Schedule periodic full refreshes (weekly or monthly) to ensure accumulated corrections propagate.
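The drift described here can be reproduced in a few lines of plain Python. This is a toy simulation under stated assumptions, not dbt's merge implementation: the target table is a dict keyed by order_id, and incremental_run and full_refresh are hypothetical helpers that mimic the high-water-mark filter and a --full-refresh rebuild.

```python
def incremental_run(target, source):
    """Mimic an incremental model: full build first, then merge only recent rows."""
    if not target:  # first execution: build everything
        target.update({row["order_id"]: dict(row) for row in source})
        return
    high_water = max(row["created_at"] for row in target.values())
    for row in source:
        if row["created_at"] >= high_water:      # the is_incremental() filter
            target[row["order_id"]] = dict(row)  # merge on unique_key


def full_refresh(target, source):
    """Mimic --full-refresh: discard accumulated state and rebuild from source."""
    target.clear()
    incremental_run(target, source)


source = [
    {"order_id": 1, "created_at": "2024-01-01", "amount": 100},
    {"order_id": 2, "created_at": "2024-01-02", "amount": 200},
]
fct_orders = {}
incremental_run(fct_orders, source)  # first run builds both rows

# Upstream corrects order 1 historically, and a new order arrives.
source[0]["amount"] = 90
source.append({"order_id": 3, "created_at": "2024-01-03", "amount": 300})
incremental_run(fct_orders, source)

amount_after_incremental = fct_orders[1]["amount"]  # stale: correction was skipped
full_refresh(fct_orders, source)
amount_after_refresh = fct_orders[1]["amount"]      # correction picked up
```

The corrected row sits below the high-water mark, so the incremental run never looks at it; only the rebuild brings the table back in line with source.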
Design Tradeoffs
Where Your Intuition Breaks
Incremental dbt models look like a performance optimization, but they are also a correctness risk. The incremental pattern appends or merges only new rows, which means any bug in historical data that gets corrected at the source is silently ignored: the incremental model already processed those rows and will not reprocess them unless forced. This is not obvious from reading the model code, which differs from a non-incremental model only in its config and the is_incremental() filter. The correct mental model is that an incremental model is a persistent accumulation of past runs, not a recomputation from source. It is stateful, and stateful systems require explicit mechanisms (full refreshes, lookback windows, partial replays) to handle source corrections. Many data quality incidents trace back to an incremental model that drifted from source truth over weeks or months because no periodic full refresh was scheduled.
Table vs view materialization
- View: no storage cost, always reads latest data, but slow for complex models that are queried frequently — the full SQL runs on every query against the view.
- Table: pre-computed result stored in the warehouse, fast to query, but stale between runs. Use for models that are queried many times per day or are expensive to compute.
- Incremental: best of both for large append-heavy tables — fast runs, no full recompute.
- Ephemeral: dbt inlines the model as a CTE in downstream models — useful for small intermediate logic you don't need to materialize.
A common pattern: staging models as views (cheap, always fresh), intermediate models as tables (expensive joins worth caching), marts as tables (queried constantly by dashboards).
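The freshness difference between a view and a table can be seen in a small sqlite3 sketch. The relation names v_revenue and t_revenue are hypothetical, and SQLite stands in for the warehouse.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE stg_orders (order_id INTEGER, amount REAL);
    INSERT INTO stg_orders VALUES (1, 10.0), (2, 20.0);

    -- View: the query is stored and re-run on every read
    CREATE VIEW v_revenue AS SELECT SUM(amount) AS revenue FROM stg_orders;

    -- Table: the result is computed once and stored
    CREATE TABLE t_revenue AS SELECT SUM(amount) AS revenue FROM stg_orders;
""")

# New data lands after both relations were created.
conn.execute("INSERT INTO stg_orders VALUES (3, 5.0)")

view_revenue = conn.execute("SELECT revenue FROM v_revenue").fetchone()[0]
table_revenue = conn.execute("SELECT revenue FROM t_revenue").fetchone()[0]
print(view_revenue, table_revenue)
```

The view reflects the new row immediately at the cost of recomputing; the table stays at its last-built value until the next run.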
Testing strategy
dbt has two test types. Schema tests are generic assertions on column properties:
- not_null: every row has a value
- unique: no duplicate values
- accepted_values: only known enum values appear
- relationships: foreign key integrity between models
Data tests (singular tests) are custom SQL queries that should return zero rows if the data is correct:
-- tests/assert_revenue_positive.sql
SELECT order_id FROM {{ ref('fct_orders') }} WHERE revenue < 0
Run tests on every pipeline execution. A failing test on fct_orders.unique(order_id) tells you a deduplication logic bug exists before analysts report the wrong numbers. Tests are the contract that makes transformations trustworthy.
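The "zero rows means pass" convention can be sketched outside dbt with sqlite3. The checks below are hand-written stand-ins for not_null, unique, and the singular revenue test; they only approximate what dbt would compile, and the sample data is hypothetical, seeded with one duplicate and one negative value.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE fct_orders (order_id INTEGER, revenue REAL);
    INSERT INTO fct_orders VALUES (1, 25.0), (2, 40.0), (2, 40.0), (3, -5.0);
""")

# Each check selects the offending rows; an empty result means the check passes.
checks = {
    "not_null_order_id": "SELECT * FROM fct_orders WHERE order_id IS NULL",
    "unique_order_id": """
        SELECT order_id FROM fct_orders
        GROUP BY order_id HAVING COUNT(*) > 1
    """,
    "assert_revenue_positive": "SELECT order_id FROM fct_orders WHERE revenue < 0",
}

failures = {}
for name, sql in checks.items():
    rows = conn.execute(sql).fetchall()
    if rows:
        failures[name] = rows

print(failures)
```

Because a failing check returns the offending rows themselves, the failure report doubles as the starting point for debugging.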
Pipeline scheduling
Most batch pipelines run on a fixed schedule: hourly for operational dashboards, daily for analytical reports. The scheduler (Airflow, Prefect, dbt Cloud) handles dependency ordering — run ingestion first, then staging, then intermediate, then marts, then tests, then notify consumers.
The failure handling decision matters: should a failed staging model block all downstream models from running? In dbt, yes by default. This is correct — propagating bad data downstream is worse than no data. Configure alerting on failures and design models to be idempotent: running the same model twice produces the same result as running it once.
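The "block downstream on failure" behavior can be sketched as a tiny runner. This illustrates the policy and is not dbt's scheduler; the deps graph, run_pipeline, and flaky_runner are hypothetical names introduced for the example.

```python
from graphlib import TopologicalSorter

# Hypothetical project with two independent branches
deps = {
    "stg_orders": set(),
    "stg_customers": set(),
    "int_orders": {"stg_orders"},
    "int_customers": {"stg_customers"},
    "fct_orders": {"int_orders"},
    "dim_customers": {"int_customers"},
}


def run_pipeline(graph, run_model):
    """Run models in dependency order; skip anything downstream of a failure."""
    status = {}
    for model in TopologicalSorter(graph).static_order():
        # Parents always come earlier in the order, so their status is known.
        if any(status[parent] != "success" for parent in graph[model]):
            status[model] = "skipped"
            continue
        try:
            run_model(model)
            status[model] = "success"
        except Exception:
            status[model] = "error"
    return status


def flaky_runner(model):
    """Stand-in for building a model; stg_orders fails on purpose."""
    if model == "stg_orders":
        raise RuntimeError("simulated failure in stg_orders")


status = run_pipeline(deps, flaky_runner)
print(status)
```

Only the branch below the failed model is held back; the customers branch still builds, so consumers of unaffected marts are not blocked.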
In Practice
Incremental model pitfalls
Late-arriving data: if source systems backfill historical records (a common occurrence with event data), an incremental model with WHERE created_at >= ... will miss them. Add a lookback buffer: WHERE created_at >= (SELECT MAX(created_at) - INTERVAL '3 days' FROM {{ this }}) to reprocess the last 3 days on every run.
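A small plain-Python sketch shows what the lookback buffer changes. The rows and the rows_to_process helper are hypothetical; the helper mirrors the WHERE clause, with lookback set to zero for the plain high-water-mark filter.

```python
from datetime import datetime, timedelta


def rows_to_process(source, target, lookback=timedelta(0)):
    """Return source rows with created_at >= MAX(target.created_at) - lookback."""
    cutoff = max(row["created_at"] for row in target) - lookback
    return [row for row in source if row["created_at"] >= cutoff]


# Rows already in the incremental table
target = [
    {"order_id": 1, "created_at": datetime(2024, 1, 8)},
    {"order_id": 2, "created_at": datetime(2024, 1, 10)},
]
# Source now holds a new order (3) and a late-arriving one (4) dated Jan 9
source = target + [
    {"order_id": 3, "created_at": datetime(2024, 1, 11)},
    {"order_id": 4, "created_at": datetime(2024, 1, 9)},
]

without_lookback = [r["order_id"] for r in rows_to_process(source, target)]
with_lookback = [
    r["order_id"] for r in rows_to_process(source, target, timedelta(days=3))
]
print(without_lookback, with_lookback)
```

With the buffer, orders 1 and 2 are reprocessed as well; that is harmless only because the merge on unique_key overwrites them instead of inserting duplicates.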
The full-refresh failure: incremental models will drift over months without occasional full refreshes. Schedule a separate job that runs dbt run --full-refresh on a cron such as 0 2 * * 0 (weekly, Sunday at 2 a.m.).
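Schema changes in the source: if a source table adds a column, an incremental model with the default on_schema_change setting (ignore) does not add it to the existing target table until a full refresh. Monitor source schema changes and either configure on_schema_change or trigger full refreshes proactively.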
What good dbt project hygiene looks like
- Every model has a description in schema.yml
- Every mart model has at least not_null and unique tests on its primary key
- Staging models are in a staging/ subdirectory, marts in marts/, intermediate in intermediate/
- ref() is used for all cross-model dependencies, never hardcoded table names
- Source freshness tests run on raw tables before models execute, so a stuck ingestion pipeline surfaces before transformations produce stale data rather than after analysts report stale dashboards
Production Patterns
dbt incremental model with late-arriving data handling
The standard incremental pattern misses records that arrive late or are backfilled by upstream systems. Add a lookback window and explicit deduplication.
-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge',
        on_schema_change='sync_all_columns'
    )
}}
WITH source AS (
    SELECT
        order_id,
        customer_id,
        status,
        amount_cents,
        created_at,
        updated_at,
        -- Deduplicate within the source window: keep the most recently updated record
        ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) AS rn
    FROM {{ ref('stg_orders') }}
    {% if is_incremental() %}
    -- 3-day lookback catches late-arriving records and upstream backfills
    -- (DATEADD syntax varies by warehouse)
    WHERE updated_at >= (
        SELECT DATEADD(day, -3, MAX(updated_at)) FROM {{ this }}
    )
    {% endif %}
)
SELECT
    order_id,
    customer_id,
    status,
    amount_cents,
    created_at,
    updated_at
FROM source
WHERE rn = 1
The on_schema_change='sync_all_columns' setting adds columns that are new in the model's output to the target table on the next run and removes columns that no longer appear, rather than silently ignoring the change. Because this model lists its columns explicitly, a new source column still has to be added to the SELECT list before it reaches the mart. Pair this with a weekly full-refresh job in dbt Cloud to prevent long-term drift.
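The deduplication step (ROW_NUMBER partitioned by order_id, ordered by updated_at descending, keeping rn = 1) amounts to "keep the latest row per key". A plain-Python equivalent, with the hypothetical helper latest_per_key and made-up rows, looks like this:

```python
def latest_per_key(rows, key="order_id", order_by="updated_at"):
    """Keep one row per key: the one with the greatest order_by value."""
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return sorted(latest.values(), key=lambda r: r[key])


# Hypothetical lookback window containing two versions of order 1
window = [
    {"order_id": 1, "status": "pending", "updated_at": "2024-01-01T09:00"},
    {"order_id": 1, "status": "paid", "updated_at": "2024-01-01T12:00"},
    {"order_id": 2, "status": "pending", "updated_at": "2024-01-02T08:00"},
]

deduped = latest_per_key(window)
print([(r["order_id"], r["status"]) for r in deduped])
```

When two versions share the same updated_at, this sketch keeps the first one seen, whereas ROW_NUMBER picks an arbitrary one; adding a tiebreaker column to the ORDER BY makes the SQL deterministic.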
Configuring Spark for large batch jobs
For batch transformations that outgrow the warehouse (very wide joins, custom Python UDFs, ML feature generation), Spark on EMR or Databricks is the standard fallback. Two settings cause most production failures at scale: executor memory and shuffle partitions.
# spark_batch_config.py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("fct-orders-daily-batch")
    # Executor memory: 8 GiB shared by 4 cores here (2 GiB per core);
    # OOM errors mean this is too low
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    # Driver memory: increase if you collect() large DataFrames or use broadcast joins
    .config("spark.driver.memory", "4g")
    # Shuffle partitions: default is 200, which causes tiny partitions on small datasets
    # and huge partitions on large ones. Target ~128 MiB per partition post-shuffle.
    # Rule of thumb: (total_data_GB * 1024) / 128 = target partition count
    .config("spark.sql.shuffle.partitions", "800")
    # Adaptive Query Execution: Spark 3.0+ can coalesce shuffle partitions at runtime
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Broadcast join threshold: tables smaller than this are broadcast to all executors
    # Avoids expensive shuffle joins for small dimension tables
    .config("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))  # 50 MiB
    .getOrCreate()
)

# Cache the large fact table if it's joined multiple times in the same job
orders_df = spark.table("raw.orders").cache()
orders_df.count()  # Materialize the cache before the first join
Enable AQE (spark.sql.adaptive.enabled) in all Spark 3.x jobs; it is on by default from Spark 3.2. It adjusts partition counts and join strategies from actual runtime statistics, often reducing job time (on the order of 20–40%, depending on the workload) without manual tuning.
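The rule of thumb from the configuration comments is simple enough to wrap in a helper. The function name target_shuffle_partitions is hypothetical, and the 128 MiB target is the figure used above, not a Spark default.

```python
import math


def target_shuffle_partitions(total_data_gb, target_partition_mib=128):
    """Partition count so each post-shuffle partition is about target_partition_mib."""
    return max(1, math.ceil(total_data_gb * 1024 / target_partition_mib))


partitions_100gb = target_shuffle_partitions(100)  # matches the 800 used above
partitions_1gb = target_shuffle_partitions(1)
partitions_tiny = target_shuffle_partitions(0.05)  # never go below one partition
print(partitions_100gb, partitions_1gb, partitions_tiny)
```

With AQE coalescing enabled, the configured value acts as a starting point that Spark may reduce at runtime, so erring slightly high is usually the safer direction.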
Testing batch pipeline idempotency
An idempotent pipeline produces the same result whether it runs once or ten times. This is non-negotiable for batch jobs that may be retried on failure. Test it explicitly.
# tests/test_fct_orders_idempotency.py
import subprocess

import pytest

from your_warehouse_client import query  # replace with your DWH connector


def run_dbt_model(model: str, full_refresh: bool = False) -> None:
    """Run a single dbt model and fail loudly if dbt exits non-zero."""
    cmd = ["dbt", "run", "--select", model]
    if full_refresh:
        cmd.append("--full-refresh")
    result = subprocess.run(cmd, capture_output=True, text=True)
    assert result.returncode == 0, result.stderr


def get_row_count_and_checksum(table: str) -> tuple[int, str]:
    row_count = query(f"SELECT COUNT(*) FROM {table}")[0][0]
    # Checksum over order_id and amount_cents; add more columns to catch other
    # value-level changes. STRING_AGG syntax here is PostgreSQL-style, so adjust
    # it for your warehouse. Separators keep adjacent values from running together.
    checksum = query(f"""
        SELECT MD5(STRING_AGG(
            CAST(order_id AS TEXT) || '|' || CAST(amount_cents AS TEXT),
            ',' ORDER BY order_id))
        FROM {table}
    """)[0][0]
    return row_count, checksum


@pytest.mark.integration
def test_fct_orders_idempotent():
    # First run: full refresh to establish baseline
    run_dbt_model("fct_orders", full_refresh=True)
    count_1, checksum_1 = get_row_count_and_checksum("marts.fct_orders")

    # Second run: incremental (simulates a retry with no new source data)
    run_dbt_model("fct_orders")
    count_2, checksum_2 = get_row_count_and_checksum("marts.fct_orders")

    assert count_1 == count_2, f"Row count changed: {count_1} → {count_2}"
    assert checksum_1 == checksum_2, "Data changed between identical runs"
Run idempotency tests in CI on every PR that touches a mart model. A model that changes output on repeated runs with unchanged inputs is a latent correctness bug waiting to become a production incident.