Pipeline Orchestration
A pipeline that runs once is a script. A pipeline that runs reliably on a schedule, retries on failure, respects dependencies, monitors SLAs, and gives you visibility when something breaks is an orchestrated workflow. Airflow, Prefect, and Dagster all model pipelines as DAGs — but they differ sharply in how they handle state, dynamic tasks, and the developer experience.
How It Works
Pipeline DAG — failure propagation
When a task fails, all downstream tasks are skipped — they're marked as "upstream_failed" rather than running with incomplete data.
When ingest_orders fails, everything downstream of it (stg_orders, fct_orders, notify_slack) is skipped — they enter "upstream_failed" state and do not run. ingest_users and stg_users are unaffected because they have no dependency on the failed task. This is the core value of DAG-based orchestration: partial pipeline success is handled cleanly, and the scope of impact is always visible from the graph.
A data pipeline without orchestration is a collection of scripts you run manually, in the right order, and hope nothing fails. Orchestration replaces that hope with a system: the DAG structure makes dependency order explicit and machine-enforceable, retries handle transient failures without human intervention, and the run history gives you a full audit trail of what ran when and what failed. The complexity of running a full orchestration stack (scheduler, workers, metadata DB) is justified by the reliability it provides.
Directed Acyclic Graphs
Every orchestration tool models pipelines as Directed Acyclic Graphs (DAGs). Nodes are tasks; edges define dependencies. "Acyclic" means there are no cycles — a task cannot (directly or indirectly) depend on itself. The DAG defines two things: execution order (topological sort of the graph) and failure propagation (a failed node blocks all downstream nodes from running).
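Both properties fall out of standard graph algorithms. Here is a minimal sketch, modeling orchestration generically rather than any one tool's internals (task names follow the example DAG above): Kahn's algorithm yields the execution order, and a breadth-first walk from a failed node yields the set of tasks to mark upstream_failed.

```python
from collections import deque

# Example DAG: task -> set of direct downstream tasks
dag = {
    "ingest_orders": {"stg_orders"},
    "ingest_users": {"stg_users"},
    "stg_orders": {"fct_orders"},
    "stg_users": set(),
    "fct_orders": {"notify_slack"},
    "notify_slack": set(),
}

def execution_order(dag):
    """Topological sort (Kahn's algorithm): a task becomes ready
    only once all of its upstream dependencies have run."""
    indegree = {t: 0 for t in dag}
    for downstream in dag.values():
        for t in downstream:
            indegree[t] += 1
    ready = deque(t for t, d in indegree.items() if d == 0)
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in dag[task]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(dag):
        raise ValueError("cycle detected — not a DAG")
    return order

def upstream_failed(dag, failed_task):
    """Every transitive descendant of a failed task is skipped."""
    skipped, frontier = set(), deque([failed_task])
    while frontier:
        for child in dag[frontier.popleft()]:
            if child not in skipped:
                skipped.add(child)
                frontier.append(child)
    return skipped
```

With this model, `upstream_failed(dag, "ingest_orders")` contains stg_orders, fct_orders, and notify_slack, while stg_users is untouched: exactly the behavior described above.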
Airflow
Apache Airflow is the most widely deployed orchestration tool. Pipelines are defined in Python files — a DAG is a Python object, tasks are operators (PythonOperator, BashOperator, SnowflakeOperator). Airflow's scheduler polls the DAG store, triggers runs at configured intervals, and maintains run history in a metadata database (Postgres or MySQL).
Airflow's strength is its ecosystem: hundreds of operator integrations, a large community, and mature features (SLA monitoring, task-level retries, backfill, branching). Its weakness is its architecture: the scheduler is a single process that reads and parses every DAG file on a fixed interval. Large DAG files or slow Python imports cause scheduler lag. Dynamic task generation (creating tasks based on runtime data) was historically awkward, though dynamic task mapping (introduced in Airflow 2.3) addresses this significantly.
Key concepts: schedule_interval controls when runs trigger; execution_date is the logical date the run represents (not when it runs); catchup=True causes Airflow to run missed intervals when a DAG is unpaused or the start date is in the past — a common footgun that triggers hundreds of backfill runs on first deploy.
Airflow's execution_date naming is intentionally not "run time" — it is the logical partition of data the run processes. A daily pipeline running on March 16 to process March 15 data has execution_date = 2024-03-15. This distinction had to be made explicit because idempotent pipelines need to know which data partition they own (so they can overwrite exactly that partition on retry), not just when they happened to run. The indirection enables safe backfill: re-running a past execution_date overwrites the same partition with the same data, never creating duplicates.
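As a sketch of the overwrite pattern this enables (table and column names are illustrative, not from any real schema), a task can derive a delete-and-reload statement for exactly the partition its execution_date owns:

```python
def overwrite_partition_sql(table: str, execution_date: str) -> str:
    """Build an idempotent load for the partition this run owns.
    Deleting the partition first makes a retry or backfill safe:
    it rewrites the same slice instead of appending duplicates."""
    return (
        f"DELETE FROM {table} WHERE order_date = DATE '{execution_date}';\n"
        f"INSERT INTO {table}\n"
        f"SELECT * FROM staging.orders WHERE order_date = DATE '{execution_date}';"
    )

print(overwrite_partition_sql("analytics.fct_orders", "2024-03-15"))
```

Running the generated statement twice for the same execution_date yields the same table state, which is exactly the property safe backfill depends on.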
Prefect and Dagster
Prefect takes a Python-native approach: any Python function decorated with @flow and @task becomes an orchestrated pipeline. There is no separate DAG definition — dependencies are implicit from function calls. Prefect handles retries, logging, and state tracking automatically. Its hybrid execution model lets you run flows on your own infrastructure while using Prefect's cloud for orchestration metadata. Prefect is faster to get started with but has a less mature ecosystem than Airflow.
Dagster introduces software-defined assets: instead of modeling pipelines as a series of tasks that run, you model data assets (tables, files, ML models) and the computations that produce them. Dagster tracks asset materialization and lets you understand the data, not just the jobs. When a source asset changes, Dagster can identify which downstream assets are stale and need re-materialization. This asset-centric model aligns better with how data teams think (in terms of tables, not jobs) and integrates natively with dbt and Fivetran.
Design Tradeoffs
Where Your Intuition Breaks
DAG-based orchestration looks like it provides visibility into pipeline health — you can see which tasks ran and which failed. What it doesn't show is data quality: a task that completed successfully may have written wrong or empty data. "Green in the orchestrator" does not mean "data is correct." The most damaging data incidents typically involve pipelines that run to completion every day while silently producing incorrect results — wrong joins, missing partition filters, late-arriving data that was silently dropped. Orchestration handles execution reliability; data quality testing handles correctness. The two systems are complementary, and teams that treat a successful pipeline run as a quality signal are trading on false confidence.
Choosing an orchestrator
| | Airflow | Prefect | Dagster |
|---|---|---|---|
| Paradigm | Task-based DAGs | Python functions | Asset-based |
| Dynamic tasks | Airflow 2.3+ (maps) | Native | Native |
| Ecosystem | Largest | Growing | Good (dbt-native) |
| Operational complexity | High (scheduler, workers, DB) | Lower (managed cloud option) | Medium |
| Best for | Large teams with complex pipelines | Fast iteration, Python-native teams | dbt-heavy, asset-centric workflows |
Retry strategies
Most pipeline failures are transient — a network timeout, warehouse slot contention, an API rate limit. Configure retries on tasks that interact with external systems: retries=3, retry_delay=timedelta(minutes=5). Don't retry tasks that failed due to data errors (null primary key, schema mismatch) — retrying won't fix a data quality problem and will just delay alerting.
Exponential backoff with jitter is standard for external API calls: first retry after 1 minute, second after 2–4 minutes, third after 4–8 minutes. This prevents thundering herd when a downstream system recovers: dozens of pipelines retrying simultaneously can overwhelm a recovering service.
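A minimal sketch of that retry schedule in plain Python. Orchestrators normally configure this declaratively (e.g. retry_exponential_backoff=True in Airflow), so this is illustrative only:

```python
import random
import time

def call_with_backoff(fn, max_retries: int = 3, base: float = 60.0):
    """Retry `fn` on exception with exponential backoff plus jitter.
    Delays grow as roughly 1-2 min, 2-4 min, 4-8 min, randomized so
    many pipelines retrying against the same recovering service do
    not fire in lockstep (the thundering herd problem)."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # exhausted retries: surface the failure
            low = base * 2 ** attempt           # 1 min, 2 min, 4 min...
            time.sleep(random.uniform(low, low * 2))  # jittered wait
```

The jitter window here doubles along with the base delay; other schemes (full jitter, decorrelated jitter) trade predictability against spread, but all share the goal of desynchronizing retries.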
Idempotency
Every task should be idempotent: running it twice produces the same result as running it once. This is what makes retries safe. For SQL-based tasks, INSERT OVERWRITE (partition-level) and MERGE are idempotent. Plain INSERT without deduplication is not — a retry inserts duplicate rows. Design tasks to be safe to re-run without side effects before deploying them.
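The difference is easy to see in a toy in-memory model, where a dict keyed by primary key stands in for MERGE semantics (names are illustrative):

```python
def plain_insert(table: list, rows: list) -> None:
    """NOT idempotent: a retried task appends the same rows again."""
    table.extend(rows)

def merge_upsert(table: dict, rows: list, key: str = "order_id") -> None:
    """Idempotent: rows are keyed, so a retry overwrites rather than
    duplicates — the in-memory analogue of SQL MERGE."""
    for row in rows:
        table[row[key]] = row

rows = [{"order_id": 1, "amount": 10}]

appended = []
plain_insert(appended, rows)
plain_insert(appended, rows)   # retry duplicates the row: len == 2

merged = {}
merge_upsert(merged, rows)
merge_upsert(merged, rows)     # retry is a no-op: len == 1
```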
Backfill strategy
When a pipeline has been broken for several days and you fix it, you want to re-run it for the missed intervals. All orchestrators support backfill: trigger runs for past execution_date values. The critical decision is whether to run backfill runs in parallel or sequentially. Parallel backfill is faster but can overwhelm source systems and warehouse compute. Sequential backfill is slower but predictable. For most pipelines, sequential backfill with a concurrency limit of 2–4 runs at a time is the right default.
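A sketch of that default using a bounded thread pool. The run_partition callable is a stand-in for triggering one pipeline run; real orchestrators expose this as a setting such as Airflow's max_active_runs rather than hand-rolled code:

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date, timedelta

def backfill(run_partition, start: date, end: date, concurrency: int = 3):
    """Re-run a pipeline for every missed daily interval, with at most
    `concurrency` partitions in flight at once: fast enough to catch
    up, bounded enough not to overwhelm sources or the warehouse."""
    days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        return list(pool.map(run_partition, days))

# Toy usage: "run" each partition by returning its date string
processed = backfill(lambda d: d.isoformat(), date(2024, 3, 10), date(2024, 3, 12))
```

Because each partition run is idempotent (see above), the order and concurrency of backfill runs affect only load on source systems, never correctness.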
In Practice
SLA monitoring and alerting
Configure SLA callbacks that fire when a pipeline fails to complete by a deadline. In Airflow: sla=timedelta(hours=2) on a task means Airflow sends an alert if that task hasn't completed within 2 hours of the run start. Combine this with on-failure callbacks that post to a Slack channel or PagerDuty.
The alert should include: which pipeline, which task, which execution date, and a link to the run in the orchestration UI. "Pipeline X failed" is not an actionable alert. "fct_orders: task stg_orders failed at 02:14 for execution_date 2024-03-15, retried 3 times — logs at [link]" is.
Sensor tasks
A sensor waits for an external condition before proceeding: a file to appear in S3, a Kafka topic to reach a certain offset, or an upstream pipeline to complete. Sensors allow time-based orchestration to be replaced with event-driven dependencies. Instead of scheduling the transformation pipeline at 04:00 and hoping the ingestion finishes in time, a sensor waits for the ingestion to complete — eliminating timing races regardless of when ingestion finishes.
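Stripped of orchestrator specifics, a sensor is a polling loop with a timeout. A minimal sketch, where the condition callable is a placeholder for a real check such as "does the S3 success marker exist" (real sensors, like Airflow's with mode="reschedule", also free the worker slot between pokes):

```python
import time

def wait_for(condition, poke_interval: float = 300.0, timeout: float = 14400.0):
    """Poll an external condition until it holds or the timeout expires.
    Defaults: poke every 5 minutes, give up after 4 hours."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poke_interval)
    raise TimeoutError("sensor timed out waiting for upstream condition")
```

Raising on timeout matters: a sensor that waits forever hides an upstream failure, while a timeout turns it into a visible, alertable task failure.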
The metadata database is not your logging system
Airflow and Dagster store run metadata in a Postgres database. This database is not designed to handle terabytes of task logs. Configure external log storage (S3, GCS) for task output logs — store only metadata (task state, timestamps, retry counts) in the metadata database. Airflow's default log storage fills local disk quickly at scale.
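Assuming Airflow 2's `[logging]` config section, a sketch of routing task logs to S3 while the metadata DB keeps only run state (the bucket name and connection id below are placeholders):

```ini
# airflow.cfg — ship task logs to object storage, not local disk
[logging]
remote_logging = True
# an Airflow connection with write access to the bucket (placeholder id)
remote_log_conn_id = aws_default
# placeholder bucket; the UI fetches logs from here on demand
remote_base_log_folder = s3://my-airflow-logs/task-logs
```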
Production Patterns
Airflow DAG with retries and SLA callback
Define SLA misses and failure callbacks at the DAG level so every task inherits them. Avoid setting defaults in individual operators — a missed operator-level override silently skips alerting.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import requests

SLACK_WEBHOOK = "https://hooks.slack.com/services/T.../B.../..."

def alert_slack(context):
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    exec_dt = context["execution_date"].isoformat()
    log_url = context["task_instance"].log_url
    requests.post(SLACK_WEBHOOK, json={
        "text": (
            f":red_circle: *{dag_id}* | task `{task_id}` failed\n"
            f"execution_date: `{exec_dt}`\n"
            f"<{log_url}|View logs>"
        )
    })

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    requests.post(SLACK_WEBHOOK, json={
        "text": f":warning: SLA missed on *{dag.dag_id}*: tasks {[s.task_id for s in slas]}"
    })

default_args = {
    "owner": "data-eng",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "on_failure_callback": alert_slack,
}

with DAG(
    dag_id="fct_orders_daily",
    default_args=default_args,
    schedule_interval="0 3 * * *",  # 03:00 UTC daily
    start_date=days_ago(1),
    catchup=False,  # never run missed intervals automatically
    sla_miss_callback=sla_miss_callback,
) as dag:
    ingest = PythonOperator(
        task_id="ingest_orders",
        python_callable=run_ingestion,  # assumed defined elsewhere
        sla=timedelta(hours=1),
    )
    transform = PythonOperator(
        task_id="transform_orders",
        python_callable=run_dbt_model,  # assumed defined elsewhere
        sla=timedelta(hours=2),
    )

    ingest >> transform
```

Set catchup=False on every DAG unless you have an explicit backfill workflow — it is the most common cause of a flood of unexpected backfill runs on first deploy.
Dagster asset with partition and failure sensor
Dagster's asset model maps directly to warehouse tables. Partition assets by date so incremental runs only materialize the new slice, and attach a run_failure_sensor to route alerts.
```python
from dagster import (
    asset, DailyPartitionsDefinition, RunFailureSensorContext,
    run_failure_sensor, define_asset_job, AssetSelection,
)
import requests

SLACK_WEBHOOK = "https://hooks.slack.com/services/T.../B.../..."

daily = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(
    partitions_def=daily,
    group_name="orders",
    metadata={"owner": "#data-eng", "table": "analytics.fct_orders"},
)
def fct_orders(context, stg_orders):
    partition_date = context.partition_key  # e.g. "2024-03-15"
    context.log.info(f"Materializing partition {partition_date}")
    run_dbt_model("fct_orders", vars={"run_date": partition_date})  # assumed defined elsewhere

orders_job = define_asset_job(
    "orders_daily_job",
    selection=AssetSelection.groups("orders"),
    partitions_def=daily,
)

@run_failure_sensor(monitored_jobs=[orders_job])
def orders_failure_alert(context: RunFailureSensorContext):
    run_id = context.dagster_run.run_id
    error_msg = context.failure_event.message
    requests.post(SLACK_WEBHOOK, json={
        "text": (
            f":red_circle: *orders_daily_job* failed\n"
            f"run_id: `{run_id}`\n"
            f"error: {error_msg[:300]}"
        )
    })
```

Partition assets from day one. Retrofitting partitions onto an existing unpartitioned asset requires deleting and re-materializing the full history — plan for it upfront.
Alerting on pipeline failure to Slack via Prefect
Prefect uses flow-level on_failure hooks. Pass the hook at flow definition time so it applies to every run, including scheduled and manually triggered ones.
```python
from prefect import flow, task, get_run_logger
import requests

SLACK_WEBHOOK = "https://hooks.slack.com/services/T.../B.../..."

def slack_on_failure(flow, flow_run, state):
    requests.post(SLACK_WEBHOOK, json={
        "text": (
            f":red_circle: Flow *{flow.name}* failed\n"
            f"flow_run_id: `{flow_run.id}`\n"
            f"state: {state.message}\n"
            f"UI: https://app.prefect.cloud/flow-runs/{flow_run.id}"
        )
    })

@flow(
    name="fct-orders-daily",
    on_failure=[slack_on_failure],
    retries=2,
    retry_delay_seconds=300,
)
def orders_pipeline(run_date: str):
    logger = get_run_logger()
    logger.info(f"Running for {run_date}")
    ingest_orders(run_date)  # assumed @task-decorated, defined elsewhere
    run_dbt_model("fct_orders", run_date)  # assumed defined elsewhere
```

Prefect's retries parameter applies at the flow level. For task-level retry isolation (so one flaky task doesn't retry the entire flow), decorate individual tasks with @task(retries=3) instead.