Scikit-learn Pipeline и ColumnTransformer: пошаговое руководство с примерами

Разбираем scikit-learn Pipeline и ColumnTransformer от основ до GridSearchCV. Рабочие примеры кода, кэширование, замена моделей и новые возможности scikit-learn 1.8 — GPU, free-threaded Python, temperature scaling.

Введение: зачем нужен Pipeline в scikit-learn

Если вы хоть раз писали ML-код на Python, то знаете это чувство: масштабирование данных в одном месте, заполнение пропусков — в другом, кодирование категорий — вообще в третьем файле, а модель обучается где-то в четвёртом. Проходит месяц, вы возвращаетесь к проекту и… не можете понять, какие преобразования в каком порядке применялись.

А ещё хуже — случайно подгоняете (fit) трансформер на тестовых данных и получаете утечку данных (data leakage). Честно говоря, я сам на это наступал в одном из первых проектов, и отлавливать такую ошибку — то ещё удовольствие.

Именно для решения этих проблем в scikit-learn есть Pipeline. Он объединяет все этапы предобработки и обучения модели в единый объект, который гарантирует правильный порядок операций и защищает от утечки данных.

В этом руководстве мы разберём Pipeline и ColumnTransformer от основ до продвинутых приёмов, включая подбор гиперпараметров через GridSearchCV. Все примеры актуальны для scikit-learn 1.8 (текущая версия на начало 2026 года) и используют реальные датасеты.

Что вы узнаете:

  • Как устроен Pipeline и почему он предотвращает утечку данных
  • Как использовать ColumnTransformer для разных типов признаков
  • Как собрать полный конвейер предобработки и обучения за 15 строк кода
  • Как подбирать гиперпараметры всего конвейера через GridSearchCV
  • Какие новые фишки появились в scikit-learn 1.8 для пайплайнов

Что такое Pipeline и как он работает

sklearn.pipeline.Pipeline — это, по сути, последовательность шагов. Каждый шаг (кроме последнего) является трансформером — реализует методы fit() и transform(). Последний шаг может быть чем угодно: трансформером, классификатором или регрессором.

Когда вы вызываете pipeline.fit(X_train, y_train), происходит следующее:

  1. Первый трансформер вызывает fit(X_train) и затем transform(X_train)
  2. Результат передаётся второму трансформеру, который тоже делает fit() + transform()
  3. И так далее, пока данные не дойдут до последнего шага, который вызывает только fit()

А при вызове pipeline.predict(X_test) все промежуточные шаги вызывают только transform() (без fit()!), а последний шаг — predict(). Вот это и есть ключевая защита от утечки данных. Просто и элегантно.

Простой пример Pipeline

Давайте сразу посмотрим на код:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_breast_cancer

# Загружаем датасет
X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Создаём Pipeline
pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="mean")),
    ("scaler", StandardScaler()),
    ("classifier", LogisticRegression(max_iter=1000))
])

# Обучаем весь конвейер одной командой
pipe.fit(X_train, y_train)

# Предсказание — все трансформации применяются автоматически
accuracy = pipe.score(X_test, y_test)
print(f"Точность: {accuracy:.4f}")
# Точность: 0.9737

Обратите внимание: три операции — заполнение пропусков, масштабирование и обучение модели — уместились в один объект. При вызове pipe.score(X_test, y_test) тестовые данные проходят через те же трансформации, обученные на X_train. Никакой магии, просто хорошая архитектура.

Сокращённый синтаксис: make_pipeline

Если вам лень (или незачем) задавать имена шагов вручную, используйте make_pipeline — он сгенерирует имена автоматически:

from sklearn.pipeline import make_pipeline

pipe = make_pipeline(
    SimpleImputer(strategy="mean"),
    StandardScaler(),
    LogisticRegression(max_iter=1000)
)

# Имена шагов: "simpleimputer", "standardscaler", "logisticregression"
print(pipe.named_steps)

Удобно для быстрых экспериментов. Но для GridSearchCV явные имена шагов всё-таки лучше — легче задавать параметры.

ColumnTransformer: разные преобразования для разных типов данных

В реальных задачах данные почти всегда содержат столбцы разных типов: числовые, категориальные, текстовые. Масштабирование нужно применять к числам, а One-Hot Encoding — к категориям. Простой Pipeline применяет все шаги ко всем столбцам одинаково, и тут на помощь приходит ColumnTransformer.

