Neural-Path/Notes

Data Ingestion & CDC

Before you can transform or analyze data, you have to move it from where it lives — production databases, APIs, event streams — into your analytical system. Full table copies are wasteful; incremental loads require bookmarks; Change Data Capture (CDC) reads the database write-ahead log and streams every row-level change in near-real-time without touching production query performance.

How It Works

Postgres WAL → Debezium reads log → Kafka topic → sink → Warehouse

Streaming changes, with no source query and no downtime: unlike a full table copy, CDC captures only what changed, putting no load on the source database.

Every database already records every change it makes — to a write-ahead log used for crash recovery. CDC is the insight that this log, written for an entirely different purpose, is also a perfect real-time stream of every data change, available without any additional load on the database. The three ingestion strategies differ not in correctness but in whether they exploit this existing log or re-query the database directly.

Three ingestion strategies

Full table copy: read every row from the source table and overwrite the destination. Simple and correct — no risk of missing updates or deletes. But for a table with 50 million rows, this means querying 50 million rows every run. Daily full copies put sustained read pressure on your production database. It doesn't scale, and it gets worse every month as the table grows.

Incremental load: read only rows where an update timestamp is newer than the last run. Much faster — only changed rows are transferred. The problem: this pattern misses hard deletes (a deleted row has no updated timestamp to find), and it requires the source table to have a reliable updated_at column that every code path maintains. Many production schemas don't guarantee this.
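The hard-delete blind spot is easy to see in miniature. The sketch below simulates the incremental-load pattern against an in-memory "table" standing in for a real source database; the row shapes and watermark logic are illustrative assumptions, not a real connector.

```python
from datetime import datetime

# A toy source table, keyed by primary key
source = {
    1: {"id": 1, "status": "paid",    "updated_at": datetime(2024, 1, 1)},
    2: {"id": 2, "status": "pending", "updated_at": datetime(2024, 1, 2)},
}
destination = {}

def incremental_load(last_run: datetime) -> None:
    # Copy only rows changed since the watermark -- the classic pattern
    for row in source.values():
        if row["updated_at"] > last_run:
            destination[row["id"]] = row

incremental_load(datetime(2023, 12, 31))   # initial sync: both rows land
assert set(destination) == {1, 2}

del source[2]                              # hard delete at the source
incremental_load(datetime(2024, 1, 2))     # next run finds nothing to copy

assert 2 in destination                    # the deleted row survives downstream
```

The delete leaves no row with a newer `updated_at` to query, so the destination keeps the stale record forever.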

Change Data Capture (CDC): instead of querying the source table at all, read the database's own write-ahead log (WAL in Postgres, binlog in MySQL). Every INSERT, UPDATE, and DELETE is recorded in the WAL for crash recovery. CDC tools (Debezium is the most widely used) tail this log and emit each change as an event — without executing any queries on the source database.

How CDC works in Postgres

Postgres uses a WAL to ensure durability: every write is first appended to the log before it's applied to the data files. If the server crashes, Postgres replays the WAL to recover. CDC tools leverage logical replication, a Postgres feature that decodes the WAL into human-readable events (INSERT/UPDATE/DELETE with before/after row values) rather than the raw binary WAL format.

To enable CDC, Postgres must be configured with wal_level = logical, which records the additional row-level information that logical decoding needs. A replication slot tells Postgres to retain WAL segments until the CDC consumer (Debezium) has read them, preventing the WAL from being cleaned up before the consumer catches up.

Debezium connects as a replication client, reads the decoded WAL, and publishes events to Kafka topics. Each event includes the operation type, the table name, the before-and-after row values, and the log sequence number (LSN) — a monotonically increasing identifier that gives every change a total order.
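The fields described above can be pictured as a small event stream. The dicts below are modeled loosely on the Debezium envelope (operation type, before/after values, and an LSN giving a total order); the exact field names and values are assumptions for illustration.

```python
# Three decoded change events for one row, in the order they hit the WAL
events = [
    {"op": "c", "table": "orders", "before": None,
     "after": {"order_id": 7, "status": "pending"}, "lsn": 1001},
    {"op": "u", "table": "orders",
     "before": {"order_id": 7, "status": "pending"},
     "after":  {"order_id": 7, "status": "paid"},   "lsn": 1002},
    {"op": "d", "table": "orders",
     "before": {"order_id": 7, "status": "paid"},
     "after":  None,                                "lsn": 1003},
]

# Because the LSN is monotonically increasing, replaying events sorted by
# LSN reconstructs the row's history deterministically
history = [e["after"] for e in sorted(events, key=lambda e: e["lsn"])]
assert history[-1] is None   # the row ends its life deleted
```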

Logical replication had to be a separate WAL level (not just reading the existing crash-recovery WAL) because the crash-recovery WAL encodes physical page changes in a format specific to the storage engine version; it cannot be consumed by an external tool without knowing the exact on-disk format. With wal_level = logical, Postgres records enough additional information (complete row values, catalog metadata) that a decoding plugin such as pgoutput can reconstruct each change as a stable logical event, at the cost of somewhat more WAL volume. This decoupling is what makes CDC portable across Postgres versions and consumable by external tools without tight integration with storage internals.

Event-based ingestion from SaaS APIs

Not all sources have a WAL. SaaS APIs (Stripe, Salesforce, HubSpot) expose webhooks or pull APIs with pagination and cursor-based incremental fetching. Managed ingestion connectors (Fivetran, Airbyte) handle the complexity of pagination, API rate limits, schema normalization, and incremental cursor management, writing normalized tables to the warehouse on a schedule.

The tradeoff: managed connectors are easier to operate but expensive at volume and limited to what their pre-built connectors support. Custom ingestion code gives full control but requires maintaining rate limiting, retry logic, and cursor bookmarking.

Design Tradeoffs

Where Your Intuition Breaks

The incremental load pattern — "just query rows where updated_at > last_run" — looks like it should capture all changes. It misses two failure modes that matter in practice: hard deletes (a row deleted from the source has no updated_at to find) and late-arriving updates (a transaction that started before the last run but committed after it may be missed depending on how the watermark is computed). CDC's WAL-based approach avoids both because it captures the operation log, not the current state.

A more subtle misconception is that CDC has near-zero database load. It has near-zero query load, but it does increase WAL volume (logical replication outputs more data than physical replication) and carries replication slot retention risk: if the CDC consumer falls behind, Postgres cannot truncate the WAL and disk fills up. CDC replaces query load with a different class of operational risk that must be actively monitored.
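The late-arriving-update failure can be sketched concretely. In this toy model, updated_at is stamped when the statement runs, but the row only becomes visible at commit; if the extract runs in between, the max-timestamp watermark skips the row forever. All timestamps and row shapes are illustrative assumptions.

```python
committed = {}      # what the extractor can actually see (committed rows only)
watermark = 0

def extract():
    global watermark
    # Classic incremental extract: rows newer than the watermark, then
    # advance the watermark to the max updated_at observed
    changed = [r for r in committed.values() if r["updated_at"] > watermark]
    watermark = max((r["updated_at"] for r in committed.values()),
                    default=watermark)
    return changed

# t=10: a long transaction stamps updated_at=10 but has not committed yet
pending = {"id": 1, "updated_at": 10}

# t=12: another row commits, and the scheduled extract runs
committed[2] = {"id": 2, "updated_at": 12}
assert [r["id"] for r in extract()] == [2]   # watermark advances to 12

# t=13: the long transaction finally commits, carrying its old timestamp
committed[1] = pending
assert extract() == []   # updated_at=10 <= watermark 12: the update is lost
```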

Full copy vs incremental vs CDC

|                         | Full Copy               | Incremental Load    | CDC                            |
| ----------------------- | ----------------------- | ------------------- | ------------------------------ |
| Correctness             | Handles deletes         | Misses hard deletes | Full fidelity (all operations) |
| Source load             | High (reads everything) | Low (reads delta)   | Near-zero (reads WAL only)     |
| Latency                 | Hours (batch)           | Minutes (batch)     | Seconds (streaming)            |
| Complexity              | Low                     | Medium              | High                           |
| Requires updated_at     | No                      | Yes                 | No                             |
| DB configuration needed | No                      | No                  | Yes (wal_level = logical)      |

CDC is the right choice when: you need sub-minute data freshness, your tables are too large for full copies, or you need to capture deletes. It's overkill when: the data is small, the source is an API (not a database), or you don't own the source database configuration.

Replication lag and ordering guarantees

CDC consumers read the WAL sequentially — events within a single transaction are emitted in commit order. But if your pipeline has multiple consumer threads or partitions in Kafka, events for different rows may arrive out of order. An UPDATE to row A in transaction 1 may be processed after an UPDATE to the same row in transaction 2 if they land on different Kafka partitions.

For the warehouse merge step (applying CDC events to the destination table), you must always apply events in LSN order per primary key — never apply an older update on top of a newer one. This is why Kafka consumer groups partitioning CDC topics by primary key hash is the standard pattern.
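The partition-by-primary-key pattern can be sketched in a few lines: hashing the key routes every event for the same row to the same partition, so a per-partition consumer sees them in LSN order. The partition count and event shapes here are assumptions for illustration, not Kafka's actual partitioner.

```python
NUM_PARTITIONS = 4

def partition_for(primary_key: int) -> int:
    # Same key -> same partition, so per-key ordering is preserved even
    # though different keys may be consumed in parallel
    return hash(primary_key) % NUM_PARTITIONS

events = [
    {"pk": 42, "lsn": 500, "status": "pending"},
    {"pk": 42, "lsn": 501, "status": "paid"},
    {"pk": 7,  "lsn": 502, "status": "pending"},
]

partitions = {p: [] for p in range(NUM_PARTITIONS)}
for e in events:                         # producer appends in LSN order
    partitions[partition_for(e["pk"])].append(e)

# Both updates to pk=42 landed in one partition, still in LSN order
same_key = [e for e in partitions[partition_for(42)] if e["pk"] == 42]
assert [e["lsn"] for e in same_key] == [500, 501]
```

Without key-based partitioning, the two updates to pk=42 could land on different partitions and be applied out of order downstream.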

Replication slot retention risk

Postgres retains WAL segments until every replication slot consumer has acknowledged them. If a CDC consumer goes down for an extended period, Postgres cannot clean up the WAL — disk fills up. This is one of the most common CDC operational failures: a downstream consumer outage silently causes the production database disk to fill, eventually crashing the application.

Set max_slot_wal_keep_size (Postgres 13+) to limit how much WAL a slot can retain, and monitor replication slot lag in your alerting. If a consumer falls more than a configurable amount behind, drop the slot and accept a full re-sync rather than risking disk exhaustion.

In Practice

Choosing a CDC tool

Debezium is the most mature open-source CDC tool, with connectors for Postgres, MySQL, MongoDB, and SQL Server. It runs as a Kafka Connect plugin and publishes to Kafka topics. It requires operating Kafka, which adds complexity.

Airbyte CDC mode provides managed CDC for Postgres and MySQL without requiring you to run Kafka — events are buffered internally and written directly to the destination. Simpler to operate but less flexible for stream processing use cases.

Cloud-native options: some managed warehouses (BigQuery, Snowflake) have direct CDC connectors from major databases, handling the pipeline end-to-end as a managed service. Worth evaluating before building custom infrastructure.

The merge pattern at the destination

CDC events are not simple appends. An UPDATE event means the destination table must update an existing row. A DELETE event means a row must be removed. The standard pattern is a CDC merge:

  1. Land all events into a staging table, preserving operation type and LSN
  2. Run a MERGE (or INSERT OVERWRITE for warehouses that don't support it): for each primary key, apply the latest operation in LSN order
  3. Optionally, soft-delete records rather than physically removing rows, to preserve history

Most warehouse transformations (dbt incremental models with unique_key) handle this pattern natively.
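Step 2 of the merge can be sketched without a warehouse: collapse staged events to the latest operation per primary key in LSN order, then apply it. The `__op`/`__lsn` columns mirror the staging layout described above, but the event values are illustrative assumptions.

```python
staged = [
    {"order_id": 1, "status": "pending", "__op": "c", "__lsn": 10},
    {"order_id": 1, "status": "paid",    "__op": "u", "__lsn": 11},
    {"order_id": 2, "status": "pending", "__op": "c", "__lsn": 12},
    {"order_id": 2, "status": None,      "__op": "d", "__lsn": 13},
]

# Keep only the latest event per key: later LSNs overwrite earlier ones
latest = {}
for e in sorted(staged, key=lambda e: e["__lsn"]):
    latest[e["order_id"]] = e

# Apply the surviving operation per key to the target table
target = {}
for pk, e in latest.items():
    if e["__op"] == "d":
        target.pop(pk, None)       # (or soft-delete, as in step 3)
    else:
        target[pk] = {"order_id": pk, "status": e["status"]}

assert target == {1: {"order_id": 1, "status": "paid"}}
```

Order 1's create is superseded by its update, and order 2's create is cancelled by its delete, exactly as the SELECT DISTINCT ON in a warehouse merge would resolve them.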

Backfill strategy

When you first enable CDC, the replication slot only captures changes from that point forward. Historical data must be backfilled separately — typically with a one-time full table copy of the current state, run before enabling CDC. The sequence: take a consistent snapshot of the source table, note the LSN at snapshot time, bulk-load the snapshot to the destination, then begin consuming CDC events starting from that LSN. This ensures no events are missed between the snapshot and the first CDC event.
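The snapshot-then-stream sequence reduces to a simple LSN comparison. In the sketch below, the snapshot LSN, row values, and event shapes are illustrative assumptions; the point is that only events past the snapshot LSN are applied, so nothing is missed and nothing is double-applied.

```python
# Step 1: take a consistent snapshot and note the LSN at snapshot time
snapshot_lsn = 1000
snapshot = {1: {"id": 1, "status": "paid"}}    # source state as of LSN 1000

cdc_events = [
    # This change produced the snapshot state, so it must not be re-applied
    {"id": 1, "status": "paid",    "lsn": 999},
    # This change arrived after the snapshot and must be applied
    {"id": 2, "status": "pending", "lsn": 1001},
]

# Step 2: bulk-load the snapshot
destination = dict(snapshot)

# Step 3: consume CDC events, resuming strictly past the snapshot LSN
for e in sorted(cdc_events, key=lambda e: e["lsn"]):
    if e["lsn"] > snapshot_lsn:
        destination[e["id"]] = {"id": e["id"], "status": e["status"]}

assert destination == {1: {"id": 1, "status": "paid"},
                       2: {"id": 2, "status": "pending"}}
```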

Production Patterns

Debezium connector config for Postgres

Deploy Debezium as a Kafka Connect plugin. The connector config below captures all tables in the public schema, emitting Avro-encoded events to Kafka topics named {server}.{schema}.{table}.

yaml
# debezium-postgres-connector.json (POST to /connectors on Kafka Connect)
name: postgres-cdc
config:
  connector.class: io.debezium.connector.postgresql.PostgresConnector
  database.hostname: prod-db.internal
  database.port: "5432"
  database.user: debezium_replication
  database.password: ${file:/opt/kafka/secrets/db.properties:password}
  database.dbname: app_production
  database.server.name: prod
  # Use pgoutput — ships with Postgres 10+, no plugin install needed
  plugin.name: pgoutput
  publication.name: debezium_pub
  slot.name: debezium_slot
  table.include.list: public.orders,public.customers,public.payments
  # Emit before-image on UPDATE/DELETE (requires REPLICA IDENTITY FULL on table)
  # ALTER TABLE orders REPLICA IDENTITY FULL;
  heartbeat.interval.ms: "10000"
  # Retry acquiring the replication slot if it is temporarily unavailable
  slot.max.retries: "3"
  slot.retry.delay.ms: "10000"
  key.converter: io.confluent.connect.avro.AvroConverter
  value.converter: io.confluent.connect.avro.AvroConverter
  key.converter.schema.registry.url: http://schema-registry:8081
  value.converter.schema.registry.url: http://schema-registry:8081
  transforms: unwrap
  transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
  transforms.unwrap.drop.tombstones: "false"
  transforms.unwrap.delete.handling.mode: rewrite

The ExtractNewRecordState SMT (Single Message Transform) flattens the Debezium envelope — downstream consumers receive a flat row rather than the nested {before, after, op, ts_ms} structure. Set delete.handling.mode: rewrite to emit delete events as a row with __deleted = true rather than a Kafka tombstone, so your warehouse sink can distinguish deletes from compaction.
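The effect of the unwrap transform can be illustrated with plain dicts. The function below is a hypothetical stand-in for what ExtractNewRecordState does, not Debezium's implementation; field values are assumptions.

```python
def unwrap(envelope: dict) -> dict:
    # delete.handling.mode = rewrite: a delete keeps the last known values
    # and is flagged with __deleted instead of becoming a Kafka tombstone
    if envelope["op"] == "d":
        return {**envelope["before"], "__deleted": "true"}
    return {**envelope["after"], "__deleted": "false"}

update = {"op": "u",
          "before": {"id": 1, "status": "pending"},
          "after":  {"id": 1, "status": "paid"}}
delete = {"op": "d",
          "before": {"id": 1, "status": "paid"},
          "after":  None}

assert unwrap(update) == {"id": 1, "status": "paid", "__deleted": "false"}
assert unwrap(delete) == {"id": 1, "status": "paid", "__deleted": "true"}
```

Downstream sinks then treat `__deleted = "true"` rows as delete instructions rather than upserts.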

Applying CDC events with a warehouse merge

Land all CDC events into a staging table first. Then merge into the target, applying only the latest operation per primary key in LSN order.

sql
-- Step 1: staging table receives raw CDC events from Kafka sink connector
-- Columns: order_id, status, amount, __op (r/c/u/d), __source_ts_ms, __lsn
 
-- Step 2: merge CDC events into the target table
MERGE INTO orders AS target
USING (
  SELECT DISTINCT ON (order_id)
    order_id,
    status,
    amount,
    updated_at,
    __op,
    __lsn
  FROM cdc_staging.orders
  -- Apply the latest LSN per key; older events for the same key are discarded
  ORDER BY order_id, __lsn DESC
) AS src
ON target.order_id = src.order_id
WHEN MATCHED AND src.__op = 'd' THEN
  -- Soft delete: preserve the row for audit, mark as deleted
  UPDATE SET deleted_at = NOW(), is_deleted = true
WHEN MATCHED AND src.__op IN ('c', 'u', 'r') THEN
  UPDATE SET
    status     = src.status,
    amount     = src.amount,
    updated_at = src.updated_at,
    -- A re-insert after a soft delete must also clear the delete markers
    is_deleted = false,
    deleted_at = NULL
WHEN NOT MATCHED AND src.__op != 'd' THEN
  INSERT (order_id, status, amount, updated_at)
  VALUES (src.order_id, src.status, src.amount, src.updated_at);
 
-- Step 3: truncate staging after successful merge (idempotency: re-run is safe
-- only if events are re-consumed from Kafka at the same offset)
TRUNCATE cdc_staging.orders;

For Snowflake or BigQuery, which lack Postgres's SELECT DISTINCT ON, deduplicate the staging rows with QUALIFY ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY __lsn DESC) = 1 before the merge.

Monitoring replication lag

Replication slot lag is the most critical CDC health signal. Query it from Postgres directly and expose it to your monitoring stack.

python
# monitor_replication_lag.py — run as a cron job or Prometheus exporter
import psycopg2
 
QUERY = """
SELECT
    slot_name,
    database,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_pretty,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)                 AS lag_bytes
FROM pg_replication_slots
WHERE slot_type = 'logical';
"""
 
def check_lag(conn_str: str, alert_threshold_bytes: int = 5 * 1024**3) -> None:
    with psycopg2.connect(conn_str) as conn, conn.cursor() as cur:
        cur.execute(QUERY)
        for slot_name, database, active, lag_pretty, lag_raw in cur.fetchall():
            print(f"slot={slot_name} active={active} lag={lag_pretty}")
            if not active:
                # Inactive slot still retains WAL — highest risk
                alert(f"Replication slot {slot_name} is INACTIVE. WAL retention risk.")
            if lag_raw > alert_threshold_bytes:
                alert(f"Slot {slot_name} lag={lag_pretty} exceeds "
                      f"{alert_threshold_bytes / 1024**3:.1f} GiB threshold.")
 
def alert(msg: str) -> None:
    # Wire to PagerDuty, Slack, or your alerting stack
    print(f"ALERT: {msg}")
 
if __name__ == "__main__":
    check_lag("postgresql://monitor:password@prod-db.internal/app_production")

Set a Prometheus alert at 1 GiB lag and a page at 5 GiB. If a slot is inactive for more than 10 minutes, drop it and schedule a full re-sync — the risk of disk exhaustion outweighs the cost of the backfill.
