Building Production-Ready Data Cleaning Pipelines in Python

Build robust, testable data cleaning pipelines in Python using pandas method chaining, scikit-learn transformers, and Pandera schema validation. Covers missing values, duplicates, outliers, and full end-to-end testing.

Introduction: Why Most Data Cleaning Code Never Makes It to Production

Here's a scenario that'll feel painfully familiar: you crack open a Jupyter notebook, load a CSV, fire off a sequence of dropna(), fillna(), and astype() calls, get the data looking right, train a model, and call it a day. The notebook works — once. On your machine. With that specific file.

Then three months later, someone else receives a new data dump with slightly different column names, a handful of unexpected nulls in a column that was never null before, and a date field that suddenly contains the string "N/A" instead of an actual date. The notebook breaks silently. The model trains on garbage. Nobody notices for weeks.

I've seen this exact pattern play out at least a dozen times across different teams, and it never stops being frustrating. This is the gap between ad-hoc data cleaning and production-ready data cleaning, and honestly, it's enormous. Most data scientists spend 60–80% of their time wrangling data, yet the code they produce for that work is often the least structured, least tested, and least reusable code in the entire project. It lives in notebooks full of unnamed cells, hard-coded assumptions, and zero error handling. The opposite of production-ready.

So what does "production-ready" actually mean for a data cleaning pipeline? It comes down to four things. First, reproducible: given the same input, it always produces the same output, and you can trace exactly what transformations were applied. Second, testable: every cleaning function has clear inputs and outputs so you can write unit tests that catch regressions before they hit production. Third, auditable: you can log what happened at each stage — how many rows were dropped, which columns were imputed, what percentage of data failed validation — so stakeholders actually trust the output. Fourth, composable: individual cleaning steps are modular functions you can rearrange, swap, or extend without rewriting everything.

In this article, we'll build data cleaning pipelines that meet all four criteria. We'll start with the conceptual framework, move through pandas method chaining and scikit-learn pipelines, add schema validation with Pandera, tackle common data quality issues at scale, and finish with a complete end-to-end example that includes logging, testing, and error handling. By the end, you'll have a template you can adapt for your own projects.

The Five Stages of a Data Cleaning Pipeline

Before writing any code, it helps to have a mental model of what a data cleaning pipeline actually does. Every pipeline — regardless of domain or tools — moves data through five stages. Understanding these stages prevents a common mistake: jumping straight into cleaning without first understanding the data, or skipping validation entirely.

Stage 1: Discovery (Profiling)

Before you can fix data, you need to understand it. Profiling means answering questions like: What are the column types? What's the distribution of values? How many nulls exist? Are there obvious outliers?

In pandas, you'd typically start with df.info(), df.describe(), and value_counts() on individual columns. For a more comprehensive picture, the ydata-profiling library (formerly pandas-profiling) generates an interactive HTML report with correlations, missing value patterns, and distribution plots — all from a single line of code.
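A minimal profiling pass with plain pandas might look like this (the tiny DataFrame and column names are purely illustrative; the ydata-profiling call is commented out since it's an optional dependency):

```python
import numpy as np
import pandas as pd

# Toy data standing in for a real transactions file
df = pd.DataFrame({
    "amount": [10.0, 25.5, np.nan, 9000.0],
    "category": ["food", "food", None, "home"],
})

# Structure: dtypes, non-null counts, memory usage
df.info()

# Numeric distributions: count, mean, std, min/max, quartiles
print(df.describe())

# Null counts and value frequencies per column
print(df.isna().sum())
print(df["category"].value_counts(dropna=False))

# For the full HTML report (requires ydata-profiling):
# from ydata_profiling import ProfileReport
# ProfileReport(df, title="Transactions").to_file("report.html")
```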

Stage 2: Structuring

Raw data rarely arrives in the shape you need. Structuring involves renaming columns to follow a consistent convention, reshaping data (pivoting, melting, exploding nested structures), coercing types (converting strings to dates, objects to categoricals), and dropping columns that are entirely irrelevant.
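A sketch of a typical structuring step, using hypothetical raw column names:

```python
import pandas as pd

# Illustrative raw input with messy column names and string-typed fields
raw = pd.DataFrame({
    "Transaction ID": ["t1", "t2"],
    "Order Date": ["2024-01-05", "2024-02-10"],
    "Amount": ["19.99", "45.00"],
    "Internal Notes": ["x", "y"],  # irrelevant downstream
})

structured = (
    raw
    .rename(columns=lambda c: c.lower().strip().replace(" ", "_"))  # consistent naming
    .astype({"amount": float})                                      # type coercion
    .assign(order_date=lambda d: pd.to_datetime(d["order_date"]))   # string -> datetime
    .drop(columns=["internal_notes"])                               # drop irrelevant columns
)
print(structured.dtypes)
```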

Stage 3: Cleaning

This is the stage most people think of when they hear "data cleaning." It covers handling missing values (imputation, deletion, flagging), removing or merging duplicates, addressing outliers, and fixing inconsistent values — like "USA", "United States", and "US" all referring to the same entity.

Stage 4: Enrichment

Once the data's clean, you often need to derive new information from it. This includes creating calculated columns (e.g., revenue = price × quantity), extracting features from existing columns (e.g., day-of-week from a timestamp), and joining with external reference data like lookup tables for country codes.
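For example, a small enrichment step with a made-up lookup table might look like:

```python
import pandas as pd

# Toy order data and a hypothetical country-code lookup table
orders = pd.DataFrame({
    "price": [10.0, 4.5],
    "quantity": [3, 2],
    "ts": pd.to_datetime(["2024-03-04", "2024-03-09"]),
    "country_code": ["US", "DE"],
})
countries = pd.DataFrame({
    "country_code": ["US", "DE"],
    "country": ["United States", "Germany"],
})

enriched = (
    orders
    .assign(
        revenue=lambda d: d["price"] * d["quantity"],  # calculated column
        day_of_week=lambda d: d["ts"].dt.day_name(),   # feature from timestamp
    )
    .merge(countries, on="country_code", how="left")   # join external reference data
)
```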

Stage 5: Validation

The final stage verifies that the output meets your quality contracts — the explicit expectations you have about the cleaned data. Column types should be correct. Required columns should have no nulls. Values should fall within expected ranges. Row counts should be plausible. If validation fails, the pipeline should raise an error, not silently produce bad data.

The Pipeline Flow

┌──────────────┐    ┌──────────────┐    ┌───────────┐    ┌─────────────┐    ┌─────────────┐
│  Discovery   │───>│ Structuring  │───>│ Cleaning  │───>│ Enrichment  │───>│ Validation  │
│ (Profiling)  │    │  (Reshape)   │    │   (Fix)   │    │  (Derive)   │    │  (Verify)   │
└──────────────┘    └──────────────┘    └───────────┘    └─────────────┘    └─────────────┘
       │                   │                  │                 │                  │
   df.info()          rename cols         fillna()          assign()           Pandera
   ydata-profiling    astype()            drop_duplicates   merge()            Great Expectations
                      melt/pivot          clip()            feature eng.       custom checks

Here's the key insight: validation isn't optional. It's the stage that turns a script into a pipeline. Without it, you have no way of knowing whether your cleaning actually worked, or whether the input data has drifted in a way your logic doesn't handle.

Method Chaining in Pandas: Your First Step Toward Clean Pipelines

If you've been writing pandas code for any length of time, you've probably accumulated functions that look something like this:

# The messy, sequential approach
import pandas as pd

df = pd.read_csv("transactions.csv")
df.columns = df.columns.str.lower().str.replace(" ", "_")
df = df.drop(columns=["unnamed:_0"])
df["amount"] = df["amount"].astype(float)
df["date"] = pd.to_datetime(df["date"])
df = df[df["amount"] > 0]
df = df.drop_duplicates(subset=["transaction_id"])
df["year"] = df["date"].dt.year
df["month"] = df["date"].dt.month
df = df[df["status"] != "cancelled"]
df = df.sort_values("date")
df = df.reset_index(drop=True)

This works, but it has several problems. Every line mutates df in place, making it hard to reason about the state of the data at any given point. If you insert a line in the middle, you might break something downstream. There's no separation of concerns — structuring, cleaning, enrichment, and filtering are all interleaved. And that repeated df = df... pattern? It's a visual mess.

Method chaining lets you express the same logic as a single, readable pipeline:

import pandas as pd

def clean_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize column names to lowercase snake_case."""
    return df.rename(columns=lambda c: c.lower().strip().replace(" ", "_"))

def remove_cancelled(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows with status 'cancelled'."""
    return df.query("status != 'cancelled'")

cleaned = (
    pd.read_csv("transactions.csv")
    .pipe(clean_column_names)
    .drop(columns=["unnamed:_0"])
    .astype({"amount": float})
    .assign(
        date=lambda df: pd.to_datetime(df["date"]),
    )
    .query("amount > 0")
    .drop_duplicates(subset=["transaction_id"])
    .assign(
        year=lambda df: df["date"].dt.year,
        month=lambda df: df["date"].dt.month,
    )
    .pipe(remove_cancelled)
    .sort_values("date")
    .reset_index(drop=True)
)

Much better, right? Let's break down the three key methods that make this pattern work.

Using .pipe() for Custom Functions

The .pipe() method lets you insert any function that takes a DataFrame as its first argument and returns a DataFrame. This is how you integrate your own cleaning logic — like clean_column_names and remove_cancelled above — into the chain without breaking the flow. You can also pass additional arguments: .pipe(clip_outliers, column="amount", method="iqr").

Using .assign() for New Columns

The .assign() method creates new columns (or overwrites existing ones) and returns a new DataFrame, making it perfect for chaining. The lambda syntax lambda df: ... is essential here — it ensures the lambda receives the DataFrame as it exists at that point in the chain, not the original one from before the chain started.

Using .query() for Filtering

.query() accepts a string expression and returns matching rows. It's more readable in a chain than boolean indexing (df[df["amount"] > 0]) and avoids the awkward double-bracket syntax that breaks method chains.
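One detail worth knowing: .query() can also reference local Python variables with the @ prefix, which keeps thresholds out of hard-coded string literals. A small illustration with toy data:

```python
import pandas as pd

df = pd.DataFrame({
    "amount": [5.0, 50.0, 500.0],
    "status": ["ok", "ok", "cancelled"],
})

min_amount = 10.0  # threshold lives in Python, not in the query string
filtered = df.query("amount > @min_amount and status != 'cancelled'")
```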

Now, method chaining isn't just cosmetic. It encourages you to write small, named, testable functions (the ones you pass to .pipe()), and it makes the pipeline's stages visible in the code structure itself. That said, method chaining alone doesn't give you reusability across datasets, serialization, or integration with ML workflows. For that, we need scikit-learn pipelines.

Building Reusable Cleaning Functions with Scikit-Learn Pipelines

Most people associate scikit-learn's Pipeline and ColumnTransformer with machine learning preprocessing, but they're equally powerful for general-purpose data cleaning. The key advantage is that scikit-learn pipelines are objects — you can fit them, serialize them with joblib, version them, and apply the exact same transformations to new data. In production, where your cleaning logic needs to stay consistent between training and inference (or between batch runs), this is a game-changer.

Custom Transformers: The Building Blocks

To use scikit-learn pipelines for data cleaning, you create custom transformers by inheriting from BaseEstimator and TransformerMixin. Each transformer implements fit() (to learn parameters from data, if needed) and transform() (to apply the transformation). TransformerMixin gives you fit_transform() for free.

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clip outliers using the IQR method."""

    def __init__(self, factor: float = 1.5):
        self.factor = factor

    def fit(self, X, y=None):
        Q1 = np.percentile(X, 25, axis=0)
        Q3 = np.percentile(X, 75, axis=0)
        IQR = Q3 - Q1
        self.lower_ = Q1 - self.factor * IQR
        self.upper_ = Q3 + self.factor * IQR
        return self

    def transform(self, X):
        X_clipped = np.clip(X, self.lower_, self.upper_)
        return X_clipped


class ColumnRenamer(BaseEstimator, TransformerMixin):
    """Rename DataFrame columns to lowercase snake_case."""

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        if isinstance(X, pd.DataFrame):
            return X.rename(
                columns=lambda c: c.lower().strip().replace(" ", "_")
            )
        return X
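
To see the clipper in action, here's a quick sanity check on toy data (the class body is repeated so the snippet runs standalone):

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class OutlierClipper(BaseEstimator, TransformerMixin):
    """IQR-based clipper, same as the transformer defined above."""

    def __init__(self, factor: float = 1.5):
        self.factor = factor

    def fit(self, X, y=None):
        # Learn the clipping bounds from the training data
        Q1 = np.percentile(X, 25, axis=0)
        Q3 = np.percentile(X, 75, axis=0)
        iqr = Q3 - Q1
        self.lower_ = Q1 - self.factor * iqr
        self.upper_ = Q3 + self.factor * iqr
        return self

    def transform(self, X):
        return np.clip(X, self.lower_, self.upper_)

X = np.array([[1.0], [2.0], [3.0], [4.0], [100.0]])  # 100 is an obvious outlier
clipper = OutlierClipper(factor=1.5)
X_clipped = clipper.fit_transform(X)
# Q1=2, Q3=4, IQR=2, so values are clipped to [-1, 7]; only 100 changes
```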

Building a Complete Pipeline with ColumnTransformer

ColumnTransformer lets you apply different transformations to different columns — for example, imputing and scaling numeric columns while encoding categorical ones. Here's a complete pipeline that handles a realistic mix of column types:

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import (
    StandardScaler,
    OneHotEncoder,
    FunctionTransformer,
)

# Define column groups
numeric_features = ["amount", "quantity", "unit_price"]
categorical_features = ["category", "payment_method", "region"]

# Numeric sub-pipeline: impute missing values, clip outliers, scale
numeric_pipeline = Pipeline([
    ("imputer", KNNImputer(n_neighbors=5)),
    ("clipper", OutlierClipper(factor=1.5)),
    ("scaler", StandardScaler()),
])

# Categorical sub-pipeline: impute with most frequent, then one-hot encode
categorical_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encoder", OneHotEncoder(
        drop="first",
        sparse_output=False,
        handle_unknown="infrequent_if_exist",
    )),
])

# Combine into a single ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_pipeline, numeric_features),
        ("cat", categorical_pipeline, categorical_features),
    ],
    remainder="passthrough",  # Keep columns not listed above
    verbose_feature_names_out=False,
)

# Wrap everything in a top-level pipeline (you can add more steps)
full_pipeline = Pipeline([
    ("preprocessor", preprocessor),
])

# Usage
df = pd.read_csv("transactions.csv")
df_cleaned = full_pipeline.fit_transform(df)

A couple of important details worth noting. The set_output(transform="pandas") method, introduced in scikit-learn 1.2, lets you configure the pipeline to return DataFrames instead of NumPy arrays — preserving column names throughout:

# Enable pandas output globally for this pipeline
full_pipeline.set_output(transform="pandas")
df_cleaned = full_pipeline.fit_transform(df)
print(type(df_cleaned))  # <class 'pandas.core.frame.DataFrame'>
print(df_cleaned.columns.tolist())  # Column names preserved!

The remainder="passthrough" parameter is essential for data cleaning (as opposed to ML preprocessing), because you typically want to keep columns like IDs, timestamps, and free-text fields that aren't being transformed but are still needed downstream.

Once your pipeline is built, you can serialize it with joblib.dump(full_pipeline, "cleaning_pipeline.joblib") and load it in production with joblib.load(). The fitted parameters — imputation values, scaler means and standard deviations, encoder categories — are all stored in the serialized object. This means your production cleaning logic is exactly the same as what you developed and tested. Zero drift between environments.
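The round trip itself is only a few lines. Here's a sketch using a minimal scaler-only pipeline as a stand-in for full_pipeline (the temp-file path is just for the example):

```python
import tempfile
from pathlib import Path

import joblib
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

pipeline = Pipeline([("scaler", StandardScaler())])
X_train = np.array([[1.0], [2.0], [3.0]])
pipeline.fit(X_train)  # learned mean/std are stored on the fitted object

path = Path(tempfile.gettempdir()) / "cleaning_pipeline.joblib"
joblib.dump(pipeline, path)   # persist the fitted pipeline
restored = joblib.load(path)  # load it in production

# Identical transform: no refit, no drift between environments
X_new = np.array([[2.0]])
assert np.allclose(pipeline.transform(X_new), restored.transform(X_new))
```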

Data Validation with Pandera: Enforcing Contracts on Your DataFrames

Cleaning data is only half the battle. You also need to verify that the cleaned data actually meets your expectations. This is where Pandera comes in — a statistical data validation library for pandas (and Polars) DataFrames that lets you define schemas, formal contracts specifying what valid data looks like, and validate DataFrames against them at runtime.

Think of it as type checking for your data. Just as type hints catch bugs in your Python code, Pandera schemas catch data quality issues before they propagate through your pipeline. And trust me, once you start using it, you'll wonder how you ever shipped data without it.

DataFrameSchema: The Explicit Approach

The simplest way to use Pandera is with DataFrameSchema, where you define columns and their constraints directly:

import pandera as pa

transaction_schema = pa.DataFrameSchema(
    columns={
        "transaction_id": pa.Column(
            str,
            nullable=False,
            unique=True,
            description="Unique transaction identifier",
        ),
        "date": pa.Column(
            "datetime64[ns]",
            nullable=False,
            checks=pa.Check.greater_than("2020-01-01"),
        ),
        "amount": pa.Column(
            float,
            nullable=False,
            checks=[
                pa.Check.greater_than(0),
                pa.Check.less_than(1_000_000),
            ],
        ),
        "quantity": pa.Column(
            int,
            nullable=False,
            checks=pa.Check.in_range(1, 10_000),
        ),
        "category": pa.Column(
            str,
            nullable=False,
            checks=pa.Check.isin([
                "electronics", "clothing", "food", "home", "other"
            ]),
        ),
        "email": pa.Column(
            str,
            nullable=True,
            checks=pa.Check.str_matches(
                r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
            ),
        ),
    },
    checks=[
        # DataFrame-level check: no full-row duplicates
        pa.Check(
            lambda df: ~df.duplicated().any(),
            error="DataFrame contains duplicate rows",
        ),
    ],
    coerce=True,  # Attempt to coerce types before validation
)

# Validate a DataFrame
try:
    validated_df = transaction_schema.validate(df, lazy=True)
    print("Validation passed!")
except pa.errors.SchemaErrors as exc:
    print(f"Validation failed with {len(exc.failure_cases)} issues:")
    print(exc.failure_cases)

The lazy=True parameter is important here — it collects all validation errors instead of raising on the first one. This gives you a complete picture of data quality issues in a single pass, which is way more useful than playing whack-a-mole with errors one at a time.

DataFrameModel: The Class-Based Approach

For larger schemas, Pandera offers a class-based API using DataFrameModel that's more concise and supports inheritance. This is really handy when you have related schemas — say, a base transaction schema and a stricter validated version:

import pandera as pa
from pandera.typing import Series

class TransactionSchema(pa.DataFrameModel):
    transaction_id: Series[str] = pa.Field(nullable=False, unique=True)
    date: Series[pa.DateTime] = pa.Field(nullable=False)
    amount: Series[float] = pa.Field(gt=0, lt=1_000_000)
    quantity: Series[int] = pa.Field(ge=1, le=10_000)
    category: Series[str] = pa.Field(
        isin=["electronics", "clothing", "food", "home", "other"]
    )
    email: Series[str] = pa.Field(
        nullable=True,
        str_matches=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$",
    )

    class Config:
        coerce = True
        strict = "filter"  # Drop columns not in schema

    @pa.check("amount")
    def amount_is_reasonable(cls, series: Series[float]) -> Series[bool]:
        """Custom check: flag suspiciously round amounts."""
        return ~((series > 10_000) & (series % 1000 == 0))

Integrating Pandera Into Your Pipeline

The real power of Pandera is that you can validate at every stage of your pipeline, not just at the end. This catches issues as early as possible:

class RawInputSchema(pa.DataFrameModel):
    """Schema for data as it arrives — lenient, just checking structure."""
    transaction_id: Series[str] = pa.Field(nullable=False)
    date: Series[str] = pa.Field(nullable=False)  # Still a string at this stage
    amount: Series[object] = pa.Field(nullable=True)  # May have garbage values

class CleanedSchema(pa.DataFrameModel):
    """Schema after cleaning — strict, enforcing quality contracts."""
    transaction_id: Series[str] = pa.Field(nullable=False, unique=True)
    date: Series[pa.DateTime] = pa.Field(nullable=False)
    amount: Series[float] = pa.Field(gt=0, lt=1_000_000)

# In your pipeline:
raw_df = pd.read_csv("transactions.csv")
RawInputSchema.validate(raw_df)  # Fail fast if structure is wrong

cleaned_df = clean_data(raw_df)
CleanedSchema.validate(cleaned_df)  # Ensure cleaning worked

This layered validation approach is particularly valuable in production. When an upstream data source changes its schema — a column gets renamed, a new category appears, a previously non-null field starts containing nulls — your raw input schema catches it immediately, before your cleaning code has a chance to produce incorrect output.

Handling Common Data Quality Issues at Scale

Now that we've got the framework in place — method chaining for readable code, scikit-learn pipelines for reusability, and Pandera for validation — let's tackle the specific data quality issues you'll run into in practice.

Missing Values: Beyond Mean Imputation

Simple mean or median imputation is fine for exploratory analysis, but in production you need strategies that preserve the statistical properties of your data. Scikit-learn provides several options out of the box:

from sklearn.experimental import enable_iterative_imputer  # noqa: F401 — must come first
from sklearn.impute import KNNImputer, IterativeImputer

# KNN Imputation: uses similar rows to estimate missing values
knn_imputer = KNNImputer(n_neighbors=5, weights="distance")

# MICE (Multiple Imputation by Chained Equations) via IterativeImputer
mice_imputer = IterativeImputer(
    max_iter=10,
    random_state=42,
    sample_posterior=False,
)

# Domain-specific: flag missing values instead of imputing
def flag_and_fill(df: pd.DataFrame, column: str, fill_value) -> pd.DataFrame:
    """Create a boolean flag column and fill missing values."""
    return df.assign(
        **{
            f"{column}_was_missing": df[column].isna(),
            column: df[column].fillna(fill_value),
        }
    )

So which strategy should you pick? KNN imputation works well when missingness is correlated with other features. MICE is more flexible and handles complex patterns, but it's computationally expensive (sometimes significantly so). For categorical data, consider using the mode or a dedicated "Unknown" category rather than dropping rows — losing data is almost always worse than having a slightly imperfect fill value.
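As a quick illustration of the flag-and-fill approach on toy data, note that the missingness indicator stays available as a feature for downstream models (the function is repeated from above so the snippet is self-contained):

```python
import numpy as np
import pandas as pd

def flag_and_fill(df: pd.DataFrame, column: str, fill_value) -> pd.DataFrame:
    """Create a boolean flag column and fill missing values."""
    return df.assign(
        **{
            f"{column}_was_missing": df[column].isna(),
            column: df[column].fillna(fill_value),
        }
    )

df = pd.DataFrame({"amount": [10.0, np.nan, 30.0]})
result = df.pipe(flag_and_fill, column="amount", fill_value=0.0)
# The NaN is filled, and the flag column records where it was
```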

Duplicates: Exact vs. Fuzzy Matching

Exact duplicates are easy: df.drop_duplicates(). Done. But real-world data often contains fuzzy duplicates — records that refer to the same entity but have slight differences like typos, formatting variations, or missing fields. The recordlinkage library provides tools for detecting these:

import recordlinkage

# Create an index of candidate record pairs
indexer = recordlinkage.Index()
indexer.sortedneighbourhood(left_on="customer_name", window=3)
candidate_pairs = indexer.index(df)

# Compare the candidates on multiple fields
compare = recordlinkage.Compare()
compare.string("customer_name", "customer_name", method="jarowinkler", threshold=0.85)
compare.exact("email", "email")
compare.string("address", "address", method="levenshtein", threshold=0.80)

features = compare.compute(candidate_pairs, df)

# Pairs with a high total similarity score are likely duplicates
potential_duplicates = features[features.sum(axis=1) >= 2.0]
print(f"Found {len(potential_duplicates)} potential duplicate pairs")

Outliers: When to Remove vs. Cap vs. Keep

Not all outliers are errors. A transaction of $50,000 might be unusual but perfectly legitimate. Your strategy should depend on the domain context:

import numpy as np
from sklearn.ensemble import IsolationForest

def handle_outliers_iqr(series: pd.Series, factor: float = 1.5) -> pd.Series:
    """Cap outliers using the IQR method."""
    Q1, Q3 = series.quantile([0.25, 0.75])
    IQR = Q3 - Q1
    lower, upper = Q1 - factor * IQR, Q3 + factor * IQR
    return series.clip(lower=lower, upper=upper)

def handle_outliers_zscore(series: pd.Series, threshold: float = 3.0) -> pd.Series:
    """Flag outliers using Z-score (does not remove them)."""
    z_scores = np.abs((series - series.mean()) / series.std())
    return z_scores < threshold  # Boolean mask

# Isolation Forest for multivariate outlier detection
iso_forest = IsolationForest(contamination=0.05, random_state=42)
outlier_labels = iso_forest.fit_predict(df[numeric_features])
df["is_outlier"] = outlier_labels == -1

A good rule of thumb: cap outliers when you need to preserve row counts (e.g., for aggregations), remove them when they represent genuine data errors (negative ages, for instance), and flag them when you want downstream consumers to decide. When in doubt, flag rather than delete — you can always filter later, but you can't un-delete data.
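A quick sanity check of the capping behavior on toy numbers (the function is repeated from above so the snippet is self-contained):

```python
import pandas as pd

def handle_outliers_iqr(series: pd.Series, factor: float = 1.5) -> pd.Series:
    """Cap outliers using the IQR method."""
    Q1, Q3 = series.quantile([0.25, 0.75])
    iqr = Q3 - Q1
    return series.clip(lower=Q1 - factor * iqr, upper=Q3 + factor * iqr)

s = pd.Series([10.0, 12.0, 11.0, 13.0, 500.0])
capped = handle_outliers_iqr(s)
# Q1=11, Q3=13, IQR=2 -> bounds [8, 16]; only the 500 is capped, row count preserved
```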

Date/Time Inconsistencies

Date handling is a perennial source of bugs. Mixed formats, timezone-naive vs. timezone-aware timestamps, and ambiguous dates (is 01/02/2026 January 2nd or February 1st?) can all cause silent errors that are incredibly annoying to track down:

def standardize_dates(
    df: pd.DataFrame,
    column: str,
    target_tz: str = "UTC"
) -> pd.DataFrame:
    """Parse dates from mixed formats and standardize to a target timezone."""
    parsed = pd.to_datetime(
        df[column],
        format="mixed",    # pandas 3.x: handle mixed formats
        dayfirst=False,     # Explicitly set date parsing convention
        utc=True,           # Convert everything to UTC first
    )
    if target_tz != "UTC":
        parsed = parsed.dt.tz_convert(target_tz)
    return df.assign(**{column: parsed})

# Usage
df = df.pipe(standardize_dates, column="order_date", target_tz="US/Eastern")

The format="mixed" parameter in pandas 3.x is a significant improvement over the old behavior where pd.to_datetime() would either infer a single format for the entire column or fall back to slow per-element parsing. It explicitly tells pandas to handle a column containing multiple date formats, which is exactly what you need when dealing with messy real-world data.

Putting It All Together: An End-to-End Pipeline Example

Let's build a complete, realistic pipeline that cleans a messy e-commerce transactions CSV. This example ties together everything we've covered: method chaining, custom transformers, Pandera validation, logging, and error handling.

The Dataset

Imagine a CSV with these issues (and you'll recognize every single one from real-world data):

  • Inconsistent column names with spaces and mixed casing
  • Missing values in amount, category, and email
  • Duplicate transactions
  • Dates in mixed formats
  • Outlier amounts (negative values, suspiciously large transactions)
  • Inconsistent category labels ("Electronics" vs "electronics" vs "ELECTRONICS")

The Complete Pipeline

"""
Production data cleaning pipeline for e-commerce transactions.
Usage: python clean_transactions.py --input raw.csv --output cleaned.parquet
"""
import argparse
import logging
import sys
from pathlib import Path

import numpy as np
import pandas as pd
import pandera as pa
from pandera.typing import Series

# ─── Logging Setup ──────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("pipeline.log"),
    ],
)
logger = logging.getLogger(__name__)


# ─── Pandera Schemas ────────────────────────────────────────────────
class RawTransactionSchema(pa.DataFrameModel):
    """Minimal schema for raw input validation."""
    transaction_id: Series[str] = pa.Field(nullable=False)
    date: Series[object] = pa.Field(nullable=False)
    amount: Series[object] = pa.Field(nullable=True)

    class Config:
        coerce = True


class CleanTransactionSchema(pa.DataFrameModel):
    """Strict schema for cleaned output."""
    transaction_id: Series[str] = pa.Field(nullable=False, unique=True)
    date: Series[pa.DateTime] = pa.Field(nullable=False)
    amount: Series[float] = pa.Field(gt=0, lt=1_000_000)
    quantity: Series[int] = pa.Field(ge=1, le=10_000)
    category: Series[str] = pa.Field(
        isin=["electronics", "clothing", "food", "home", "other"]
    )
    unit_price: Series[float] = pa.Field(gt=0)
    revenue: Series[float] = pa.Field(gt=0)

    class Config:
        coerce = True


# ─── Cleaning Functions ─────────────────────────────────────────────
def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Rename columns to lowercase snake_case."""
    original_cols = df.columns.tolist()
    df = df.rename(columns=lambda c: c.lower().strip().replace(" ", "_"))
    logger.info(f"Renamed columns: {original_cols} -> {df.columns.tolist()}")
    return df


def parse_dates(df: pd.DataFrame) -> pd.DataFrame:
    """Parse dates from mixed formats, standardize to UTC."""
    df = df.assign(
        date=pd.to_datetime(df["date"], format="mixed", utc=True)
    )
    logger.info(
        f"Date range: {df['date'].min()} to {df['date'].max()}"
    )
    return df


def clean_amounts(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce amount to numeric, drop negative values."""
    before_count = len(df)
    df = df.assign(amount=pd.to_numeric(df["amount"], errors="coerce"))
    df = df.query("amount > 0 or amount != amount")  # Keep NaN for now
    after_count = len(df)
    logger.info(
        f"Removed {before_count - after_count} rows with non-positive amounts"
    )
    return df


def normalize_categories(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize category labels to lowercase."""
    valid = {"electronics", "clothing", "food", "home", "other"}
    df = df.assign(category=df["category"].str.lower().str.strip())
    unknown_mask = ~df["category"].isin(valid) & df["category"].notna()
    n_unknown = unknown_mask.sum()
    if n_unknown > 0:
        logger.warning(
            f"Mapping {n_unknown} unknown categories to 'other'"
        )
        df.loc[unknown_mask, "category"] = "other"
    return df


def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """Impute missing values with appropriate strategies."""
    missing_before = df.isna().sum()
    logger.info(f"Missing values before imputation:\n{missing_before}")

    df = df.assign(
        amount=lambda d: d["amount"].fillna(d["amount"].median()),
        quantity=lambda d: d["quantity"].fillna(1),
        category=lambda d: d["category"].fillna("other"),
        # Lambdas are evaluated in order against the updated frame, so amount
        # and quantity are already filled when unit_price is derived from them.
        unit_price=lambda d: d["unit_price"].fillna(d["amount"] / d["quantity"]),
    )
    missing_after = df.isna().sum()
    logger.info(f"Missing values after imputation:\n{missing_after}")
    return df


def remove_duplicates(df: pd.DataFrame) -> pd.DataFrame:
    """Remove exact duplicates on transaction_id, keeping the first."""
    before_count = len(df)
    df = df.drop_duplicates(subset=["transaction_id"], keep="first")
    after_count = len(df)
    logger.info(
        f"Removed {before_count - after_count} duplicate transactions"
    )
    return df


def clip_outliers(df: pd.DataFrame) -> pd.DataFrame:
    """Clip amount outliers using IQR method."""
    Q1 = df["amount"].quantile(0.25)
    Q3 = df["amount"].quantile(0.75)
    IQR = Q3 - Q1
    lower = max(0, Q1 - 1.5 * IQR)
    upper = Q3 + 1.5 * IQR
    clipped_count = (
        (df["amount"] < lower) | (df["amount"] > upper)
    ).sum()
    df = df.assign(amount=df["amount"].clip(lower=lower, upper=upper))
    logger.info(
        f"Clipped {clipped_count} outlier amounts to [{lower:.2f}, {upper:.2f}]"
    )
    return df


def add_derived_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Add calculated fields for downstream analysis."""
    return df.assign(
        revenue=lambda d: d["amount"] * d["quantity"],
        order_year=lambda d: d["date"].dt.year,
        order_month=lambda d: d["date"].dt.month,
        order_dayofweek=lambda d: d["date"].dt.day_name(),
    )


# ─── Main Pipeline ──────────────────────────────────────────────────
def run_pipeline(input_path: str, output_path: str) -> pd.DataFrame:
    """Execute the full cleaning pipeline with validation."""
    logger.info(f"Starting pipeline: {input_path} -> {output_path}")

    # Stage 1: Load and profile
    df = pd.read_csv(input_path)
    logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
    logger.info(f"Column types:\n{df.dtypes}")
    logger.info(f"Missing value summary:\n{df.isna().sum()}")

    # Stage 2: Validate raw input structure
    try:
        RawTransactionSchema.validate(df, lazy=True)
        logger.info("Raw input schema validation passed")
    except pa.errors.SchemaErrors as exc:
        logger.error(f"Raw input validation failed:\n{exc.failure_cases}")
        raise

    # Stages 3-5: Structure, Clean, Enrich (method-chained)
    cleaned = (
        df
        .pipe(standardize_columns)
        .pipe(parse_dates)
        .pipe(clean_amounts)
        .pipe(normalize_categories)
        .pipe(remove_duplicates)
        .pipe(handle_missing_values)
        .pipe(clip_outliers)
        .pipe(add_derived_columns)
        .reset_index(drop=True)
    )

    # Stage 6: Validate cleaned output
    try:
        validated = CleanTransactionSchema.validate(cleaned, lazy=True)
        logger.info("Cleaned output schema validation passed")
    except pa.errors.SchemaErrors as exc:
        logger.error(
            f"Output validation failed with "
            f"{len(exc.failure_cases)} issues:\n{exc.failure_cases}"
        )
        raise

    # Stage 7: Write output
    validated.to_parquet(output_path, index=False)
    logger.info(
        f"Pipeline complete. Wrote {len(validated)} rows to {output_path}"
    )
    return validated


# ─── CLI Entry Point ────────────────────────────────────────────────
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Clean e-commerce transaction data"
    )
    parser.add_argument("--input", required=True, help="Path to raw CSV")
    parser.add_argument(
        "--output", default="cleaned_transactions.parquet",
        help="Path to output Parquet file",
    )
    args = parser.parse_args()

    try:
        result = run_pipeline(args.input, args.output)
        logger.info(f"Success: {len(result)} clean rows produced")
    except Exception as exc:
        logger.exception(f"Pipeline failed: {exc}")
        sys.exit(1)

What Makes This Production-Ready

Let's look at what separates this from a typical notebook script:

  1. Structured logging: Every stage logs what it did — how many rows were removed, what the date range looks like, how many outliers were clipped. When something goes wrong in production at 3 AM, these logs are how you figure out what happened.
  2. Schema validation at both ends: The raw input schema catches upstream changes (a renamed column, for example). The output schema catches bugs in your cleaning logic — like an imputation function that accidentally introduces NaN values.
  3. Composable functions: Each cleaning step is an independent function with a clear signature (DataFrame in, DataFrame out). You can reorder them, skip them, or add new ones without touching the others.
  4. CLI interface: The pipeline runs as a script with arguments, making it easy to invoke from Airflow, cron, or CI/CD.
  5. Error handling: Validation failures raise exceptions instead of silently producing bad data. The top-level try/except ensures failures are logged before the process exits.

Testing Your Pipeline

A production pipeline without tests is a liability. Thankfully, the composable design we've been building makes testing pretty straightforward. Here's how to approach it at three levels.

Unit Testing Individual Cleaning Functions

Each cleaning function takes a DataFrame and returns a DataFrame, making them trivial to test with pytest:

import pytest
import pandas as pd
from pipeline import normalize_categories, remove_duplicates

@pytest.fixture
def sample_transactions():
    """Create a small DataFrame for testing."""
    return pd.DataFrame({
        "transaction_id": ["T001", "T002", "T003", "T002"],
        "date": ["2025-01-15", "2025-02-20", "2025-03-10", "2025-02-20"],
        "amount": [100.0, 250.0, -5.0, 250.0],
        "quantity": [1, 2, 1, 2],
        "unit_price": [100.0, 125.0, -5.0, 125.0],
        "category": ["Electronics", "CLOTHING", "unknown_cat", "CLOTHING"],
    })


def test_normalize_categories_lowercases(sample_transactions):
    result = normalize_categories(sample_transactions)
    assert result["category"].tolist() == [
        "electronics", "clothing", "other", "clothing"
    ]


def test_remove_duplicates_keeps_first(sample_transactions):
    result = remove_duplicates(sample_transactions)
    assert len(result) == 3
    assert result["transaction_id"].tolist() == ["T001", "T002", "T003"]
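Happy-path tests like these are a start, but edge cases — empty input, all-duplicate input — deserve explicit pins too. The expected behavior can be sketched directly with pandas, using `drop_duplicates` the same way `remove_duplicates` does internally, so the sketch runs independently of the pipeline module:

```python
import pandas as pd

# Empty frame with the right column: dedup should be a no-op.
empty = pd.DataFrame({"transaction_id": pd.Series([], dtype=str)})
assert len(empty.drop_duplicates(subset=["transaction_id"], keep="first")) == 0

# All rows share one transaction_id: only the first row survives.
dupes = pd.DataFrame({
    "transaction_id": ["T001", "T001", "T001"],
    "amount": [10.0, 20.0, 30.0],
})
deduped = dupes.drop_duplicates(subset=["transaction_id"], keep="first")
assert len(deduped) == 1
assert deduped["amount"].iloc[0] == 10.0
```

Translating these assertions into pytest tests against the real `remove_duplicates` function gives you regression coverage for inputs a notebook would never see.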

Property-Based Testing with Hypothesis

Edge cases are the bane of data pipelines. Hypothesis generates random DataFrames to test properties that should always hold, regardless of input:

from hypothesis import given, settings
from hypothesis.extra.pandas import column, data_frames
import hypothesis.strategies as st
from pipeline import clip_outliers

@given(
    df=data_frames(
        columns=[
            column("transaction_id", dtype=str),
            column("amount", dtype=float, elements=st.floats(
                min_value=-1000, max_value=100000,
                allow_nan=True, allow_infinity=False,
            )),
        ]
    )
)
@settings(max_examples=200)
def test_clip_outliers_always_produces_bounded_values(df):
    """After clipping, all non-NaN values should be within bounds."""
    if df.empty or df["amount"].isna().all():
        return  # Skip degenerate cases
    result = clip_outliers(df)
    valid = result["amount"].dropna()
    if len(valid) > 0:
        assert valid.min() >= 0  # We clip to non-negative

Integration Testing the Full Pipeline

Integration tests verify the entire pipeline runs end-to-end and produces valid output. Use a small, known input file and check the output against your schema:

from pipeline import run_pipeline

def test_full_pipeline_produces_valid_output(tmp_path):
    """The full pipeline should produce a valid Parquet file."""
    # Create a small test CSV
    test_csv = tmp_path / "test_input.csv"
    test_csv.write_text(
        "Transaction ID,Date,Amount,Quantity,Unit Price,Category,Email\n"
        "T001,2025-01-15,99.99,2,49.99,Electronics,[email protected]\n"
        "T002,2025-02-20,250.00,1,250.00,clothing,[email protected]\n"
        "T003,15/03/2025,75.50,3,25.17,Food,\n"
    )
    output_path = tmp_path / "output.parquet"

    result = run_pipeline(str(test_csv), str(output_path))

    assert len(result) == 3
    assert output_path.exists()
    assert set(result["category"].unique()).issubset(
        {"electronics", "clothing", "food", "home", "other"}
    )
    assert result["amount"].min() > 0
    assert result["revenue"].notna().all()

A solid test suite for a data pipeline should include: unit tests for each cleaning function (covering both happy paths and edge cases), property-based tests for functions that handle numeric or string transformations, integration tests with known input/output pairs, and schema validation tests that verify your Pandera schemas correctly reject bad data. Running these in CI/CD ensures that changes to your cleaning logic don't silently break data quality.

Conclusion: From Notebooks to Production

Building production-ready data cleaning pipelines isn't about using fancier tools — it's about applying software engineering principles to code that's traditionally treated as throwaway. Here are the key principles we covered:

  • Structure your pipeline in stages: discovery, structuring, cleaning, enrichment, and validation. Each stage has a clear purpose, and validation is not optional.
  • Use method chaining with .pipe(), .assign(), and .query() to write readable, composable pandas code.
  • Leverage scikit-learn pipelines and custom transformers for reusable, serializable cleaning logic that stays consistent between development and production.
  • Enforce data contracts with Pandera at both the input and output of your pipeline. Schema validation is what turns a script into a pipeline.
  • Handle common issues systematically: use appropriate imputation strategies, detect fuzzy duplicates, and make deliberate decisions about outliers.
  • Test at every level: unit tests for individual functions, property-based tests for edge cases, and integration tests for the full pipeline.

Once your pipeline is tested and validated, the next step is orchestration. Tools like Apache Airflow, Prefect, and Dagster let you schedule your pipeline on a cadence, handle retries and alerting, and track lineage across runs. Dagster in particular integrates well with the asset-based mental model described here, treating each stage's output as a versioned data asset.

For readers looking to go deeper, our article on Pandas 3.0 and its new Copy-on-Write behavior covers how the memory model changes affect pipeline design. If your datasets are outgrowing what pandas handles comfortably, check out our guide on Polars and DuckDB as alternatives — the pipeline patterns in this article translate directly to both libraries. And for the ML side of preprocessing, our scikit-learn pipelines guide goes deeper into feature engineering and model integration.

The gap between notebook code and production code is real, but you don't need to cross it in one leap. Start with method chaining. Add a Pandera schema. Write one test. Each small step makes your data cleaning code more reliable, more maintainable, and genuinely worthy of the production label.

About the Author

Editorial Team — our team of expert writers and editors.