ColumnTransformer принимает список из тройки (имя, трансформер, столбцы) и применяет каждый трансформер к указанным столбцам. Результаты конкатенируются в единое пространство признаков.

Практический пример: датасет Titanic

Разберём полный пример на классическом Titanic — там есть и числовые, и категориальные признаки, что делает его идеальным для демонстрации:

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_openml

# Загружаем данные
X, y = fetch_openml("titanic", version=1, as_frame=True, return_X_y=True)

# Определяем типы признаков
numeric_features = ["age", "fare"]
categorical_features = ["embarked", "sex", "pclass"]

# Пайплайн для числовых признаков
numeric_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler())
])

# Пайплайн для категориальных признаков
categorical_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
    ("encoder", OneHotEncoder(handle_unknown="ignore"))
])

# Объединяем через ColumnTransformer
preprocessor = ColumnTransformer(transformers=[
    ("num", numeric_transformer, numeric_features),
    ("cat", categorical_transformer, categorical_features)
])

# Финальный Pipeline: предобработка + модель
clf = Pipeline(steps=[
    ("preprocessor", preprocessor),
    ("classifier", LogisticRegression(max_iter=1000))
])

# Обучение и оценка
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
clf.fit(X_train, y_train)
print(f"Точность: {clf.score(X_test, y_test):.4f}")
# Точность: 0.7863

Выглядит чуть длиннее, чем без Pipeline, зато вся логика предобработки собрана в одном месте. И самое главное — никакой утечки данных.

Параметр remainder

По умолчанию ColumnTransformer просто отбрасывает столбцы, которые не были явно указаны (remainder="drop"). Это может быть неожиданностью, если вы забыли включить какие-то признаки. Чтобы сохранить их без преобразований:

# Оставить необработанные столбцы как есть
preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, numeric_features),
        ("cat", categorical_transformer, categorical_features)
    ],
    remainder="passthrough"  # столбцы без преобразования добавляются в конец
)

Также можно передать трансформер в remainder, например remainder=StandardScaler() — тогда ко всем оставшимся столбцам будет применён указанный трансформер. Довольно гибко.

Выбор столбцов через make_column_selector

Вместо ручного перечисления столбцов можно использовать автоматический выбор по типу данных:

from sklearn.compose import make_column_selector

preprocessor = ColumnTransformer(transformers=[
    ("num", numeric_transformer, make_column_selector(dtype_include="number")),
    ("cat", categorical_transformer, make_column_selector(dtype_include="category"))
])

Особенно удобно, когда столбцов много и перечислять их вручную — себе дороже.

Подбор гиперпараметров через GridSearchCV

Вот тут Pipeline раскрывается по-настоящему. Одно из его главных преимуществ — возможность подбирать гиперпараметры всех шагов конвейера одновременно. Для этого используется стандартный GridSearchCV (или RandomizedSearchCV). Нужно только правильно указать имена параметров через двойное подчёркивание __.

Синтаксис имён параметров

Схема именования вложенных параметров выглядит так:

"шаг_pipeline__шаг_column_transformer__параметр"

Например, для нашего конвейера:

  • "preprocessor__num__imputer__strategy" — стратегия импутации числовых признаков
  • "preprocessor__cat__encoder__handle_unknown" — обработка неизвестных категорий
  • "classifier__C" — параметр регуляризации модели

Поначалу эти длинные строки с двойными подчёркиваниями выглядят страшновато, но к ним быстро привыкаешь.

Полный пример с GridSearchCV

from sklearn.model_selection import GridSearchCV

# Сетка гиперпараметров
param_grid = {
    "preprocessor__num__imputer__strategy": ["mean", "median"],
    "classifier__C": [0.01, 0.1, 1.0, 10.0],
    "classifier__solver": ["lbfgs", "liblinear"]
}

# Запускаем поиск
grid_search = GridSearchCV(
    clf,
    param_grid,
    cv=5,
    scoring="accuracy",
    n_jobs=-1,
    verbose=1
)

grid_search.fit(X_train, y_train)

print(f"Лучшие параметры: {grid_search.best_params_}")
print(f"Лучший CV-score: {grid_search.best_score_:.4f}")
print(f"Точность на тесте: {grid_search.score(X_test, y_test):.4f}")

Параметр n_jobs=-1 означает использование всех доступных ядер процессора. На датасете Titanic разница незаметна, но на больших данных это значительно ускоряет перебор.

RandomizedSearchCV для больших пространств

