
Data Observability

Data quality testing catches problems before deployment; data observability catches them after. Production tables can degrade silently — rows stop arriving, distributions shift, foreign keys break — without triggering any pipeline error. Observability tooling monitors table health continuously, detects anomalies against historical baselines, and routes alerts to the right owner using the lineage graph.

How It Works

[Chart: row count monitor for analytics.fct_orders, Mar 8–Mar 20. ⚠ Anomaly detected: expected ~50,000 rows/day (±2σ band); actual 12,300 rows on Mar 19, 76% below baseline.]
🚨 Alert: row_count anomaly
Table: analytics.fct_orders
Expected: 48,000–54,000 rows
Actual: 12,300 rows (76% drop)
Owner: #data-eng (via lineage)

The chart above shows a row count monitor for fct_orders over 14 days. The shaded band is a 2σ (two standard deviation) expected range computed from a rolling 7-day window. On March 19, row count drops from ~50,000 to 12,300 — a 76% drop that falls far outside the expected range, triggering an alert.

Data quality testing tells you the pipeline is doing what it was designed to do; data observability tells you whether the data it is producing still reflects reality. A pipeline can pass all its tests — not_null, unique, relationships all green — while silently degrading because row counts dropped 70%, a distribution shifted, or an upstream schema changed in a way that no test anticipated. Observability catches what tests cannot: the unknown unknowns.

Continuous monitoring and anomaly detection

This is the core observability pattern: learn a metric's normal distribution, then alert when reality deviates from it. Unlike static thresholds, which need manual calibration, dynamic baselines adapt to seasonality, traffic growth, and day-of-week patterns automatically.
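A minimal sketch of this pattern, with invented numbers and a pure-Python stand-in for the warehouse query:

```python
# Illustrative sketch (numbers invented): learn an expected range from recent
# history, then flag today's value if it falls outside that range.
from statistics import mean, stdev

def dynamic_bounds(history: list[float], sigmas: float = 2.0) -> tuple[float, float]:
    """Expected range for the next observation, learned from history."""
    mu, sd = mean(history), stdev(history)
    return mu - sigmas * sd, mu + sigmas * sd

# Seven days of healthy row counts for a hypothetical table:
history = [49_800, 51_200, 50_400, 48_900, 50_100, 51_600, 49_500]
lo, hi = dynamic_bounds(history)

today = 12_300
is_anomaly = not (lo <= today <= hi)   # True: far below the learned band
```

As the history grows with the table, the band recomputes itself; a static `row_count > 40_000` check would need a human to revisit it every quarter.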

What gets monitored

Volume monitors track row count per time partition; this is the single most important monitor to have. A row count drop often signals an upstream ingestion failure before any pipeline error is raised.

Freshness monitors track max(loaded_at) or max(updated_at). If a table that should update hourly hasn't seen a new row in 6 hours, something is wrong upstream.

Schema monitors alert when columns are added, removed, or changed type. An upstream team renaming a column without notice is one of the most common sources of silent breakage.

Distribution monitors track statistical properties of key columns — NULL rate, cardinality, min/max, mean — and alert when they shift. If country_code suddenly has 40% NULL values, a JOIN is broken somewhere. If revenue mean drops by 50%, something changed in the calculation.

Referential integrity monitors check that foreign keys in fact tables have matching entries in dimension tables. If fct_orders.customer_id has values that don't exist in dim_customers, data is arriving out of order or a DELETE wasn't propagated.
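A hedged sketch of the distribution-monitor idea from above, flagging a NULL-rate shift on a key column; the values and the 5-point tolerance are invented for the example:

```python
# Hedged sketch of a distribution monitor: flag a NULL-rate shift on a key
# column. Column values and tolerance are invented for the example.
def null_rate(values: list) -> float:
    return sum(v is None for v in values) / len(values)

def null_rate_shifted(today: list, baseline_rate: float,
                      tolerance: float = 0.05) -> bool:
    """True when today's NULL rate drifts more than `tolerance` from baseline."""
    return abs(null_rate(today) - baseline_rate) > tolerance

# Baseline: country_code is ~1% NULL. A broken JOIN leaves 40% NULL today:
today = ["US", None, "DE", None, "FR", None, "GB", None, "US", "CA"]
shifted = null_rate_shifted(today, baseline_rate=0.01)   # True: alert
```

The same shape works for cardinality, min/max, or mean: compute the statistic, compare against a learned baseline, alert on excessive drift.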

The five pillars of data observability

Data observability tools (Monte Carlo, Bigeye, Acceldata) organize coverage around five pillars:

Pillar       | Question                              | Example monitor
-------------|---------------------------------------|--------------------------------------------------
Freshness    | Is the data recent?                   | Last row loaded within SLA window
Volume       | Are expected rows arriving?           | Row count within 2σ of 7-day rolling baseline
Schema       | Did the structure change?             | Column count, types, and names unchanged
Distribution | Do values look normal?                | NULL rate, value range, cardinality within bounds
Lineage      | If something breaks, what's affected? | Downstream impact graph from broken table

The fifth pillar — lineage — is what distinguishes observability from simple monitoring. When a volume anomaly is detected, the tool uses lineage to identify which dashboards and consumers are affected, and which upstream tables are the likely root cause.

Dynamic baselines must be learned from historical data rather than manually configured because table volumes and distributions change continuously: a metric that was healthy at 50,000 rows per day last quarter may be healthy at 150,000 rows per day this quarter after a marketing push. Static thresholds need constant human maintenance to stay meaningful; dynamic baselines encode the rule "alert when today's value is anomalous relative to what this metric normally looks like," which remains correct as the data grows.

Design Tradeoffs

Where Your Intuition Breaks

Observability platforms advertise "no-setup automatic monitoring" — connect your warehouse and they monitor everything. In practice, automatic coverage catches gross failures (row count drops to zero, table disappears) but generates enormous noise on everything else. Every table has natural variation; the platform doesn't know which tables matter, what constitutes a meaningful deviation, or which alerts should page someone versus just log a warning. The setup work that "automatic" monitoring avoids is the work of encoding business semantics: this table should update every hour, this column should never be NULL, this metric should not drop more than 20% week over week. Without that context, observability platforms produce hundreds of low-signal alerts per day. The discipline is not in the tooling — it is in defining what "healthy" means for each critical data asset and configuring monitors that reflect those definitions.

Observability tools vs dbt tests

              | dbt tests                        | Data observability platform
--------------|----------------------------------|--------------------------------------------
When it runs  | During pipeline execution        | Continuously, independently of pipelines
Coverage      | Models you write tests for       | All tables in the warehouse (automatic)
Baselines     | Static thresholds you define     | Dynamic, learned from history
Setup effort  | Medium (write tests per model)   | Low (connect warehouse, auto-discover)
Alert routing | Via orchestrator (Airflow, etc.) | Native PagerDuty, Slack, Jira integrations
Root cause    | No                               | Yes (lineage-based)

Use both. dbt tests catch issues before deployment (pre-publish). Observability monitors catch degradation in production that pre-publish tests can't predict (historical drift, upstream changes outside your pipeline).

Alert fatigue and noise

Too many alerts train teams to ignore them. Three causes of noisy observability alerts:

Seasonality blindness: a row count check that doesn't account for lower weekend traffic will fire every weekend when volume dips, and again on Monday when it rebounds above the weekend-depressed average. Dynamic baselines should be computed on day-of-week-matched history (today vs the same weekday over the last 4 weeks), not raw rolling averages.
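A sketch of a day-of-week-matched baseline, assuming daily counts are already available in a dict; the dates and volumes are synthetic:

```python
# Sketch of a day-of-week-matched baseline (synthetic data): compare today
# against the same weekday over the last four weeks instead of a raw
# rolling average, so weekend dips don't read as anomalies.
from datetime import date, timedelta
from statistics import mean, stdev

def weekday_matched_bounds(counts: dict[date, int], today: date,
                           weeks: int = 4, sigmas: float = 2.0) -> tuple[float, float]:
    history = [counts[today - timedelta(weeks=w)]
               for w in range(1, weeks + 1)
               if today - timedelta(weeks=w) in counts]
    mu, sd = mean(history), stdev(history)
    return mu - sigmas * sd, mu + sigmas * sd

# Hypothetical traffic: ~50k rows on weekdays, ~20k on weekends.
counts = {}
start = date(2024, 2, 18)  # a Sunday
for i in range(28):
    d = start + timedelta(days=i)
    counts[d] = (20_000 if d.weekday() >= 5 else 50_000) + i * 10

lo, hi = weekday_matched_bounds(counts, date(2024, 3, 17))  # also a Sunday
# A normal Sunday (~20k rows) sits inside this band; a raw 7-day average,
# centred near 41k, would have flagged it as a drop.
```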

Threshold sensitivity: a 2σ threshold fires on ~5% of normal days by definition. Consider 3σ (0.3%) for operational tables, 2σ only for critical business metrics where you want to catch even moderate anomalies.

Transient conditions: a freshness alert that fires when a pipeline is 10 minutes late due to warehouse queue contention resolves itself before anyone investigates. Add a grace_period — only alert if the condition persists for 30 minutes after the expected update time.
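The grace-period logic can be sketched in a few lines; the 30-minute default mirrors the example above, and the function name is illustrative:

```python
# Sketch of a freshness check with a grace period: a breach must persist past
# the allowed window before anyone is paged, so transient warehouse queue
# delays resolve themselves silently.
from datetime import datetime, timedelta, timezone

def should_page(expected_by: datetime, now: datetime,
                grace_period: timedelta = timedelta(minutes=30)) -> bool:
    """Page only once the table is overdue by more than the grace period."""
    return now > expected_by + grace_period

expected = datetime(2024, 3, 19, 3, 0, tzinfo=timezone.utc)
should_page(expected, expected + timedelta(minutes=10))   # False: still in grace
should_page(expected, expected + timedelta(minutes=45))   # True: persisted, page
```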

Incident routing via lineage

When fct_orders shows a volume anomaly, who gets paged? Without lineage, you route alerts to the owner of the failing table. With lineage, you can:

  1. Identify all upstream tables that feed fct_orders (potential root causes)
  2. Check if those tables also show anomalies (which table broke first?)
  3. Identify all downstream consumers of fct_orders (who is impacted?)
  4. Page the owner of the root-cause table, not just the failing one

This is the difference between an alert that says "fct_orders is broken" and one that says "stg_orders row count dropped at 02:14, likely caused by ingest_orders failure. Downstream: revenue dashboard, customer health score pipeline."
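The routing steps above reduce to a traversal over the lineage graph; the edges below mirror the tables named in this section and are illustrative, not from a real deployment:

```python
# Sketch of lineage-based routing: walk a dependency graph upstream to find
# root-cause candidates and downstream to find impacted consumers.
UPSTREAM = {  # table -> tables it reads from (illustrative edges)
    "fct_orders": ["stg_orders"],
    "stg_orders": ["ingest_orders"],
    "revenue_dashboard": ["fct_orders"],
    "customer_health_pipeline": ["fct_orders"],
}

def walk(table: str, edges: dict[str, list[str]]) -> set[str]:
    """Transitive closure of `table` along the given edge direction."""
    seen, stack = set(), list(edges.get(table, []))
    while stack:
        t = stack.pop()
        if t not in seen:
            seen.add(t)
            stack.extend(edges.get(t, []))
    return seen

# Downstream edges are just the upstream graph reversed.
DOWNSTREAM: dict[str, list[str]] = {}
for child, parents in UPSTREAM.items():
    for parent in parents:
        DOWNSTREAM.setdefault(parent, []).append(child)

root_causes = walk("fct_orders", UPSTREAM)    # candidate tables that broke first
impacted    = walk("fct_orders", DOWNSTREAM)  # dashboards and pipelines to notify
```

Checking each table in `root_causes` for its own anomaly, ordered by distance from the failing table, gives the "which table broke first?" answer in step 2.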

In Practice

Setting up a row count monitor

For a table that should receive ~50,000 rows per day:

sql
-- Baseline query (run daily, store results)
SELECT
    DATE(loaded_at)                          AS partition_date,
    COUNT(*)                                 AS row_count,
    AVG(COUNT(*)) OVER (
        ORDER BY DATE(loaded_at)
        ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
    )                                        AS rolling_7d_avg,
    STDDEV(COUNT(*)) OVER (
        ORDER BY DATE(loaded_at)
        ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
    )                                        AS rolling_7d_std
FROM analytics.fct_orders
GROUP BY 1

Alert condition: row_count < rolling_7d_avg - 2 * rolling_7d_std.

Store these baseline stats in a monitoring table. Compare today's count against baseline after each pipeline run.

Actionable alerts

An alert that contains only "anomaly detected on fct_orders" produces a bad on-call experience. A good alert includes:

🚨 Row count anomaly: analytics.fct_orders
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Expected:  48,000–54,000 rows (7-day baseline)
Actual:    12,300 rows (76% below baseline)
Detected:  2024-03-19 03:47 UTC
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Likely root cause:
  → stg_orders also degraded (11,800 rows vs expected 50,000)
  → ingest_orders last succeeded at 2024-03-18 22:03 UTC

Downstream impact:
  → revenue_dashboard (refreshes at 06:00)
  → customer_health_pipeline (runs at 04:00)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Owner: #data-eng | Runbook: notion.so/xyz/fct-orders-runbook

Every alert should answer: what broke, when, by how much, likely cause, who is impacted, and where to find the runbook.

Runbooks reduce MTTR

Mean time to resolution drops dramatically when an alert links to a runbook. A runbook for a row count alert covers:

  1. Check ingestion job status in Airflow
  2. Check source database for missing rows (query with time filter)
  3. Check Kafka consumer lag if CDC-based
  4. Common causes: source DB maintenance window, schema change, API rate limit
  5. How to trigger a backfill once root cause is resolved
  6. Escalation path if not resolved in 2 hours

Write runbooks when you set up the monitor, not after the first incident.

Production Patterns

Custom freshness and volume monitor

Deploy a lightweight monitor that runs after each pipeline and writes results to a monitoring table:

python
# monitors/table_monitor.py
import os
import snowflake.connector
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass
 
@dataclass
class MonitorResult:
    table: str
    metric: str
    value: float
    expected_min: float
    expected_max: float
    passed: bool
    checked_at: datetime
 
def get_conn():
    return snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        database="analytics",
    )
 
def check_freshness(conn, table: str, max_age_hours: float) -> MonitorResult:
    cur = conn.cursor()
    cur.execute(f"SELECT MAX(loaded_at) FROM {table}")
    latest = cur.fetchone()[0]
    age_hours = (datetime.now(timezone.utc) - latest).total_seconds() / 3600
    return MonitorResult(
        table=table, metric="freshness_hours", value=round(age_hours, 2),
        expected_min=0, expected_max=max_age_hours,
        passed=age_hours <= max_age_hours,
        checked_at=datetime.now(timezone.utc),
    )
 
def check_volume(conn, table: str, date_col: str = "loaded_at") -> MonitorResult:
    cur = conn.cursor()
    cur.execute(f"""
        WITH daily AS (
            SELECT DATE({date_col}) AS d, COUNT(*) AS n
            FROM {table}
            WHERE {date_col} >= DATEADD(day, -8, CURRENT_DATE)
            GROUP BY 1
        ),
        baseline AS (
            SELECT
                AVG(n)    AS avg_n,
                STDDEV(n) AS std_n
            FROM daily
            WHERE d < CURRENT_DATE
        )
        SELECT
            (SELECT n FROM daily WHERE d = CURRENT_DATE) AS today,
            avg_n - 2 * std_n                            AS lower_bound,
            avg_n + 2 * std_n                            AS upper_bound
        FROM baseline
    """)
    today, lower, upper = cur.fetchone()
    today = today if today is not None else 0  # no rows yet today counts as zero
    return MonitorResult(
        table=table, metric="row_count", value=today,
        expected_min=lower, expected_max=upper,
        passed=(lower <= today <= upper),
        checked_at=datetime.now(timezone.utc),
    )
 
def write_results(conn, results: list[MonitorResult]):
    cur = conn.cursor()
    for r in results:
        cur.execute("""
            INSERT INTO monitoring.monitor_results
                (table_name, metric, value, expected_min, expected_max, passed, checked_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """, (r.table, r.metric, r.value, r.expected_min, r.expected_max, r.passed, r.checked_at))

Run this after your dbt pipeline completes in Airflow by chaining a PythonOperator downstream of the final DbtRunOperator.

dbt source freshness checks

For sources ingested externally (Fivetran, Airbyte, custom scripts), declare freshness expectations directly in sources.yml. dbt tracks the loaded_at column and fails the check if the source has gone stale:

yaml
# models/staging/sources.yml
sources:
  - name: raw
    database: raw
    schema: public
    loader: fivetran
    loaded_at_field: _fivetran_synced   # dbt uses this column for freshness
    freshness:
      warn_after:  { count: 2,  period: hour }
      error_after: { count: 6,  period: hour }
    tables:
      - name: orders
        description: "Orders from the production Postgres replica."
        freshness:
          warn_after: { count: 1, period: hour }   # override source default
          error_after: { count: 3, period: hour }
      - name: refunds
        description: "Refund events from payments service."
        # inherits source-level freshness thresholds

Run freshness checks in CI or as a scheduled Airflow task:

bash
dbt source freshness --select source:raw

dbt exits non-zero on error_after violations, making it easy to gate downstream models on source health.

PagerDuty alert routing for data incidents

Route observability alerts to the right team using PagerDuty's Events API v2. Keep a service mapping so each table routes to the correct on-call rotation:

python
# monitors/alerting.py
import os
import requests
from monitors.table_monitor import MonitorResult  # dataclass defined above
 
# Map tables to PagerDuty routing keys (one per team's service)
ROUTING_KEYS: dict[str, str] = {
    "analytics.fct_orders":        os.environ["PD_KEY_DATA_ENG"],
    "analytics.dim_customers":     os.environ["PD_KEY_DATA_ENG"],
    "payments.transactions":       os.environ["PD_KEY_PAYMENTS"],
    "growth.user_events":          os.environ["PD_KEY_GROWTH"],
}
FALLBACK_KEY = os.environ["PD_KEY_DATA_ENG"]
 
def fire_alert(result: "MonitorResult", downstream_impact: list[str]) -> None:
    routing_key = ROUTING_KEYS.get(result.table, FALLBACK_KEY)
    payload = {
        "routing_key": routing_key,
        "event_action": "trigger",
        "dedup_key": f"{result.table}/{result.metric}",   # suppresses duplicate pages
        "payload": {
            "summary": (
                f"{result.metric} anomaly on {result.table}: "
                f"{result.value:.0f} (expected {result.expected_min:.0f}–{result.expected_max:.0f})"
            ),
            "severity": "error",
            "source": "data-observability",
            "custom_details": {
                "table": result.table,
                "metric": result.metric,
                "value": result.value,
                "expected_range": f"{result.expected_min:.0f}–{result.expected_max:.0f}",
                "detected_at": result.checked_at.isoformat(),
                "downstream_impact": downstream_impact,
                "runbook": f"https://notion.so/runbooks/{result.table.replace('.', '-')}",
            },
        },
    }
    resp = requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json=payload, timeout=10,
    )
    resp.raise_for_status()

Set dedup_key to a stable identifier so a persistent anomaly fires once per incident rather than once per monitor run. Use PagerDuty's auto-resolve by sending an event_action: resolve event when the monitor passes again.
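A hedged sketch of the resolve half, reusing the same dedup_key; `build_resolve_event` and the routing-key value are hypothetical names for this example:

```python
# Hypothetical helper for the auto-resolve path: an Events API v2 payload with
# event_action "resolve" and the same dedup_key as the triggering event clears
# the open incident instead of opening a new one.
def build_resolve_event(routing_key: str, table: str, metric: str) -> dict:
    return {
        "routing_key": routing_key,
        "event_action": "resolve",
        "dedup_key": f"{table}/{metric}",  # must match the trigger event exactly
    }

event = build_resolve_event("EXAMPLE_ROUTING_KEY", "analytics.fct_orders", "row_count")
# POST to https://events.pagerduty.com/v2/enqueue when the monitor passes again.
```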
