Pipelines sind das Herzstück professioneller Machine-Learning-Projekte mit scikit-learn. Sie verhindern Data Leakage, machen Cross-Validation überhaupt erst korrekt, packen Vorverarbeitung und Modell in einen einzigen fit()-Aufruf und liefern reproduzierbare Ergebnisse. Trotzdem sieht man in den meisten Tutorials immer noch StandardScaler.fit_transform() auf dem gesamten Datensatz – ein Klassiker, der in Produktion zu hübsch aussehenden, aber falschen Metriken führt.
Ehrlich gesagt: Ich hab diesen Fehler selbst lange genug gemacht, bis mir ein Kollege im Code-Review die Ohren langgezogen hat. Seitdem geht bei mir kein Modell mehr ohne Pipeline live.
Dieser Leitfaden ist die kompakte Praxisreferenz für 2026: aktuelle scikit-learn-API (Version 1.6+), ColumnTransformer mit gemischten Datentypen, der pandas-Output via set_output(), sauberes Hyperparameter-Tuning, Persistierung mit joblib – und natürlich die typischen Fallstricke, die in Produktionscode immer wieder auftauchen.
Warum überhaupt Pipelines?
Ein typischer Workflow ohne Pipeline sieht ungefähr so aus:
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
model = LogisticRegression()
model.fit(X_train_scaled, y_train)
score = model.score(X_test_scaled, y_test)
Das funktioniert. Es ist aber ziemlich fragil. Sobald Cross-Validation, Grid Search oder mehrere Vorverarbeitungsschritte ins Spiel kommen, multiplizieren sich die manuellen fit_transform/transform-Aufrufe – und es schleicht sich fast garantiert Data Leakage ein. Meistens dadurch, dass der Scaler auf den kompletten Datensatz angepasst wird, bevor überhaupt gesplittet wird.
Die Pipeline-Variante ist nicht nur kürzer, sondern auch mathematisch korrekt:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
pipe = Pipeline([
("scaler", StandardScaler()),
("clf", LogisticRegression(max_iter=1000)),
])
pipe.fit(X_train, y_train)
print(pipe.score(X_test, y_test))
Der entscheidende Vorteil: Wenn du diese Pipeline an cross_val_score oder GridSearchCV übergibst, wird der Scaler in jedem Fold neu auf den Trainingsteil gefittet – nie auf den Validierungsteil. Genau so soll es sein.
ColumnTransformer: gemischte Datentypen sauber behandeln
Echte Datensätze enthalten so gut wie nie nur numerische Spalten. Für gemischte Daten ist ColumnTransformer das Werkzeug der Wahl. Er wendet unterschiedliche Transformationen auf unterschiedliche Spaltenmengen an und fügt das Ergebnis wieder zusammen.
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import GradientBoostingClassifier
numeric_features = ["age", "income", "tenure_months"]
categorical_features = ["country", "plan", "device"]
numeric_pipeline = Pipeline([
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler()),
])
categorical_pipeline = Pipeline([
("imputer", SimpleImputer(strategy="most_frequent")),
("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])
preprocessor = ColumnTransformer([
("num", numeric_pipeline, numeric_features),
("cat", categorical_pipeline, categorical_features),
])
model = Pipeline([
("prep", preprocessor),
("clf", GradientBoostingClassifier(random_state=42)),
])
model.fit(X_train, y_train)
Drei Punkte, die in 2026 erwähnenswert sind:
handle_unknown="ignore"beimOneHotEncoderist Pflicht für Production. Tauchen unbekannte Kategorien im Test- oder Live-Datensatz auf (und das wird passieren), gibt's sonst einen lautstarkenValueError.sparse_output=Falseist seit scikit-learn 1.2 das neue Argument – das altesparsewurde entfernt. Für baumbasierte Modelle ist dichte Ausgabe meist deutlich angenehmer zu debuggen.- Spalten, die in keiner Liste auftauchen, werden standardmäßig stillschweigend verworfen. Mit
remainder="passthrough"bleiben sie erhalten – das hat mich schon mehr als einmal vor stundenlangem Suchen bewahrt.
Spalten dynamisch auswählen mit make_column_selector
Wer Spalten nicht hart kodieren möchte, nutzt make_column_selector:
from sklearn.compose import make_column_selector
preprocessor = ColumnTransformer([
("num", numeric_pipeline, make_column_selector(dtype_include="number")),
("cat", categorical_pipeline, make_column_selector(dtype_include="object")),
])
Das ist besonders praktisch, wenn neue Features dazukommen, ohne dass das Pipeline-Skript jedes Mal angefasst werden muss.
set_output: pandas-DataFrames statt NumPy-Arrays
Ein echter Klassiker beim ColumnTransformer-Output: das Ergebnis ist ein NumPy-Array ohne Spaltennamen, und Debugging wird zur Detektivarbeit. Seit scikit-learn 1.2 gibt es dafür set_output(transform="pandas"), das in 1.5/1.6 spürbar stabiler geworden ist.
preprocessor.set_output(transform="pandas")
X_train_prepared = preprocessor.fit_transform(X_train)
print(X_train_prepared.head())
print(X_train_prepared.columns.tolist())
Die Ausgabe ist jetzt ein DataFrame mit Spalten wie num__age, num__income, cat__country_DE, cat__country_FR. Die Präfixe stammen aus den Schritt-Namen im ColumnTransformer.
Global aktivieren lässt sich der pandas-Output über:
from sklearn import set_config
set_config(transform_output="pandas")
Damit liefern alle Transformer im Skript pandas-Output zurück. Praktisch für explorative Analysen. In Produktionscode würde ich es eher lokal pro Transformer setzen, einfach um klare Grenzen zu haben (und keine Überraschungen, falls jemand das globale Setting in einem anderen Modul ändert).
Hyperparameter-Tuning mit GridSearchCV
Der größte Pluspunkt von Pipelines wird sichtbar, sobald Hyperparameter-Tuning ins Spiel kommt. Du kannst Parameter aus jeder Pipeline-Stufe gleichzeitig durchsuchen – inklusive der Vorverarbeitung.
from sklearn.model_selection import GridSearchCV
param_grid = {
"prep__num__imputer__strategy": ["mean", "median"],
"clf__n_estimators": [100, 200, 400],
"clf__max_depth": [3, 5, 7],
"clf__learning_rate": [0.05, 0.1],
}
grid = GridSearchCV(
model,
param_grid,
cv=5,
scoring="roc_auc",
n_jobs=-1,
verbose=1,
)
grid.fit(X_train, y_train)
print("Best score:", grid.best_score_)
print("Best params:", grid.best_params_)
Die doppelten Unterstriche sind keine Tippfehler, sondern scikit-learns Konvention zum Adressieren verschachtelter Parameter: schritt__unterschritt__parameter. Sieht beim ersten Mal seltsam aus, ist aber konsistent durch das ganze Framework.
RandomizedSearchCV für große Suchräume
Bei mehr als drei oder vier Hyperparametern explodiert das Grid. RandomizedSearchCV sampelt stattdessen aus Verteilungen und kommt mit einem Bruchteil der Aufrufe zu vergleichbaren Ergebnissen.
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint, uniform
param_dist = {
"clf__n_estimators": randint(50, 500),
"clf__max_depth": randint(2, 10),
"clf__learning_rate": uniform(0.01, 0.2),
}
search = RandomizedSearchCV(
model,
param_dist,
n_iter=40,
cv=5,
scoring="roc_auc",
random_state=42,
n_jobs=-1,
)
search.fit(X_train, y_train)
Für noch effizientere Suche bietet scikit-learn seit längerem auch HalvingRandomSearchCV – eine Successive-Halving-Variante, die schwache Konfigurationen früh aussortiert. Lohnt sich vor allem, wenn ein einzelner Fit teuer ist.
Eigene Transformer schreiben
Manchmal reichen die eingebauten Transformer einfach nicht. Mit FunctionTransformer oder einer eigenen Klasse, die BaseEstimator und TransformerMixin erbt, lässt sich beliebige Logik in die Pipeline einbinden.
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
class LogIncomeTransformer(BaseEstimator, TransformerMixin):
def __init__(self, columns):
self.columns = columns
def fit(self, X, y=None):
return self
def transform(self, X):
X = X.copy()
for col in self.columns:
X[col] = np.log1p(X[col].clip(lower=0))
return X
def get_feature_names_out(self, input_features=None):
return np.asarray(input_features)
Wichtig: get_feature_names_out bitte nicht vergessen. Sonst funktioniert der pandas-Output downstream nicht mehr und die Fehlermeldungen werden, sagen wir mal, kreativ.
Pipeline persistieren und in Production deployen
Ein gefittetes Pipeline-Objekt enthält alle gelernten Parameter – Mittelwerte des Imputers, Skalen des Scalers, Kategorien des OneHotEncoders, Modellgewichte. Persistieren lässt sich das mit joblib:
import joblib
joblib.dump(grid.best_estimator_, "model.joblib")
# Später, im Inference-Service:
model = joblib.load("model.joblib")
predictions = model.predict(new_data)
Drei Hinweise zur Production:
- Versionsfixierung: Persistierte scikit-learn-Modelle sind nicht garantiert zwischen Versionen kompatibel. Pinne die Version in
requirements.txtoderpyproject.toml– sonst wachst du irgendwann auf und ein Minor-Update bricht alles. - Sicherheit: Lade niemals
joblib- oderpickle-Dateien aus nicht vertrauenswürdiger Quelle. Sie können beliebigen Code ausführen. Für externe Distribution sind Formate wie ONNX oder skops sinnvoller. - Inferenz-Performance: Bei strengen Latenzanforderungen lohnt sich der Export nach ONNX über
skl2onnx. Das reduziert Cold-Start-Zeiten und Speicherverbrauch teilweise drastisch.
Pipeline-Visualisierung im Notebook
Seit scikit-learn 1.0 wird die Pipeline standardmäßig als HTML-Diagramm angezeigt:
from sklearn import set_config
set_config(display="diagram")
model # im Notebook ausführen
Das interaktive Diagramm ist Gold wert beim Debuggen verschachtelter Pipelines – und in Code-Reviews tatsächlich oft hilfreicher als jeder Screenshot.
Häufige Fehler und wie du sie vermeidest
1. Fit auf den ganzen Datensatz vor dem Split
Der Klassiker schlechthin. scaler.fit(X) vor train_test_split bedeutet, dass der Scaler bereits Information aus dem Testset gesehen hat. In einer Pipeline mit Cross-Validation passiert das per Definition nicht – einer der Hauptgründe, warum man das Ding überhaupt verwendet.
2. Cross-Validation auf Daten mit Zeitstruktur
Standard-KFold mischt Datensätze zufällig durch. Bei Zeitreihen führt das zu Leakage aus der Zukunft (und zu Modellen, die im Backtest brillant aussehen und live unterirdisch performen). Verwende TimeSeriesSplit:
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
tscv = TimeSeriesSplit(n_splits=5)
scores = cross_val_score(model, X, y, cv=tscv, scoring="neg_mean_absolute_error")
3. Target Encoding außerhalb der Pipeline
Wer Kategorien anhand der Zielvariable kodiert (Target Encoding, Mean Encoding), muss das innerhalb der Pipeline tun – sonst leakt die Zielinformation. Seit scikit-learn 1.3 gibt es dafür einen eingebauten TargetEncoder.
from sklearn.preprocessing import TargetEncoder
categorical_pipeline = Pipeline([
("imputer", SimpleImputer(strategy="most_frequent")),
("target", TargetEncoder(target_type="binary", smooth="auto")),
])
4. Vergessen, n_jobs zu setzen
GridSearchCV(n_jobs=-1) nutzt alle CPU-Kerne. Auf einer modernen Workstation reduziert das Tuning-Zeiten von Stunden auf Minuten. Bei Modellen, die selbst parallelisieren (Random Forest, HistGradientBoosting), passt du n_jobs auf den äußeren Loop an, damit du nicht doppelt parallelisierst – sonst kannibalisieren sich die Threads gegenseitig und der Job wird langsamer, nicht schneller.
Komplettes Beispiel: End-to-End-Workflow
Zum Abschluss ein Setup, das du als Vorlage übernehmen kannst:
import pandas as pd
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.model_selection import train_test_split, RandomizedSearchCV, StratifiedKFold
from sklearn.metrics import classification_report, roc_auc_score
from scipy.stats import randint, uniform
import joblib
df = pd.read_parquet("customers.parquet")
y = df.pop("churned")
X_train, X_test, y_train, y_test = train_test_split(
df, y, test_size=0.2, stratify=y, random_state=42
)
numeric = Pipeline([
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler()),
])
categorical = Pipeline([
("imputer", SimpleImputer(strategy="most_frequent")),
("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])
prep = ColumnTransformer([
("num", numeric, make_column_selector(dtype_include="number")),
("cat", categorical, make_column_selector(dtype_include="object")),
])
model = Pipeline([
("prep", prep),
("clf", HistGradientBoostingClassifier(random_state=42)),
])
param_dist = {
"clf__max_iter": randint(100, 500),
"clf__max_depth": randint(3, 10),
"clf__learning_rate": uniform(0.01, 0.2),
"clf__min_samples_leaf": randint(10, 100),
}
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
search = RandomizedSearchCV(
model, param_dist, n_iter=50, cv=cv,
scoring="roc_auc", n_jobs=-1, random_state=42,
)
search.fit(X_train, y_train)
print("CV AUC:", search.best_score_)
y_proba = search.predict_proba(X_test)[:, 1]
print("Test AUC:", roc_auc_score(y_test, y_proba))
print(classification_report(y_test, search.predict(X_test)))
joblib.dump(search.best_estimator_, "churn_model.joblib")
Das Skript ist deterministisch, leakage-frei und production-ready. Du kannst es so ziemlich direkt in einen Trainingsjob packen (Airflow, Prefect, GitHub Actions – such dir was aus) und das gespeicherte Modell anschließend in einer FastAPI- oder BentoML-App laden.
FAQ
Was ist der Unterschied zwischen Pipeline und make_pipeline?
make_pipeline ist eine Convenience-Funktion, die automatisch Schritt-Namen aus den Klassennamen erzeugt (standardscaler, logisticregression). Pipeline verlangt explizite Namen. Für GridSearch ist Pipeline meist klarer, weil du selbst kontrollierst, wie die Parameter im Pfad heißen.
Wie debugge ich, was zwischen den Pipeline-Schritten passiert?
Drei Wege, die ich in dieser Reihenfolge nutze: Erstens set_config(transform_output="pandas") aktivieren und einzelne Schritte mit pipeline.named_steps["prep"].fit_transform(X_train) aufrufen. Zweitens set_config(display="diagram") für die HTML-Visualisierung. Drittens pipeline[:-1].fit_transform(X_train), um alles bis vor das Modell auszuführen und das Zwischenergebnis anzuschauen.
Funktionieren scikit-learn-Pipelines mit XGBoost oder LightGBM?
Ja. XGBoost (XGBClassifier), LightGBM (LGBMClassifier) und CatBoost sind scikit-learn-kompatibel und können als finaler Schritt in einer Pipeline verwendet werden. Hyperparameter werden über clf__learning_rate usw. adressiert. Beachte aber, dass CatBoost eigene Kategoriebehandlung mitbringt – hier kannst du den OneHotEncoder oft komplett weglassen.
Soll ich set_output(transform="pandas") immer verwenden?
Für Entwicklung und Debugging: ja, unbedingt. Für maximale Inferenz-Geschwindigkeit in Production: eher nein. NumPy-Arrays sind schneller, weil keine DataFrame-Wrappings entstehen. Im Zweifel: pandas-Output beim Training für Lesbarkeit, NumPy-Output beim Inference-Service.
Wie persistiere ich nur die Vorverarbeitung, nicht das Modell?
Slicing funktioniert: preproc_only = pipeline[:-1] ergibt eine neue Pipeline ohne den letzten Schritt, die du separat mit joblib.dump speichern kannst. Praktisch für Feature-Stores oder wenn mehrere Modelle dieselbe Vorverarbeitung teilen.
Fazit
scikit-learn-Pipelines sind kein optionales Komfort-Feature, sondern die Grundlage für korrektes ML. Sie verhindern Leakage, machen Code testbar, persistierbar und tunbar – und mit den aktuellen Verbesserungen rund um set_output, TargetEncoder und HalvingRandomSearchCV ist das Ökosystem 2026 reifer als je zuvor.
Wer noch mit lose verstreuten fit_transform-Aufrufen arbeitet, sollte den Schritt zur Pipeline echt nicht länger aufschieben. Es zahlt sich vom ersten Projekt an aus.