Если пространство гиперпараметров слишком велико для полного перебора (а оно обычно именно таково), используйте RandomizedSearchCV. Он случайным образом выбирает заданное количество комбинаций:

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import loguniform

param_distributions = {
    "preprocessor__num__imputer__strategy": ["mean", "median"],
    "classifier__C": loguniform(0.001, 100),
    "classifier__solver": ["lbfgs", "liblinear"]
}

random_search = RandomizedSearchCV(
    clf,
    param_distributions,
    n_iter=20,       # проверить 20 случайных комбинаций
    cv=5,
    scoring="accuracy",
    random_state=42,
    n_jobs=-1
)

random_search.fit(X_train, y_train)
print(f"Лучшие параметры: {random_search.best_params_}")

Распределение loguniform(0.001, 100) означает, что значение C будет выбираться из логарифмически равномерного распределения. Это лучше, чем линейная сетка, потому что параметр регуляризации обычно варьируется на несколько порядков. По моему опыту, RandomizedSearchCV с 20–50 итерациями даёт результат, сопоставимый с полным перебором, но в разы быстрее.

Продвинутые приёмы работы с Pipeline

Кэширование трансформеров

Если предобработка занимает много времени (а с большими датасетами это нередкость), а вы экспериментируете только с параметрами финальной модели — используйте параметр memory для кэширования результатов промежуточных шагов:

from tempfile import mkdtemp
from joblib import Memory

cachedir = mkdtemp()
cached_pipe = Pipeline(
    steps=[
        ("preprocessor", preprocessor),
        ("classifier", LogisticRegression(max_iter=1000))
    ],
    memory=Memory(cachedir, verbose=0)
)

# Первый вызов fit — все шаги выполняются
cached_pipe.fit(X_train, y_train)

# Повторный вызов с другим классификатором —
# предобработка берётся из кэша
cached_pipe.set_params(classifier=LogisticRegression(C=0.1, max_iter=1000))
cached_pipe.fit(X_train, y_train)  # предобработка не пересчитывается

На мелких датасетах разница несущественна, но если ваш препроцессинг работает минуту-другую — кэширование экономит кучу времени.

Визуализация Pipeline

В scikit-learn есть встроенная HTML-визуализация конвейера. В Jupyter Notebook она отображается автоматически — просто выведите объект:

from sklearn import set_config

set_config(display="diagram")

# Просто выведите объект pipeline — и увидите интерактивную схему
clf

Вы получите наглядную диаграмму с раскрывающимися блоками для каждого шага. Очень удобно для отладки и когда нужно показать конвейер коллегам.

Замена модели в готовом Pipeline

Предобработка остаётся прежней, а модель легко заменить через set_params. Это позволяет быстро протестировать несколько алгоритмов:

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

# Пробуем Random Forest
clf.set_params(classifier=RandomForestClassifier(n_estimators=100, random_state=42))
clf.fit(X_train, y_train)
print(f"Random Forest: {clf.score(X_test, y_test):.4f}")

# Пробуем Gradient Boosting
clf.set_params(classifier=GradientBoostingClassifier(n_estimators=100, random_state=42))
clf.fit(X_train, y_train)
print(f"Gradient Boosting: {clf.score(X_test, y_test):.4f}")

Доступ к промежуточным результатам

Иногда хочется посмотреть, как выглядят данные после предобработки, но до обучения модели. Для этого можно использовать слайсинг Pipeline (да, он поддерживает индексацию!):

# Получить результат предобработки (без обучения модели)
X_train_preprocessed = clf[:-1].fit_transform(X_train, y_train)
print(f"Форма после предобработки: {X_train_preprocessed.shape}")

# Или через именованные шаги
preprocessor_fitted = clf.named_steps["preprocessor"]

Новые возможности scikit-learn 1.8 для Pipeline

Текущая версия scikit-learn — 1.8.0 (вышла в декабре 2025). Вот что важного появилось для работы с конвейерами.

Поддержка Array API и GPU

scikit-learn 1.8 расширяет поддержку стандарта Python Array API. На практике это значит, что ряд трансформеров и оценщиков теперь принимают массивы PyTorch и CuPy напрямую, позволяя выполнять вычисления на GPU:

import torch
from sklearn.preprocessing import StandardScaler

# Данные на GPU через PyTorch
X_gpu = torch.randn(10000, 50, device="cuda")

# StandardScaler теперь поддерживает PyTorch tensors
scaler = StandardScaler()
scaler.set_output(transform="default")
X_scaled = scaler.fit_transform(X_gpu)  # вычисления на GPU

