
Data Quality & Testing

Bad data is worse than no data — it produces confident wrong answers. Data quality breaks along four dimensions: freshness (is this recent?), completeness (are rows missing?), consistency (do values agree across tables?), and uniqueness (are there duplicates?). Encoding these expectations as automated tests, run as part of the pipeline, shifts quality left before bad data reaches downstream consumers.

How It Works

Four dimensions of data quality

Every dimension needs its own monitoring strategy — a freshness check that passes says nothing about completeness, and vice versa.

Data quality failures cost far more to fix downstream than to prevent upstream. An analyst who builds a weekly revenue model on duplicate order_id rows generates a chart that is wrong by an unknown amount, shared with stakeholders who make decisions on it, and must be corrected retroactively — a process that takes days and damages trust. The same bug caught by a unique test on fct_orders takes five minutes to diagnose and stops the pipeline before any consumer sees the data. Quality testing is not optional overhead; it is the difference between a data warehouse and a data swamp.

The four dimensions

Freshness answers "is the data recent enough to be trusted?" For an orders table that should update every hour, max(loaded_at) > now() - interval 2 hours is a concrete, automatable assertion. Freshness violations are often the first sign of a broken ingestion job.

Completeness catches missing rows and NULL values that should never be null. NULL rates on primary or foreign key columns are almost always a pipeline bug: a join dropped rows, an upstream API returned empty payloads, or a schema change added a required field with no backfill.

Consistency checks that values agree across tables. Revenue in fct_orders should match the sum in payments.transactions. User IDs in fct_events should all exist in dim_users. Cross-table consistency failures are insidious — each table looks valid in isolation, but the join produces incorrect numbers.

Uniqueness verifies that primary keys are actually unique. A pipeline that does INSERT without deduplication will silently insert duplicate rows on retry or backfill. count(*) - count(distinct order_id) = 0 should be a mandatory test on every fact table.
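
All four assertions above are plain SQL. A minimal sketch, run here against an in-memory sqlite3 database — the tables, columns, and thresholds are hypothetical stand-ins for your warehouse schema:

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (order_id INTEGER, customer_id INTEGER,
                         loaded_at TEXT, revenue REAL);
    CREATE TABLE payments (order_id INTEGER, amount REAL);
    INSERT INTO orders VALUES (1, 10, datetime('now'), 50.0),
                              (2, 11, datetime('now'), 75.0);
    INSERT INTO payments VALUES (1, 50.0), (2, 75.0);
""")

# Freshness: max(loaded_at) within the last 2 hours
cutoff = (datetime.now(timezone.utc) - timedelta(hours=2)).strftime("%Y-%m-%d %H:%M:%S")
fresh = conn.execute("SELECT MAX(loaded_at) >= ? FROM orders", (cutoff,)).fetchone()[0]

# Completeness: no NULLs on primary or foreign keys
nulls = conn.execute(
    "SELECT COUNT(*) FROM orders WHERE order_id IS NULL OR customer_id IS NULL"
).fetchone()[0]

# Consistency: orders revenue agrees with payments within $10
delta = conn.execute("""
    SELECT ABS((SELECT SUM(revenue) FROM orders) -
               (SELECT SUM(amount)  FROM payments))
""").fetchone()[0]

# Uniqueness: count(*) - count(distinct order_id) = 0
dupes = conn.execute(
    "SELECT COUNT(*) - COUNT(DISTINCT order_id) FROM orders"
).fetchone()[0]

assert fresh == 1 and nulls == 0 and delta <= 10 and dupes == 0
```

Each check is a single aggregate query, which is why they are cheap to automate in any warehouse.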

The four dimensions are independently necessary because pipeline bugs manifest differently in each: a broken ingestion job causes freshness failures; a dropped JOIN causes completeness failures; a schema change in a source causes consistency failures; a missing deduplication step causes uniqueness failures. A test suite that covers only one dimension provides false confidence — all four must be monitored because a passing freshness check says nothing about whether the data is correct, only that it arrived recently.

Testing tools and frameworks

dbt tests are the standard for warehouse-native quality testing. Two types:

Generic tests are single-line assertions applied to columns: not_null, unique, accepted_values, relationships. They compile to SQL and run inside the warehouse — no extra infrastructure required:

yaml
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered', 'cancelled']
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id
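
Under the hood, each generic test compiles to a query that returns the violating rows. A rough sketch of what `unique` on order_id becomes (simplified; the actual compiled SQL varies by adapter), executed here via sqlite3 on a hypothetical table with a deliberate duplicate:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE fct_orders (order_id INTEGER, status TEXT);
    INSERT INTO fct_orders VALUES (1, 'pending'), (2, 'shipped'), (2, 'shipped');
""")

# Roughly what dbt compiles the `unique` test to: group by the column and
# return any value appearing more than once. Zero rows means the test passes.
violations = conn.execute("""
    SELECT order_id, COUNT(*) AS n
    FROM fct_orders
    GROUP BY order_id
    HAVING COUNT(*) > 1
""").fetchall()

print(violations)  # the duplicated key surfaces here
```

Because the test is "just SQL", a failing run can be debugged by pasting the compiled query straight into the warehouse console.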

Singular tests are custom SQL files in tests/ that should return zero rows when passing:

sql
-- tests/assert_revenue_matches_payments.sql
-- Fails if orders and payments revenue differ by more than $10
WITH orders_rev AS (
    SELECT SUM(revenue) AS total FROM analytics.fct_orders
),
payments_rev AS (
    SELECT SUM(amount) AS total FROM payments.transactions
)
SELECT ABS(o.total - p.total) AS delta
FROM orders_rev o, payments_rev p
WHERE ABS(o.total - p.total) > 10

Great Expectations takes a Python-native approach: expectations are defined in code, run as part of a data pipeline, and produce HTML data docs showing pass/fail history. Useful when you need richer statistical expectations (distribution checks, value range assertions) or need to test data outside a SQL warehouse.

Design Tradeoffs

Where Your Intuition Breaks

"We'll add data quality tests after the pipeline is stable" is the same reasoning as "we'll add unit tests after the code is working" — by that point, tests are documenting known behavior rather than catching unknown bugs. The time when tests are most valuable is during early pipeline development, when schema assumptions are being made and JOIN logic is being written for the first time. A pipeline that has been running untested for six months has almost certainly accumulated silent quality issues that no one has noticed because no one has tested for them. Adding tests to a mature pipeline requires archaeological work to understand what the data "should" look like versus what it actually does look like — a distinction that is obvious during initial development and murky later.

Where to run tests

Stage | When it runs | Catches | Cost if it fails
Pre-publish (in pipeline) | Before writing to production | Blocks bad data from reaching consumers | Pipeline fails, data is delayed
CI on PR | On dbt model change | Catches test regressions in code | PR blocked, fast feedback
Post-load (scheduled) | After each pipeline run | Catches data that passed load but degraded | Data is already in production
Ad-hoc | On demand | Good for investigation | No ongoing protection

The right approach is pre-publish tests for critical paths (block bad data before it lands) combined with scheduled monitors for ongoing health (catch regressions that pre-publish tests miss, like slow drift in NULL rates).

Test granularity and cost

Every test runs as a SQL query. A table with 500M rows running count(distinct order_id) at every pipeline run is expensive. Three strategies:

Sample-based testing: run heavy tests on a random 1% sample. Catches systematic errors, misses rare data quality issues.

Incremental testing: test only the new partition or the rows added since the last run. Catches today's problems, doesn't re-validate historical data. Sufficient for most operational quality checks.
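
A minimal sketch of incremental scoping, assuming a loaded_at column and a high-water mark stored by the orchestrator (all names hypothetical). Note the tradeoff: the historical duplicate is deliberately outside the tested window.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE fct_orders (order_id INTEGER, loaded_at TEXT);
    INSERT INTO fct_orders VALUES
        (1, '2024-01-01 00:00:00'),   -- historical, validated on a previous run
        (1, '2024-01-01 00:00:00'),   -- historical duplicate: not re-checked
        (2, '2024-01-02 00:00:00'),
        (3, '2024-01-02 00:00:00');
""")

high_water_mark = "2024-01-01 12:00:00"  # timestamp of the last validated load

# Uniqueness check restricted to rows loaded since the last run
dupes = conn.execute("""
    SELECT COUNT(*) - COUNT(DISTINCT order_id)
    FROM fct_orders
    WHERE loaded_at > ?
""", (high_water_mark,)).fetchone()[0]

assert dupes == 0  # today's partition is clean; historical rows were not scanned
```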

Severity tiers: distinguish error tests (pipeline fails, data blocked) from warn tests (alert sent, pipeline continues). Uniqueness on primary keys is error. NULL rate slightly above threshold might be warn.
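
One way to encode the tiers, sketched in plain Python (the check names, severities, and results are hypothetical):

```python
# Run checks tagged with a severity; only `error` failures halt the pipeline,
# while `warn` failures are reported and execution continues.
CHECKS = [
    # (name, severity, passed)
    ("order_id_unique",        "error", True),
    ("order_id_not_null",      "error", True),
    ("null_rate_under_0.5pct", "warn",  False),  # slightly above threshold
]

def evaluate(checks):
    warnings = [n for n, sev, ok in checks if not ok and sev == "warn"]
    errors   = [n for n, sev, ok in checks if not ok and sev == "error"]
    for name in warnings:
        print(f"WARN: {name} failed; alerting, pipeline continues")
    if errors:
        raise RuntimeError(f"blocking failures: {errors}")
    return warnings

warnings = evaluate(CHECKS)
```

In dbt, the same idea is expressed with the `severity` test config (`error` is the default, `warn` downgrades a test to non-blocking).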

False positives and alert fatigue

A test that fires every third day trains teams to ignore alerts. Causes of high false-positive rate:

  • Threshold too tight (normal day-of-week variation triggers completeness checks every Monday)
  • Row count checks that don't account for business seasonality
  • Consistency checks between tables that update at different times (payments and orders loaded minutes apart will temporarily disagree)

Set thresholds with data, not intuition. Run 30 days of history through your check and calibrate the threshold at the 99th percentile of normal variation.
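
Calibration can be as simple as replaying history through the metric and taking a high percentile. A sketch with made-up row counts showing weekday/weekend variation:

```python
import statistics

# 30 days of daily row counts for the table under test (hypothetical data)
daily_row_counts = [10_000, 10_400, 9_800, 10_200, 10_600, 6_100, 5_900] * 4 \
                   + [10_100, 10_300]

# Metric under test: absolute day-over-day percent change
changes = [
    abs(b - a) / a * 100
    for a, b in zip(daily_row_counts, daily_row_counts[1:])
]

# Threshold at the 99th percentile of observed variation: under normal
# conditions the alert fires roughly once per 100 days, not every Monday.
threshold = statistics.quantiles(changes, n=100)[98]
print(f"alert if day-over-day change exceeds {threshold:.1f}%")
```

Note how the weekend dips dominate the tail: a threshold set by intuition at, say, 20% would page every Saturday and Monday.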

In Practice

Writing your first dbt test suite

A minimal test suite for a new fact table:

yaml
models:
  - name: fct_orders
    tests:
      - dbt_utils.recency:
          datepart: hour
          field: loaded_at
          interval: 3          # freshness: loaded within 3 hours
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: revenue
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000   # sanity check: no single order over $100k

Run with dbt test --select fct_orders. Failed tests print the generated SQL so you can debug in the warehouse directly.

Ownership and routing

A test failure without a clear owner gets ignored. The meta field in dbt lets you annotate models with ownership:

yaml
models:
  - name: fct_orders
    meta:
      owner: "#data-eng"
      slack_channel: "#data-alerts-orders"

Configure your orchestrator (Airflow, Prefect, Dagster) to post failures to the model's slack_channel with a link to the failing test and its generated SQL. The team responsible for the pipeline should be the first to know.
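
A sketch of the routing step. The meta lookup and message format are illustrative; a real integration would parse dbt's run_results.json artifact and post to the Slack API rather than build dicts:

```python
# MODEL_META mirrors the `meta` blocks above; FAILURES stands in for parsed
# dbt test results. Both structures are hypothetical.
MODEL_META = {
    "fct_orders": {"owner": "#data-eng", "slack_channel": "#data-alerts-orders"},
}

FAILURES = [
    {"model": "fct_orders", "test": "unique_fct_orders_order_id",
     "compiled_sql_path": "target/compiled/unique_fct_orders_order_id.sql"},
]

def route(failures, meta):
    messages = []
    for f in failures:
        m = meta.get(f["model"], {"slack_channel": "#data-alerts"})  # fallback channel
        messages.append({
            "channel": m["slack_channel"],
            "text": f"{f['test']} failed on {f['model']} "
                    f"(debug SQL: {f['compiled_sql_path']})",
        })
    return messages

msgs = route(FAILURES, MODEL_META)
print(msgs[0]["channel"])
```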

Testing at the source

The cheapest place to fix data quality is at ingestion — before the data is in your warehouse at all. Source freshness checks in dbt (dbt source freshness) verify that raw tables were loaded recently. Failing a source freshness check before running downstream models prevents cascading failures through your entire pipeline.

yaml
sources:
  - name: postgres_raw
    freshness:
      warn_after: {count: 1, period: hour}
      error_after: {count: 3, period: hour}
    loaded_at_field: _loaded_at
    tables:
      - name: orders
      - name: customers

Production Patterns

dbt singular test for referential integrity

Generic relationships tests confirm that every foreign key value exists in the parent table, but they scan the full column on every run — expensive at scale. For targeted cross-domain consistency checks, write a singular test that returns zero rows when passing. Place it in tests/ and dbt picks it up automatically.

sql
-- tests/assert_fct_orders_customers_exist.sql
-- Every order must have a matching customer in dim_customers.
-- Returns rows that violate the constraint; test passes when result is empty.
SELECT
    o.order_id,
    o.customer_id
FROM {{ ref('fct_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
    ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL
  AND o.created_at >= CURRENT_DATE - INTERVAL '7 days'  -- incremental: test recent data only

sql
-- tests/assert_revenue_consistency.sql
-- Revenue in fct_orders must match payments.transactions within $10.
-- Catches ETL bugs where one side updated but the other didn't.
WITH orders_rev AS (
    SELECT SUM(revenue) AS total FROM {{ ref('fct_orders') }}
    WHERE DATE(created_at) = CURRENT_DATE - INTERVAL '1 day'
),
payments_rev AS (
    SELECT SUM(amount) AS total FROM {{ source('payments', 'transactions') }}
    WHERE DATE(settled_at) = CURRENT_DATE - INTERVAL '1 day'
)
SELECT
    o.total AS orders_total,
    p.total AS payments_total,
    ABS(o.total - p.total) AS delta
FROM orders_rev o, payments_rev p
WHERE ABS(o.total - p.total) > 10

Scope singular tests to a recent time window (>= CURRENT_DATE - INTERVAL '7 days') to keep runtime sub-second on large tables. Only use full-table scans for tests that run weekly or on an explicit backfill schedule.

Great Expectations expectation suite configuration

Define a suite programmatically so it lives in version control, not in the GX UI. Attach it to a checkpoint that runs after each pipeline load.

python
import great_expectations as gx
 
context = gx.get_context()
 
# Connect to your warehouse (Snowflake, BigQuery, etc.)
datasource = context.sources.add_snowflake(
    name="warehouse",
    account="acme.us-east-1",
    user="svc_gx",
    password="${SNOWFLAKE_PASSWORD}",
    database="ANALYTICS",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
)
 
asset = datasource.add_table_asset(name="fct_orders", table_name="fct_orders")
batch_request = asset.build_batch_request()
 
suite = context.add_expectation_suite("fct_orders.core")
validator = context.get_validator(batch_request=batch_request, expectation_suite=suite)
 
# Freshness: max loaded_at within 3 hours of now
validator.expect_column_max_to_be_between(
    column="loaded_at",
    min_value={"$PARAMETER": "now() - timedelta(hours=3)"},  # GX evaluation parameter
)
 
# Completeness
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
 
# Uniqueness
validator.expect_column_values_to_be_unique("order_id")
 
# Value ranges — revenue between $0 and $100,000
validator.expect_column_values_to_be_between("revenue", min_value=0, max_value=100_000)
 
# Categorical set
validator.expect_column_values_to_be_in_set(
    "status", {"pending", "shipped", "delivered", "cancelled"}
)
 
validator.save_expectation_suite(discard_failed_expectations=False)
 
# Create a checkpoint that runs the suite and sends Slack alerts on failure
checkpoint = context.add_checkpoint(
    name="fct_orders_post_load",
    validations=[{"batch_request": batch_request, "expectation_suite_name": "fct_orders.core"}],
    action_list=[
        {"name": "store_validation_result", "action": {"class_name": "StoreValidationResultAction"}},
        {
            "name": "send_slack_notification_on_failure",
            "action": {
                "class_name": "SlackNotificationAction",
                "slack_webhook": "${SLACK_WEBHOOK_DATA_ALERTS}",
                "notify_on": "failure",
            },
        },
    ],
)

Run the checkpoint from your orchestrator after each load:

python
result = context.run_checkpoint("fct_orders_post_load")
if not result.success:
    raise ValueError("Data quality check failed — pipeline halted before downstream models run")

Integrating quality failures into the pipeline

Wire dbt test results and GX checkpoints into your orchestrator so failures block downstream tasks and route alerts to the right channel. Below is an Airflow pattern — the same structure applies in Prefect and Dagster.

python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import subprocess, json
 
def run_dbt_tests_and_raise(**context):
    result = subprocess.run(
        ["dbt", "test", "--select", "fct_orders"],
        capture_output=True, text=True,
    )
    # dbt exits non-zero on test failure; surface the failing tests in the alert
    if result.returncode != 0:
        # The run_results.json artifact lists every executed test and its status
        with open("target/run_results.json") as f:
            run_results = json.load(f)
        failed = [
            r["unique_id"] for r in run_results["results"]
            if r["status"] == "fail"
        ]
        raise ValueError(f"dbt tests failed: {failed}\n{result.stdout[-2000:]}")
 
with DAG("fct_orders_daily", ...) as dag:
    ingest  = PythonOperator(task_id="ingest_orders",    python_callable=run_ingestion)
    dbt_run = BashOperator(task_id="dbt_run_fct_orders", bash_command="dbt run --select fct_orders")
    dbt_test = PythonOperator(task_id="dbt_test_fct_orders", python_callable=run_dbt_tests_and_raise)
    publish  = PythonOperator(task_id="publish_to_bi",   python_callable=refresh_bi_cache)
 
    ingest >> dbt_run >> dbt_test >> publish
    # publish only runs after dbt_test passes — bad data never reaches BI consumers

The key pattern: test tasks sit between dbt run and any downstream publish or notification step. A failed test blocks publish without any extra conditional logic.
