Въведение: Защо ML пайплайните са толкова важни
Ако сте работили по реални проекти за машинно обучение, със сигурност сте се сблъсквали с добре познатите проблеми: разхвърлян код за предобработка, разпръснат из различни клетки на Jupyter notebook; странни разлики между резултатите на тренировъчния и тестовия набор; и вечният кошмар — невъзможността да възпроизведете собствените си резултати три месеца по-късно.
Звучи ви познато? Не сте сами.
Точно тези проблеми решават Pipeline обектите в scikit-learn. Пайплайнът е механизъм, който обединява всички стъпки от предобработката до обучението на модела в единна, последователна верига. Вместо ръчно да викате fit() и transform() за всеки трансформатор поотделно, пайплайнът автоматизира целия процес.
Хайде да видим трите основни проблема, които пайплайните елиминират:
- Спагети код (Spaghetti code) — без пайплайн всяка стъпка от предобработката е отделен блок код, който трябва ръчно да се изпълнява в правилния ред. Промените на една стъпка лесно чупят целия процес.
- Изтичане на данни (Data leakage) — когато нормализирате целия датасет преди разделянето на тренировъчен и тестов набор, информация от тестовите данни „изтича" в модела. Пайплайнът гарантира, че всеки
fit()се извиква само върху тренировъчните данни. - Невъзпроизводимост — без стандартизиран пайплайн е трудно да гарантирате, че колегата ви (или вие самите след 3 месеца) ще получи същите резултати.
В това ръководство ще разгледаме подробно всички аспекти на scikit-learn Pipeline — от основите до напреднали техники, включително ColumnTransformer, FeatureUnion, потребителски трансформатори, кръстосана валидация и оптимизация на хиперпараметри. Всички примери са съвместими с scikit-learn 1.8 (актуалната версия към 2026 г.).
Какво е Pipeline в scikit-learn
В scikit-learn Pipeline е обект, който свързва последователност от стъпки — трансформатори (обекти с методи fit() и transform()) и естиматор (обект с метод fit() и predict()). Правилото е просто: всички стъпки освен последната трябва да бъдат трансформатори, а последната може да бъде или трансформатор, или естиматор.
Когато извикате pipeline.fit(X, y), scikit-learn автоматично:
- Извиква
fit_transform()на всеки трансформатор последователно - Подава трансформираните данни на следващата стъпка
- Извиква
fit()на последния естиматор
А когато извикате pipeline.predict(X), всеки трансформатор изпълнява само transform() (без fit()!), а последният естиматор — predict().
Нека видим най-простия пример:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
# Зареждане на данните
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Създаване на пайплайн с две стъпки
pipe = Pipeline([
('scaler', StandardScaler()), # Стъпка 1: стандартизация
('classifier', LogisticRegression()) # Стъпка 2: класификация
])
# Обучение — fit() се прилага и на скалера, и на модела
pipe.fit(X_train, y_train)
# Предсказване — transform() на скалера + predict() на модела
accuracy = pipe.score(X_test, y_test)
print(f"Точност: {accuracy:.4f}")
# Точност: 0.9667
Забележете как с една единствена команда pipe.fit() обучаваме и скалера, и модела. При pipe.score() тестовите данни автоматично минават през стандартизация преди класификация. Това е красотата на пайплайните — просто няма как да забравите стъпка или да объркате реда.
Удобство с make_pipeline
Ако не ви се занимава ръчно да задавате имена на стъпките, има по-бърз вариант — make_pipeline. Тя автоматично генерира имена от класовете:
from sklearn.pipeline import make_pipeline
# Автоматични имена: 'standardscaler', 'logisticregression'
pipe = make_pipeline(StandardScaler(), LogisticRegression())
print(pipe.named_steps)
# {'standardscaler': StandardScaler(), 'logisticregression': LogisticRegression()}
ColumnTransformer: Обработка на различни типове данни
В реалния свят данните рядко са чисто числови. Обикновено имаме комбинация от числови и категорийни колони, като всяка група изисква различна предобработка. Честно казано, точно тук нещата стават наистина интересни.
ColumnTransformer съществува именно за тази цел — той позволява да приложите различни трансформации на различни подмножества от колони.
Основна концепция
ColumnTransformer приема списък от тройки (име, трансформатор, колони) и паралелно прилага всяка трансформация върху съответните колони. Резултатите се обединяват хоризонтално (по колони).
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
# Примерен датасет с различни типове данни
data = pd.DataFrame({
'възраст': [25, 30, np.nan, 45, 35],
'заплата': [50000, 60000, 75000, np.nan, 55000],
'град': ['София', 'Пловдив', 'Варна', 'София', 'Бургас'],
'образование': ['бакалавър', 'магистър', 'бакалавър', 'доктор', 'магистър']
})
# Дефиниране на колоните по тип
числови_колони = ['възраст', 'заплата']
категорийни_колони = ['град', 'образование']
# Пайплайн за числови данни: запълване + скалиране
числов_пайплайн = Pipeline([
('imputer', SimpleImputer(strategy='median')), # Запълване с медиана
('scaler', StandardScaler()) # Стандартизация
])
# Пайплайн за категорийни данни: запълване + кодиране
категориен_пайплайн = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')), # Най-честа стойност
('encoder', OneHotEncoder(handle_unknown='ignore')) # One-hot кодиране
])
# Обединяване с ColumnTransformer
preprocessor = ColumnTransformer([
('числови', числов_пайплайн, числови_колони),
('категорийни', категориен_пайплайн, категорийни_колони)
])
Автоматично определяне на колони с make_column_selector
Вместо ръчно да изброявате колоните, можете да използвате make_column_selector — той автоматично избира колони по тип. Това е доста удобно, особено при датасети, които се променят с времето.
from sklearn.compose import make_column_selector
# Автоматичен подбор на колони по dtype
preprocessor = ColumnTransformer([
('числови', числов_пайплайн,
make_column_selector(dtype_include=np.number)),
('категорийни', категориен_пайплайн,
make_column_selector(dtype_include=object))
])
# Тестване
result = preprocessor.fit_transform(data)
print(f"Форма на резултата: {result.shape}")
# Форма на резултата: (5, 8) — 2 числови + 6 one-hot колони
Не е нужно ръчно да актуализирате списъка с колони всеки път, когато добавите нова — селекторът се грижи за това.
Параметърът remainder
По подразбиране ColumnTransformer отхвърля колоните, които не са включени в нито един трансформатор. Ако искате да ги запазите, ползвайте параметъра remainder:
# Запазване на необработените колони
preprocessor = ColumnTransformer([
('числови', числов_пайплайн, числови_колони),
], remainder='passthrough') # Останалите колони се запазват непроменени
# Или отхвърляне (по подразбиране)
# remainder='drop'
Изграждане на пълен ML пайплайн стъпка по стъпка
Добре, нека съберем всичко заедно. Ще изградим пълен пайплайн от началото до края, използвайки класическия набор от данни за цени на жилища от Калифорния.
import pandas as pd
import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, r2_score
# === Стъпка 1: Зареждане на данните ===
housing = fetch_california_housing(as_frame=True)
df = housing.frame
print(f"Размер на датасета: {df.shape}")
print(f"Колони: {df.columns.tolist()}")
# Добавяне на изкуствена категорийна колона за демонстрация
df['ocean_proximity'] = pd.cut(
df['Longitude'],
bins=[-125, -122, -119, -114],
labels=['близо_до_океана', 'вътрешност', 'пустиня']
)
# Разделяне на характеристики и целева променлива
X = df.drop('MedHouseVal', axis=1)
y = df['MedHouseVal']
# === Стъпка 2: Разделяне на тренировъчен и тестов набор ===
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
print(f"Тренировъчен набор: {X_train.shape}")
print(f"Тестов набор: {X_test.shape}")
# === Стъпка 3: Дефиниране на предобработката ===
числов_трансформатор = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
категориен_трансформатор = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
preprocessor = ColumnTransformer([
('num', числов_трансформатор,
make_column_selector(dtype_include=np.number)),
('cat', категориен_трансформатор,
make_column_selector(dtype_include='category'))
])
# === Стъпка 4: Пълен пайплайн с модел ===
full_pipeline = Pipeline([
('preprocessor', preprocessor),
('model', GradientBoostingRegressor(
n_estimators=200,
max_depth=5,
learning_rate=0.1,
random_state=42
))
])
# === Стъпка 5: Обучение ===
full_pipeline.fit(X_train, y_train)
# === Стъпка 6: Оценка ===
y_pred = full_pipeline.predict(X_test)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print(f"\nРезултати:")
print(f" MAE: {mae:.4f}")
print(f" R²: {r2:.4f}")
# Резултати:
# MAE: 0.3312
# R²: 0.8245
Забелязвате ли колко чист е кодът? Целият процес — от предобработката до обучението — е капсулиран в един единствен обект. За предсказване на нови данни просто викате full_pipeline.predict(нови_данни) и всичко се случва автоматично. Никакво ръчно жонглиране с отделни стъпки.
Достъп до отделните стъпки
Разбира се, можете да инспектирате всяка стъпка от пайплайна, ако ви трябва:
# Достъп до предобработчика
preprocessor = full_pipeline.named_steps['preprocessor']
# Достъп до модела
model = full_pipeline.named_steps['model']
print(f"Брой дървета: {model.n_estimators}")
# Достъп до скалера вътре в предобработчика
scaler = preprocessor.named_transformers_['num'].named_steps['scaler']
print(f"Средни стойности: {scaler.mean_[:3]}") # Първите 3
Кръстосана валидация с Pipeline
Една от най-важните причини да използвате пайплайни е предотвратяването на изтичане на данни при кръстосана валидация. И трябва да кажа — този проблем е по-коварен, отколкото повечето хора си мислят.
Проблемът: изтичане на данни
Разгледайте следния код. Изглежда напълно безобиден, нали?
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.datasets import make_classification
# Генериране на данни
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
# ❌ ГРЕШЕН подход: скалиране ПРЕДИ кръстосана валидация
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) # fit() върху ЦЕЛИЯ датасет!
model = LogisticRegression()
scores = cross_val_score(model, X_scaled, y, cv=5)
print(f"Грешен подход — точност: {scores.mean():.4f}")
# Резултатът е оптимистично завишен!
Проблемът е, че StandardScaler вече е обучен върху целия датасет, включително валидационните фолдове. Средната стойност и стандартното отклонение съдържат информация от данни, които би трябвало да са „невидими" за модела. Разликата в резултатите може да е малка при прости случаи, но при по-сложна предобработка (PCA, feature selection) тя може да е значителна.
Решението: Pipeline + cross_val_score
from sklearn.pipeline import make_pipeline
# ✅ ПРАВИЛЕН подход: скалирането е ВЪТРЕ в пайплайна
pipe = make_pipeline(StandardScaler(), LogisticRegression())
# cross_val_score автоматично извиква fit() на пайплайна
# за всеки фолд — скалерът се обучава САМО на тренировъчните данни
scores = cross_val_score(pipe, X, y, cv=5)
print(f"Правилен подход — точност: {scores.mean():.4f}")
Когато cross_val_score работи с Pipeline обект, за всеки фолд се случва следното:
- Данните се разделят на тренировъчен и валидационен подмножество
fit_transform()се извиква на трансформаторите само върху тренировъчното подмножествоtransform()се прилага върху валидационното подмножество- Моделът се обучава и оценява коректно
Това е фундаменталната гаранция — никаква информация от валидационните данни не изтича в процеса на обучение. Честно, самият аз съм хващал този проблем в продукционен код не веднъж.
Подробна кръстосана валидация с cross_validate
Ако искате повече метрики наведнъж, cross_validate е по-мощната алтернатива:
from sklearn.model_selection import cross_validate
# Получаване на повече информация
cv_results = cross_validate(
pipe, X, y, cv=5,
scoring=['accuracy', 'f1_weighted', 'roc_auc_ovr'],
return_train_score=True
)
print("Валидационна точност по фолдове:", cv_results['test_accuracy'])
print(f"Средна точност: {cv_results['test_accuracy'].mean():.4f}")
print(f"Средно F1: {cv_results['test_f1_weighted'].mean():.4f}")
print(f"Средно ROC AUC: {cv_results['test_roc_auc_ovr'].mean():.4f}")
Оптимизация на хиперпараметри с GridSearchCV
Тук нещата стават наистина мощни. Една от най-полезните функционалности на scikit-learn Pipeline е безпроблемната интеграция с GridSearchCV и RandomizedSearchCV. Можете да търсите оптимални хиперпараметри не само на модела, но и на самите стъпки от предобработката.
Конвенцията с двойното подчертаване
За да укажете параметър на конкретна стъпка от пайплайна, използвате конвенцията стъпка__параметър (двойно подчертаване). При вложени пайплайни верижното именуване продължава: preprocessor__num__scaler__with_mean.
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_wine
# Зареждане на данни
X, y = load_wine(return_X_y=True)
# Дефиниране на пайплайн
pipe = Pipeline([
('scaler', StandardScaler()),
('pca', PCA()),
('classifier', RandomForestClassifier(random_state=42))
])
# Пространство за търсене — включва параметри от ВСИЧКИ стъпки
param_grid = {
# Параметри на PCA стъпката
'pca__n_components': [5, 8, 10, 13],
# Параметри на класификатора
'classifier__n_estimators': [50, 100, 200],
'classifier__max_depth': [3, 5, 10, None],
'classifier__min_samples_split': [2, 5, 10]
}
# GridSearchCV с пайплайн
grid_search = GridSearchCV(
pipe,
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1, # Паралелно изпълнение
verbose=1
)
grid_search.fit(X, y)
print(f"Най-добра точност: {grid_search.best_score_:.4f}")
print(f"Най-добри параметри: {grid_search.best_params_}")
# Най-добра точност: 0.9831
# Най-добри параметри: {'classifier__max_depth': None,
# 'classifier__min_samples_split': 2, 'classifier__n_estimators': 200,
# 'pca__n_components': 10}
Търсене в самия скалер
Можете дори да изберете кой скалер да използвате чрез Grid Search — доста готина възможност:
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
# Различни предобработки за сравнение
param_grid = [
{
'scaler': [StandardScaler()],
'pca__n_components': [5, 10],
'classifier__n_estimators': [100, 200]
},
{
'scaler': [MinMaxScaler()],
'pca__n_components': [5, 10],
'classifier__n_estimators': [100, 200]
},
{
'scaler': [RobustScaler()],
'pca__n_components': [5, 10],
'classifier__n_estimators': [100, 200]
}
]
grid_search = GridSearchCV(pipe, param_grid, cv=5, scoring='accuracy', n_jobs=-1)
grid_search.fit(X, y)
print(f"Най-добър скалер: {grid_search.best_params_['scaler']}")
print(f"Точност: {grid_search.best_score_:.4f}")
RandomizedSearchCV за по-бързо търсене
При голямо пространство на хиперпараметри GridSearchCV може да стане прекалено бавен. В такива случаи RandomizedSearchCV е много по-практичен — вместо да пробва всяка комбинация, той семплира случайни конфигурации:
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint, uniform
param_distributions = {
'pca__n_components': randint(3, 13),
'classifier__n_estimators': randint(50, 300),
'classifier__max_depth': [3, 5, 10, 15, None],
'classifier__min_samples_split': randint(2, 20)
}
random_search = RandomizedSearchCV(
pipe,
param_distributions,
n_iter=50, # Брой случайни комбинации
cv=5,
scoring='accuracy',
random_state=42,
n_jobs=-1
)
random_search.fit(X, y)
print(f"Най-добра точност: {random_search.best_score_:.4f}")
Създаване на потребителски трансформатори
Когато стандартните трансформатори не свършват работата (а рано или късно ще стигнете до този момент), можете да си създадете свои собствени. Scikit-learn предоставя два подхода: наследяване на BaseEstimator и TransformerMixin, или по-простият FunctionTransformer.
Пълноценен потребителски трансформатор
Нека създадем трансформатор за инженерство на характеристики, който генерира нови колони от съществуващи:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd
class FeatureEngineer(BaseEstimator, TransformerMixin):
"""
Потребителски трансформатор за създаване на нови характеристики.
Генерира съотношения, логаритмични и полиномни характеристики.
"""
def __init__(self, add_ratios=True, add_log=True, log_columns=None):
# Параметрите ТРЯБВА да се запазят като атрибути със същите имена
self.add_ratios = add_ratios
self.add_log = add_log
self.log_columns = log_columns
def fit(self, X, y=None):
"""Научаване на необходимата информация от тренировъчните данни."""
# Запазване на имената на колоните ако X е DataFrame
if isinstance(X, pd.DataFrame):
self.feature_names_in_ = X.columns.tolist()
return self # Винаги връщаме self!
def transform(self, X, y=None):
"""Прилагане на трансформацията."""
# Работа с копие за да не променяме оригинала
X_copy = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
if self.add_ratios and X_copy.shape[1] >= 2:
# Създаване на съотношение между първите две колони
col1, col2 = X_copy.columns[0], X_copy.columns[1]
X_copy[f'{col1}_to_{col2}_ratio'] = (
X_copy[col1] / X_copy[col2].replace(0, np.nan)
).fillna(0)
if self.add_log:
# Логаритмични характеристики за посочените колони
cols = self.log_columns or X_copy.columns[:2]
for col in cols:
if col in X_copy.columns:
X_copy[f'log_{col}'] = np.log1p(
X_copy[col].clip(lower=0)
)
return X_copy
def get_feature_names_out(self, input_features=None):
"""Връщане на имената на изходните характеристики."""
# Важно за интеграция с set_output API
return self.feature_names_out_
# Използване в пайплайн
pipe = Pipeline([
('feature_eng', FeatureEngineer(add_ratios=True, add_log=True)),
('scaler', StandardScaler()),
('model', LogisticRegression())
])
# Параметрите на потребителския трансформатор са достъпни за GridSearch!
param_grid = {
'feature_eng__add_ratios': [True, False],
'feature_eng__add_log': [True, False],
}
Ключови правила за потребителските трансформатори (запомнете ги, ще ви спестят доста главоболия):
- Параметрите на
__init__трябва да се запазят като атрибути със абсолютно същите имена — scikit-learn разчита на това заget_params()иset_params() fit()винаги трябва да връщаself- Наследяването от
TransformerMixinавтоматично добавяfit_transform() - Наследяването от
BaseEstimatorавтоматично добавяget_params()иset_params()
FunctionTransformer за прости случаи
Когато трансформацията е проста функция без състояние, не е нужно да пишете цял клас. FunctionTransformer е по-лесният и бърз избор:
from sklearn.preprocessing import FunctionTransformer
# Проста логаритмична трансформация
log_transformer = FunctionTransformer(
func=np.log1p, # Функция за transform()
inverse_func=np.expm1, # Функция за inverse_transform()
validate=True
)
# Потребителска функция
def добави_полиномни(X):
"""Добавяне на квадратни характеристики."""
X_poly = np.column_stack([X, X ** 2])
return X_poly
poly_transformer = FunctionTransformer(добави_полиномни)
# Използване в пайплайн
pipe = make_pipeline(
log_transformer,
StandardScaler(),
LogisticRegression()
)
Потребителски трансформатор с pandas изход
В scikit-learn 1.8 можете да използвате set_output API, за да получите DataFrame вместо numpy масив. Това е изключително полезно при дебъгване:
# Активиране на pandas изход за целия пайплайн
pipe.set_output(transform='pandas')
# Или за отделен трансформатор
scaler = StandardScaler().set_output(transform='pandas')
result = scaler.fit_transform(X_train)
print(type(result)) # pandas.DataFrame — запазва имената на колоните!
FeatureUnion: Комбиниране на множество трансформации
FeatureUnion позволява да приложите няколко трансформатора паралелно върху едни и същи данни и да обедините резултатите хоризонтално. Докато Pipeline свързва стъпки последователно (изходът на една е входът на следващата), FeatureUnion ги изпълнява паралелно и залепя резултатите един до друг.
Комбиниране на PCA и SelectKBest
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_wine
from sklearn.model_selection import cross_val_score
X, y = load_wine(return_X_y=True)
# Комбиниране на два метода за извличане на характеристики
feature_union = FeatureUnion([
('pca', PCA(n_components=5)), # 5 главни компонента
('kbest', SelectKBest(f_classif, k=5)) # 5 най-значими характеристики
])
# Пълен пайплайн
pipe = Pipeline([
('scaler', StandardScaler()),
('features', feature_union), # 10 характеристики общо
('classifier', LogisticRegression(max_iter=1000))
])
scores = cross_val_score(pipe, X, y, cv=5)
print(f"Точност с FeatureUnion: {scores.mean():.4f} (+/- {scores.std():.4f})")
# Точност с FeatureUnion: 0.9831 (+/- 0.0188)
Комбиниране на текстови и числови характеристики
Ето един по-реалистичен пример — комбиниране на TF-IDF от текст с числови колони:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
import pandas as pd
# Примерни данни с текст и числа
data = pd.DataFrame({
'описание': [
'отличен продукт висока качество',
'лошо качество не препоръчвам',
'среден продукт нищо особено',
'превъзходно работи перфектно',
'ужасно счупи се след ден'
],
'цена': [100, 20, 50, 150, 15],
'рейтинг': [5, 1, 3, 5, 1],
'етикет': [1, 0, 0, 1, 0]
})
# Текстов пайплайн
текстов_пайплайн = Pipeline([
('tfidf', TfidfVectorizer(max_features=100))
])
# Числов пайплайн
числов_пайплайн = Pipeline([
('scaler', StandardScaler())
])
# Използване на ColumnTransformer вместо FeatureUnion
# за по-прецизен контрол над колоните
preprocessor = ColumnTransformer([
('text', текстов_пайплайн, 'описание'),
('num', числов_пайплайн, ['цена', 'рейтинг'])
])
# Пълен пайплайн
full_pipe = Pipeline([
('preprocessor', preprocessor),
('classifier', LogisticRegression())
])
X = data.drop('етикет', axis=1)
y = data['етикет']
full_pipe.fit(X, y)
# Предсказване за нов запис
нов_запис = pd.DataFrame({
'описание': ['страхотен продукт много доволен'],
'цена': [80],
'рейтинг': [4]
})
print(f"Предсказание: {full_pipe.predict(нов_запис)}")
# Предсказание: [1]
Кога какво да ползвате:
ColumnTransformer— когато различни колони изискват различна предобработка (различни входове)FeatureUnion— когато искате да приложите няколко трансформации на едни и същи данни и да обедините резултатите (същият вход, различни изгледи)
Запазване и зареждане на пайплайни
Обучихте си пайплайна, получихте добри резултати — и какво сега? Ще искате да го запазите за продукция, за споделяне с колеги или просто за да не го обучавате отново всеки път. Scikit-learn предлага няколко подхода.
joblib — стандартният подход
import joblib
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
# Обучение на пайплайн
X, y = load_iris(return_X_y=True)
pipe = make_pipeline(StandardScaler(), RandomForestClassifier(random_state=42))
pipe.fit(X, y)
# === Запазване ===
joblib.dump(pipe, 'ml_pipeline.joblib')
print("Пайплайнът е запазен успешно.")
# === Зареждане ===
loaded_pipe = joblib.load('ml_pipeline.joblib')
# Проверка — резултатите трябва да са идентични
original_pred = pipe.predict(X[:5])
loaded_pred = loaded_pipe.predict(X[:5])
print(f"Предсказания съвпадат: {(original_pred == loaded_pred).all()}")
# Предсказания съвпадат: True
skops — по-безопасна сериализация
skops е библиотека от екипа на scikit-learn, която решава един важен проблем на joblib — сигурността. При joblib зареждането на файл от непроверен източник може да изпълни произволен код (подобно на pickle). Skops добавя допълнителен слой на проверка:
import skops.io as sio
# Запазване с skops
sio.dump(pipe, 'ml_pipeline.skops')
# Зареждане с проверка на типовете
# Трябва изрично да посочим кои типове са „доверени"
unknown_types = sio.get_untrusted_types(file='ml_pipeline.skops')
print(f"Неизвестни типове: {unknown_types}")
# Зареждане с одобрение на типовете
loaded_pipe = sio.load(
'ml_pipeline.skops',
trusted=unknown_types # Одобряваме само след проверка
)
# Предсказване с заредения пайплайн
predictions = loaded_pipe.predict(X[:3])
print(f"Предсказания: {predictions}")
Добри практики за версиониране
При запазване на пайплайни за продукция е критично да фиксирате версиите на библиотеките. Попадал съм на ситуации, в които модел обучен с една версия на scikit-learn е давал различни резултати с друга. Ето един прост подход:
import sklearn
import json
# Запазване на метаданни за средата
metadata = {
'sklearn_version': sklearn.__version__,
'python_version': '3.12', # Вашата версия на Python
'model_type': type(pipe.named_steps['randomforestclassifier']).__name__,
'training_date': '2026-02-09',
'features': ['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
'target_classes': ['setosa', 'versicolor', 'virginica']
}
# Запазване на метаданни до модела
with open('ml_pipeline_metadata.json', 'w', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
print("Метаданни запазени.")
print(f"scikit-learn версия: {sklearn.__version__}")
Препоръка: Винаги запазвайте requirements.txt или pyproject.toml с точните версии на всички зависимости. Сериозно — това ще ви спести главоболия в бъдеще.
Добри практики и чести грешки
Вече разгледахме основите и напредналите техники. Нека обобщим най-важните добри практики и грешките, които виждам отново и отново в реални проекти.
Добри практики
1. Давайте описателни имена на стъпките
# ❌ Лошо — неясни имена
pipe = Pipeline([
('step1', StandardScaler()),
('step2', PCA(n_components=10)),
('step3', RandomForestClassifier())
])
# ✅ Добро — описателни имена
pipe = Pipeline([
('standardize_features', StandardScaler()),
('reduce_dimensions', PCA(n_components=10)),
('random_forest', RandomForestClassifier())
])
2. Използвайте кеширане за тежки трансформации
Ако някоя стъпка от пайплайна отнема много време (например сложна PCA на голям датасет), можете да кеширате резултата:
from tempfile import mkdtemp
from shutil import rmtree
# Създаване на временна директория за кеш
cache_dir = mkdtemp()
pipe = Pipeline([
('scaler', StandardScaler()),
('pca', PCA(n_components=10)), # Тежка стъпка
('model', RandomForestClassifier())
], memory=cache_dir) # Кеширане на диска
# При повторно извикване на fit() с различен модел,
# но същите данни, PCA няма да се преизчислява
pipe.fit(X, y)
# Почистване на кеша когато приключите
rmtree(cache_dir)
3. Включете verbose режим при отстраняване на грешки
# Verbose пайплайн — показва времето за всяка стъпка
pipe = Pipeline([
('scaler', StandardScaler()),
('pca', PCA(n_components=10)),
('model', RandomForestClassifier())
], verbose=True)
pipe.fit(X, y)
# [Pipeline] .... (step 1 of 3) Processing scaler, total= 0.0s
# [Pipeline] .... (step 2 of 3) Processing pca, total= 0.1s
# [Pipeline] .... (step 3 of 3) Processing model, total= 2.3s
4. Използвайте set_output за pandas съвместимост
import sklearn
# Глобална настройка за pandas изход от всички трансформатори
sklearn.set_config(transform_output='pandas')
# Или за конкретен пайплайн
pipe.set_output(transform='pandas')
# Сега всички междинни стъпки запазват имената на колоните
# Това е особено полезно за дебъгване и интерпретируемост
5. Визуализация на пайплайна
В Jupyter notebook можете да получите интерактивна диаграма на пайплайна — доста полезно при сложни конфигурации:
from sklearn import set_config
# Активиране на HTML визуализация в Jupyter
set_config(display='diagram')
# Просто покажете обекта
pipe # В Jupyter ще се покаже интерактивна диаграма
# Или експортиране като HTML
from sklearn.utils import estimator_html_repr
html = estimator_html_repr(pipe)
with open('pipeline_diagram.html', 'w') as f:
f.write(html)
Чести грешки
Грешка 1: Обучение на трансформатор извън пайплайна
Класическа грешка — обучавате скалера преди разделянето на данните:
# ❌ ГРЕШНО — скалерът е обучен ПРЕДИ разделянето
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) # Изтичане на данни!
X_train, X_test = train_test_split(X_scaled, test_size=0.2)
# ✅ ПРАВИЛНО — скалерът е вътре в пайплайна
pipe = make_pipeline(StandardScaler(), RandomForestClassifier())
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
pipe.fit(X_train, y_train) # Скалерът се обучава само на X_train
Грешка 2: Забравяне на handle_unknown при OneHotEncoder
# ❌ Ще гръмне ако тестовите данни съдържат нови категории
encoder = OneHotEncoder() # По подразбиране handle_unknown='error'
# ✅ Безопасно — непознатите категории се игнорират
encoder = OneHotEncoder(handle_unknown='ignore')
Грешка 3: fit_transform вместо transform при тестови данни
Тази грешка е коварна, защото кодът не хвърля грешка — просто дава грешни резултати:
# ❌ ГРЕШНО — fit_transform на тестовите данни
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.fit_transform(X_test) # Преобучава скалера!
# ✅ ПРАВИЛНО — само transform на тестовите данни
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test) # Използва параметрите от X_train
# ✅ НАЙ-ДОБРО — използвайте Pipeline и забравете за проблема
pipe = make_pipeline(StandardScaler(), LogisticRegression())
pipe.fit(X_train, y_train)
pipe.predict(X_test) # Автоматично правилно
Грешка 4: Неправилно именуване при GridSearch
# ❌ ГРЕШНО — единично подчертаване
param_grid = {'classifier_n_estimators': [100, 200]}
# ✅ ПРАВИЛНО — двойно подчертаване
param_grid = {'classifier__n_estimators': [100, 200]}
# За вложени пайплайни — верижно
# preprocessor -> num -> scaler -> with_mean
param_grid = {'preprocessor__num__scaler__with_mean': [True, False]}
Грешка 5: Загуба на имена на колони
# ❌ Губите имената на колоните
pipe = make_pipeline(StandardScaler(), PCA(n_components=5))
result = pipe.fit_transform(df) # numpy array без имена
# ✅ Запазвате имената на колоните
pipe = make_pipeline(StandardScaler(), PCA(n_components=5))
pipe.set_output(transform='pandas')
result = pipe.fit_transform(df) # pandas DataFrame с имена pca0, pca1...
Пълен шаблон за продукционен пайплайн
И накрая — ето готов шаблон, който можете да използвате като отправна точка за всеки ML проект:
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.model_selection import cross_val_score, GridSearchCV
from sklearn.ensemble import GradientBoostingClassifier
import joblib
import sklearn
# Активиране на pandas изход
sklearn.set_config(transform_output='pandas')
# === 1. Дефиниране на предобработката ===
числов_пайплайн = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
категориен_пайплайн = Pipeline([
('imputer', SimpleImputer(strategy='constant', fill_value='липсващ')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
preprocessor = ColumnTransformer([
('num', числов_пайплайн,
make_column_selector(dtype_include=np.number)),
('cat', категориен_пайплайн,
make_column_selector(dtype_include=['object', 'category']))
])
# === 2. Пълен пайплайн ===
pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', GradientBoostingClassifier(random_state=42))
], verbose=True)
# === 3. Кръстосана валидация ===
# scores = cross_val_score(pipeline, X, y, cv=5, scoring='accuracy')
# === 4. Оптимизация на хиперпараметри ===
param_grid = {
'preprocessor__num__imputer__strategy': ['mean', 'median'],
'classifier__n_estimators': [100, 200, 300],
'classifier__max_depth': [3, 5, 7],
'classifier__learning_rate': [0.05, 0.1, 0.2]
}
# grid_search = GridSearchCV(
# pipeline, param_grid, cv=5,
# scoring='accuracy', n_jobs=-1
# )
# grid_search.fit(X_train, y_train)
# === 5. Запазване ===
# joblib.dump(grid_search.best_estimator_, 'production_pipeline.joblib')
Заключение
Scikit-learn Pipeline е един от онези инструменти, които след като веднъж започнете да ползвате, се чудите как сте живели без тях. В тази статия разгледахме всичко, от което имате нужда:
- Основи на Pipeline — как да свързваме трансформатори и естиматори в единна верига
- ColumnTransformer — как да обработваме различни типове данни в рамките на един пайплайн
- Кръстосана валидация — как пайплайнът предотвратява изтичането на данни
- Оптимизация на хиперпараметри — как GridSearchCV и RandomizedSearchCV работят безпроблемно с пайплайни
- Потребителски трансформатори — как да създаваме собствени стъпки
- FeatureUnion — как да комбинираме множество трансформации паралелно
- Сериализация — как безопасно да запазваме пайплайни с joblib и skops
- Добри практики — именуване, кеширане, визуализация, set_output API
Кога да използвате Pipeline:
- Винаги, когато имате повече от една стъпка на предобработка
- При кръстосана валидация — задължително
- При търсене на оптимални хиперпараметри
- При подготовка на модел за продукция
- Когато искате кодът ви да е чист и възпроизводим
Честно казано, след като свикнете с пайплайните, няма да искате да се връщате към стария начин на писане на ML код. Те превръщат хаотичния процес на предобработка и обучение в подредена, елегантна система. А най-хубавото е, че предпазват от коварни бъгове като изтичане на данни — нещо, което може сериозно да подведе оценката на модела ви.
За повече информация и пълна справка на API-то, разгледайте официалната документация на scikit-learn за Pipeline и ColumnTransformer.
Започнете с прост пайплайн от две стъпки и постепенно добавяйте сложност, когато ви потрябва. Пайплайните не са просто „хубава практика" — те са фундаментален инструмент за сериозна работа с машинно обучение в Python.