В версии 1.8 поддержку Array API получили: StandardScaler, PolynomialFeatures, RidgeCV, RidgeClassifierCV, GaussianMixture и CalibratedClassifierCV. Список постепенно расширяется от версии к версии.

Поддержка free-threaded Python

Это, пожалуй, одна из самых интересных новостей. scikit-learn 1.8 поддерживает free-threaded CPython — версию без глобальной блокировки GIL. Если вы используете CPython 3.13t или новее, то n_jobs > 1 в GridSearchCV может работать через потоки вместо процессов, что убирает накладные расходы на межпроцессное взаимодействие:

# С free-threaded CPython (3.13t+):
# n_jobs=-1 использует потоки без GIL — быстрее для GridSearchCV
grid_search = GridSearchCV(clf, param_grid, cv=5, n_jobs=-1)

Temperature Scaling для калибровки вероятностей

В CalibratedClassifierCV появился новый метод method="temperature", который использует единственный параметр для калибровки вероятностей. Он особенно хорош для многоклассовых задач и легко встраивается в Pipeline:

from sklearn.calibration import CalibratedClassifierCV
from sklearn.ensemble import RandomForestClassifier

calibrated_clf = Pipeline(steps=[
    ("preprocessor", preprocessor),
    ("classifier", CalibratedClassifierCV(
        RandomForestClassifier(n_estimators=100),
        method="temperature",  # новый метод в 1.8
        cv=3
    ))
])

Совместимость с Python 3.11–3.14

scikit-learn 1.8 поддерживает Python 3.11 и выше, вплоть до 3.14. Если вы ещё на более старой версии Python — самое время обновиться.

Сравнение Pipeline со стандартным подходом

Чтобы стало совсем понятно, зачем всё это нужно, сравним два подхода на одной и той же задаче.

Без Pipeline (типичные ошибки)

# ❌ Частая ошибка: fit на всех данных до разбиения
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # утечка данных!
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y)

# ❌ Или: забыли применить transform к тестовым данным
imputer = SimpleImputer()
X_train_imp = imputer.fit_transform(X_train)
X_test_imp = X_test  # пропуски не заполнены — ошибка при predict!

Я видел оба эти паттерна в рабочих проектах. Второй особенно коварен — код не падает сразу, а тихо портит результаты.

С Pipeline (всё корректно)

# ✅ Pipeline гарантирует правильный порядок
pipe = make_pipeline(SimpleImputer(), StandardScaler(), LogisticRegression())
X_train, X_test, y_train, y_test = train_test_split(X, y)
pipe.fit(X_train, y_train)       # fit только на train
pipe.score(X_test, y_test)       # transform + predict на test

Pipeline не только короче, но и безопаснее. Каждый трансформер обучается строго на обучающей выборке и применяется к обеим выборкам автоматически. Меньше кода, меньше шансов ошибиться.

Полный рабочий пример: от сырых данных до модели

Итак, соберём всё вместе в одном скрипте — от загрузки данных до оценки модели с подбором гиперпараметров. Можете скопировать и запустить:

import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import (
    train_test_split,
    RandomizedSearchCV,
    cross_val_score
)
from sklearn.datasets import fetch_openml
from scipy.stats import randint, loguniform

# === 1. Загрузка данных ===
X, y = fetch_openml("titanic", version=1, as_frame=True, return_X_y=True)

# === 2. Определение признаков ===
numeric_features = ["age", "fare", "sibsp", "parch"]
categorical_features = ["embarked", "sex", "pclass"]

# === 3. Пайплайны предобработки ===
numeric_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler())
])

categorical_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
])

preprocessor = ColumnTransformer([
    ("num", numeric_pipe, numeric_features),
    ("cat", categorical_pipe, categorical_features)
])

# === 4. Финальный конвейер ===
full_pipeline = Pipeline([
    ("preprocessor", preprocessor),
    ("classifier", GradientBoostingClassifier(random_state=42))
])

# === 5. Разбиение данных ===
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# === 6. Подбор гиперпараметров ===
param_distributions = {
    "preprocessor__num__imputer__strategy": ["mean", "median"],
    "classifier__n_estimators": randint(50, 300),
    "classifier__max_depth": randint(2, 8),
    "classifier__learning_rate": loguniform(0.01, 0.3),
    "classifier__subsample": [0.7, 0.8, 0.9, 1.0]
}

