ML Data Pipelines
An ML data pipeline is not a generic data pipeline that happens to produce features — it has specific requirements that general-purpose ETL tools don't address: point-in-time correctness for training, sub-10ms feature lookup for serving, deterministic preprocessing for reproducibility, and schema validation that catches model-breaking changes before they reach production.
How It Works
Diagram: the stages of an ML data pipeline.
Each stage is independently scalable — the validation step can scale horizontally without affecting feature transform throughput.
Unlike a business intelligence pipeline that serves dashboards, an ML data pipeline serves two distinct consumers: the training job (which needs historical features with correct temporal joins) and the model server (which needs the latest features in under 5ms). The same feature store — described in the Feature Stores lesson — bridges both consumers.
ML data pipelines have stricter requirements than analytics pipelines because errors compound differently: a wrong dashboard number is visible and gets reported; a wrong feature silently degrades model predictions at scale, with no immediate signal beyond slowly deteriorating metrics. Every stage — validation, transformation, feature storage, schema management — must be designed to fail loudly rather than produce silently wrong outputs. This asymmetry in error visibility is why ML data pipelines had to adopt explicit validation at every stage boundary — in analytics pipelines the user is the final validator, but in ML pipelines no human inspects individual feature values, so the pipeline must be its own critic.
Stage 1 — Validation
Every ML pipeline should validate incoming data before applying any transformations. Schema failures that reach model serving silently produce wrong predictions. Validate:
Schema checks: column names, data types, nullable vs required. A new upstream schema that renames user_id to userId breaks the pipeline — the model receives NULLs for all lookups.
Value range checks: age outside (0, 150), price negative, rating outside (1, 5). Out-of-range values often indicate upstream bugs rather than legitimate edge cases.
Distribution checks: if the mean of purchase_amount this hour is 10× the historical mean, something changed upstream. Statistical checks (Kolmogorov-Smirnov test, population stability index) catch distribution shift before it reaches the model.
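The distribution check can be sketched as a Population Stability Index computation (a minimal sketch; bucket edges are taken from the training sample's quantiles, and the synthetic data is illustrative):

```python
import numpy as np

def psi(expected, actual, bins=10):
    """Population Stability Index of one feature: training sample
    (expected) vs serving sample (actual)."""
    # Bucket edges come from the training distribution's quantiles
    edges = np.percentile(expected, np.linspace(0, 100, bins + 1))
    # Clamp serving values into the training range so every value lands in a bucket
    actual = np.clip(actual, edges[0], edges[-1])
    e_frac = np.histogram(expected, bins=edges)[0] / len(expected)
    a_frac = np.histogram(actual, bins=edges)[0] / len(actual)
    # Floor tiny fractions to avoid log(0)
    e_frac = np.clip(e_frac, 1e-6, None)
    a_frac = np.clip(a_frac, 1e-6, None)
    return float(np.sum((a_frac - e_frac) * np.log(a_frac / e_frac)))

rng = np.random.default_rng(0)
train = rng.normal(50, 10, 10_000)    # historical purchase_amount
stable = rng.normal(50, 10, 10_000)   # serving data, no upstream change
shifted = rng.normal(65, 10, 10_000)  # upstream change: mean moved

print(psi(train, stable))   # small, well under 0.1: no drift
print(psi(train, shifted))  # well over 0.2: significant drift
```

Schema and value-range checks, by contrast, can be declared with a validation library such as Great Expectations: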
import great_expectations as ge
df = ge.read_csv("features.csv")
df.expect_column_values_to_be_between("age", 0, 150)
df.expect_column_values_to_not_be_null("user_id")
df.expect_column_values_to_match_regex("email", r"^[^@]+@[^@]+\.[^@]+$")
results = df.validate()  # runs every registered expectation
if not results["success"]:
    raise ValueError(f"Validation failed: {results['statistics']}")
Stage 2 — Feature transformation
Transformations applied to raw data to create model-ready features. These must be identical between training and serving — any divergence is training-serving skew:
Numerical: scaling (StandardScaler, MinMaxScaler), clipping outliers, log transforms for skewed distributions
Categorical: ordinal encoding, one-hot encoding, target encoding. One-hot encoding with a fixed vocabulary must use the same vocabulary at serving time (unknown categories map to a fixed "other" bucket — never error)
Temporal: extracting day-of-week, hour-of-day, days-since-last-event from timestamps
Embeddings: mapping raw strings (product IDs, user IDs) to embedding vectors via a lookup table
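The fixed-vocabulary rule for categoricals can be sketched as (a minimal example; the category names are illustrative — the vocabulary is fitted once on training data and reused verbatim at serving time):

```python
def fit_vocab(values):
    """Build a fixed category-to-index vocabulary at training time."""
    return {v: i for i, v in enumerate(sorted(set(values)))}

def one_hot(value, vocab):
    """Encode with the training vocabulary; unknown categories fold
    into a trailing "other" bucket instead of raising."""
    vec = [0] * (len(vocab) + 1)
    vec[vocab.get(value, len(vocab))] = 1
    return vec

vocab = fit_vocab(["web", "ios", "android"])  # fitted on training data only
one_hot("ios", vocab)      # → [0, 1, 0, 0]
one_hot("windows", vocab)  # unseen at training time → [0, 0, 0, 1] ("other")
```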
The critical implementation rule: fit transformers on training data, apply the fitted parameters at serving time. Never refit on serving data.
from sklearn.preprocessing import StandardScaler
import joblib
# Training: fit and transform
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train[["age", "purchase_count"]])
joblib.dump(scaler, "artifacts/scaler.pkl") # save fitted params
# Serving: load and transform (no fit)
scaler = joblib.load("artifacts/scaler.pkl")
X_serving_scaled = scaler.transform(X_serving[["age", "purchase_count"]])
Design Tradeoffs
Where Your Intuition Breaks
Training-serving skew is commonly assumed to require different code between training and serving. In practice, identical code with different fitted state is just as dangerous: a StandardScaler refitted on serving data with a slightly different distribution will apply different transformations even though the class name and method calls are identical. Skew is a state problem, not a code problem. The only reliable fix is to serialize the fitted transformer state at training time and load exactly that state at serving — the .pkl file is the contract, not the import statement. Teams that audit their code for consistency and find a match are often still shipping skew through stale or mis-versioned artifact files.
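One way to enforce the ".pkl file is the contract" idea is to ship a digest manifest alongside the model and refuse to serve on a mismatch. A sketch, assuming artifacts are local files (paths and helper names are illustrative):

```python
import hashlib
import json

def sha256_of(path):
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()

def write_manifest(artifact_paths, manifest_path):
    """Training side: record a digest for every artifact the model ships with."""
    manifest = {p: sha256_of(p) for p in artifact_paths}
    with open(manifest_path, "w") as f:
        json.dump(manifest, f)

def verify_manifest(manifest_path):
    """Serving side: refuse to start on a stale or mis-versioned artifact."""
    with open(manifest_path) as f:
        manifest = json.load(f)
    for path, digest in manifest.items():
        if sha256_of(path) != digest:
            raise RuntimeError(f"stale or mis-versioned artifact: {path}")
```

The verification runs once at server startup, so a swapped or stale transformer file fails loudly instead of silently skewing every prediction.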
Batch vs streaming feature pipelines
| | Batch | Streaming |
|---|---|---|
| Latency | Minutes to hours | Sub-second |
| Throughput | High | Medium |
| Complexity | Low | High |
| Use case | Historical aggregates, training | Real-time features, session signals |
| Tools | Spark, dbt | Flink, Kafka Streams |
Most production ML systems use batch + streaming hybrid: slow-changing features (user demographics, 30-day aggregates) are batch-computed; fast-changing features (session activity, cart state) are stream-computed and merged at serving time.
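The serving-time merge in the hybrid pattern can be sketched as (a minimal sketch with in-memory dicts standing in for the batch and streaming stores; feature names are illustrative):

```python
def assemble_features(user_id, batch_store, stream_store):
    """Merge slow batch features with fresh streaming features at
    serving time; streaming values win on collisions (they are fresher)."""
    batch = batch_store.get(user_id, {})    # e.g. 30-day aggregates, nightly job
    stream = stream_store.get(user_id, {})  # e.g. session activity, per-event
    return {**batch, **stream}

batch_store = {"u1": {"orders_30d": 4, "cart_items": 0}}
stream_store = {"u1": {"cart_items": 2, "session_clicks": 17}}
assemble_features("u1", batch_store, stream_store)
# → {"orders_30d": 4, "cart_items": 2, "session_clicks": 17}
```

Letting the streaming value win on key collisions is deliberate: the batch copy of a fast-changing feature like cart state is stale by definition.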
Preprocessing in the pipeline vs in the model
Two philosophies:
Preprocess in the pipeline: apply feature engineering before training. Faster training (preprocessing not repeated per epoch). Requires a separate serving pipeline that applies the same transforms. Risk: preprocessing code diverges between training and serving.
Preprocess in the model: include preprocessing layers in the model graph (TensorFlow Transform, Keras preprocessing layers). Training is slower, but the model is self-contained — deploying the model includes the preprocessing. Skew from divergent preprocessing is eliminated because the same graph, with the same fitted state, runs in both contexts.
For teams with strong MLOps infrastructure, pipeline-based preprocessing is standard. For smaller teams or teams prioritizing correctness, in-model preprocessing reduces operational risk.
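An sklearn analogue of the in-model philosophy bundles the fitted transformer and the estimator into one artifact, so deploying the model deploys the preprocessing with it (a sketch on synthetic data; the feature shapes are illustrative):

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
X_train = rng.normal(size=(500, 2))
y_train = (X_train[:, 0] + X_train[:, 1] > 0).astype(int)

# Preprocessing travels inside the model object: one artifact to serialize,
# no separate serving transform step to drift out of sync
model = Pipeline([
    ("scale", StandardScaler()),
    ("clf", LogisticRegression()),
])
model.fit(X_train, y_train)
model.predict_proba(np.array([[0.5, -0.2]]))  # scaling happens inside predict
```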
Schema evolution
When raw data schema changes (new column added, column renamed), ML pipelines break in one of two ways:
- Hard failure: pipeline errors on missing column → alert fires, models go stale but don't silently degrade
- Silent failure: pipeline fills missing column with NULL or default → model continues running with bad data
Prefer hard failures. Explicit schema validation at pipeline entry catches schema changes before they propagate. Version the schema (like an API contract) and test pipeline compatibility against the new schema in a staging environment before deploying.
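A hard-failing entry check can be sketched as (a minimal sketch; the expected schema and column names are illustrative):

```python
EXPECTED_SCHEMA = {"user_id": str, "age": int, "purchase_amount": float}

def check_schema(row: dict) -> dict:
    """Hard failure at pipeline entry: missing or renamed columns raise
    instead of being silently filled with NULLs or defaults."""
    missing = EXPECTED_SCHEMA.keys() - row.keys()
    if missing:
        raise ValueError(f"missing columns: {sorted(missing)}")
    for col, typ in EXPECTED_SCHEMA.items():
        if not isinstance(row[col], typ):
            raise TypeError(f"{col}: expected {typ.__name__}, "
                            f"got {type(row[col]).__name__}")
    return row

check_schema({"user_id": "u1", "age": 31, "purchase_amount": 19.99})  # passes
# A renamed upstream column ("userId") raises ValueError here, at the
# pipeline boundary, instead of feeding NULLs to the model
```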
In Practice
Building a training feature pipeline with Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, datediff, coalesce, lit
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("feature_pipeline").getOrCreate()
# Load raw data
orders = spark.read.parquet("s3://bucket/raw/orders/")
users = spark.read.parquet("s3://bucket/raw/users/")
# Feature engineering
window = Window.partitionBy("user_id").orderBy("order_timestamp")
features = (
    orders
    .join(users, "user_id")
    .withColumn("days_since_prev_order",
                datediff(col("order_timestamp"), lag("order_timestamp", 1).over(window)))
    .withColumn("days_since_prev_order",
                coalesce(col("days_since_prev_order"), lit(999)))  # NULL → first-time buyer
    .groupBy("user_id")
    .agg({
        "order_id": "count",             # total orders
        "revenue": "sum",                # lifetime value
        "days_since_prev_order": "avg",  # avg order cadence
    })
    .withColumnRenamed("count(order_id)", "total_orders")
    .withColumnRenamed("sum(revenue)", "lifetime_value")
    .withColumnRenamed("avg(days_since_prev_order)", "avg_cadence")  # matches the online store key
)
features.write.parquet("s3://bucket/features/user_features_v2/", mode="overwrite")
Serving feature pipeline with FastAPI
from fastapi import FastAPI
import joblib
import numpy as np
from redis import Redis
app = FastAPI()
redis = Redis(host="feature-store", port=6379)
scaler = joblib.load("artifacts/scaler.pkl")
model = joblib.load("artifacts/model.pkl")
@app.post("/predict")
async def predict(user_id: str):
    # Fetch pre-computed features from online store
    raw = redis.hmget(f"user:{user_id}", "total_orders", "lifetime_value", "avg_cadence")
    if None in raw:
        raw = [0, 0.0, 999]  # cold-start defaults
    features = np.array([[float(x or 0) for x in raw]])
    features_scaled = scaler.transform(features)
    score = model.predict_proba(features_scaled)[0][1]
    return {"user_id": user_id, "churn_probability": round(float(score), 4)}
Production Patterns
Monitoring for training-serving skew
Log features at serving time and compare their distribution to the training distribution:
from datetime import datetime, timezone

# At serving time, log feature values alongside predictions
# (kafka_producer: a Kafka producer client configured elsewhere)
prediction_log = {
    "user_id": user_id,
    "total_orders": features[0][0],
    "lifetime_value": features[0][1],
    "prediction": float(score),
    "timestamp": datetime.now(timezone.utc).isoformat(),
}
kafka_producer.send("prediction_logs", prediction_log)
A daily job computes the Population Stability Index (PSI) between training and serving distributions. PSI > 0.2 signals significant drift — time to retrain.
Shadow mode for pipeline changes
Before deploying a new feature pipeline version:
- Run the new pipeline in parallel ("shadow mode") — it computes features but doesn't affect serving
- Log shadow features alongside live features
- Compare distributions for 24–48 hours
- If distributions match expected changes, promote to live
Shadow mode catches bugs that only appear with real production data distributions — edge cases that test data doesn't cover.
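The comparison step above can be sketched as a per-feature report over the observation window (a minimal sketch; the 25% tolerance and the aggregation by mean are assumptions — real comparisons often use PSI per feature):

```python
def compare_shadow(live, shadow, tol=0.25):
    """Compare per-feature means from the live vs shadow pipelines and
    flag relative deviations above `tol` as blocking promotion."""
    report = {}
    for name, live_vals in live.items():
        live_mean = sum(live_vals) / len(live_vals)
        shadow_mean = sum(shadow[name]) / len(shadow[name])
        rel = abs(shadow_mean - live_mean) / (abs(live_mean) or 1.0)
        report[name] = {"live_mean": live_mean, "shadow_mean": shadow_mean,
                        "promote": rel <= tol}
    return report

live = {"total_orders": [3, 4, 5, 4]}
shadow = {"total_orders": [3, 4, 5, 400]}  # shadow-pipeline bug inflates the feature
compare_shadow(live, shadow)["total_orders"]["promote"]  # → False
```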
Feature pipeline SLOs
Define service level objectives for the pipeline:
- Freshness SLO: online store features updated within 1 hour of pipeline trigger
- Latency SLO: online feature lookup p99 < 5ms
- Availability SLO: feature pipeline completes successfully > 99.5% of runs
- Quality SLO: validation pass rate > 99.9% of input records
Alert when SLOs breach. Treat the feature pipeline as production infrastructure, not a batch job.
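A daily SLO evaluation over the four objectives above can be sketched as (targets copied from the list; the measurement plumbing that produces the `measured` dict is assumed):

```python
SLOS = {
    "freshness_seconds": 3600,      # online store updated within 1 hour
    "latency_p99_ms": 5.0,          # online feature lookup p99
    "run_success_rate": 0.995,      # pipeline completes successfully
    "validation_pass_rate": 0.999,  # input records passing validation
}

def slo_breaches(measured):
    """Return the SLOs the measured window violated. Freshness and
    latency are ceilings; the two rates are floors."""
    ceilings = {"freshness_seconds", "latency_p99_ms"}
    return [name for name, target in SLOS.items()
            if (measured[name] > target if name in ceilings
                else measured[name] < target)]

slo_breaches({"freshness_seconds": 5400, "latency_p99_ms": 3.2,
              "run_success_rate": 0.999, "validation_pass_rate": 0.9999})
# → ["freshness_seconds"]
```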