search = RandomizedSearchCV(
    full_pipeline,
    param_distributions,
    n_iter=30,
    cv=5,
    scoring="accuracy",
    random_state=42,
    n_jobs=-1,
    verbose=1
)

search.fit(X_train, y_train)

# === 7. Результаты ===
print(f"Лучшие параметры: {search.best_params_}")
print(f"CV-score: {search.best_score_:.4f}")
print(f"Точность на тесте: {search.score(X_test, y_test):.4f}")

Этот скрипт выполняет полный цикл: загрузка данных, раздельная предобработка числовых и категориальных признаков, обучение GradientBoosting с подбором гиперпараметров и финальная оценка. И весь конвейер защищён от утечки данных — всё благодаря Pipeline.

Советы и типичные ошибки

Вот несколько практических рекомендаций, которые сэкономят вам время (и нервы):

  • Не забывайте про sparse_output. OneHotEncoder по умолчанию возвращает разреженную матрицу. Если финальная модель не поддерживает sparse-данные, используйте sparse_output=False. Это частый источник непонятных ошибок.
  • Указывайте handle_unknown="ignore" в OneHotEncoder. Иначе новые категории в тестовых данных вызовут исключение — а в продакшене новые категории появляются почти гарантированно.
  • Используйте set_output(transform="pandas") для получения DataFrame вместо массива NumPy на выходе. Это очень удобно при отладке.
  • Именуйте шаги осмысленно. Вместо "step1", "step2" используйте "imputer", "scaler" — потом скажете себе спасибо при задании параметров в GridSearchCV.
  • Не используйте fit_transform на тестовых данных. Pipeline делает это правильно автоматически, но если работаете с отдельными трансформерами — помните об этом правиле.
# Получение DataFrame вместо массива на выходе
from sklearn import set_config
set_config(transform_output="pandas")

# Теперь все трансформеры возвращают DataFrame
X_transformed = preprocessor.fit_transform(X_train)
print(type(X_transformed))       # pandas.DataFrame
print(X_transformed.columns[:5]) # имена столбцов сохранены

Часто задаваемые вопросы

В чём разница между Pipeline и ColumnTransformer?

Pipeline выполняет шаги последовательно: выход одного шага является входом следующего. ColumnTransformer применяет разные трансформеры параллельно к разным подмножествам столбцов и конкатенирует результаты. На практике ColumnTransformer почти всегда используется как один из шагов внутри Pipeline — они дополняют друг друга.

Как сохранить обученный Pipeline для продакшена?

Используйте joblib — стандартный инструмент для сериализации объектов scikit-learn:

import joblib

# Сохранение
joblib.dump(search.best_estimator_, "best_model_pipeline.pkl")

# Загрузка
loaded_pipeline = joblib.load("best_model_pipeline.pkl")
predictions = loaded_pipeline.predict(new_data)

Сохраняется весь конвейер целиком — включая обученные трансформеры и модель. На стороне загрузки не нужно повторять предобработку, что заметно упрощает деплой.

Можно ли использовать Pipeline с нейронными сетями?

Да, но потребуются обёртки-совместимости. Для PyTorch используйте skorch, для Keras/TensorFlow — scikeras. Эти библиотеки оборачивают нейросеть в интерфейс scikit-learn (fit/predict), позволяя встроить её как последний шаг Pipeline. Работает на удивление гладко.

Как добавить собственный трансформер в Pipeline?

Создайте класс, наследующий от BaseEstimator и TransformerMixin:

from sklearn.base import BaseEstimator, TransformerMixin

class LogTransformer(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self  # ничего обучать не нужно

    def transform(self, X):
        return np.log1p(X)  # log(1 + x) для числовых признаков

# Использование в Pipeline
pipe = Pipeline([
    ("log", LogTransformer()),
    ("scaler", StandardScaler()),
    ("model", LogisticRegression())
])

Наследование от TransformerMixin автоматически добавляет метод fit_transform, а от BaseEstimatorget_params и set_params, необходимые для GridSearchCV.

Как Pipeline работает с кросс-валидацией — данные утекают или нет?

Нет, утечки не происходит. При использовании cross_val_score(pipeline, X, y, cv=5) на каждом фолде Pipeline заново обучается (fit) только на обучающей части и применяет трансформации к валидационной. Это принципиальное отличие от ситуации, когда вы сначала предобрабатываете все данные целиком, а потом запускаете кросс-валидацию — в том случае утечка гарантирована.

Об авторе Editorial Team

Our team of expert writers and editors.