راهنمای کامل Pipeline در Scikit-learn: از پیش‌پردازش تا استقرار مدل

آموزش جامع و عملی Pipeline در scikit-learn: از ColumnTransformer و ساخت Transformer سفارشی تا تنظیم هایپرپارامتر با GridSearchCV و استقرار مدل. با مثال‌های کد کامل و قابل اجرا.

مقدمه: چرا Pipeline در یادگیری ماشین اهمیت دارد؟

اگه تا حالا با پروژه‌های یادگیری ماشین کار کرده باشید، احتمالاً با این دردسرها آشنا هستید: کدهای پیش‌پردازش پراکنده‌ای که هر جای پروژه ریخته شدن، نشت داده (Data Leakage) بین مجموعه آموزش و آزمون، و البته مشکل همیشگی بازتولید نتایج. صادقانه بگم، تا قبل از اینکه خودم Pipeline رو جدی بگیرم، کلی وقتم صرف دیباگ کردن همین مسائل می‌شد.

اینجاست که Pipeline در scikit-learn وارد میدان می‌شه و همه چیز رو مرتب می‌کنه.

تصور کنید یک خط تولید کارخانه‌ای دارید. مواد اولیه از یک طرف وارد می‌شن، از مراحل مختلف پردازش رد می‌شن و محصول نهایی از طرف دیگه بیرون میاد. Pipeline در scikit-learn دقیقاً همین کار رو با داده‌های شما انجام می‌ده — داده‌های خام وارد می‌شن، مراحل پیش‌پردازش، مهندسی ویژگی و مدل‌سازی رو طی می‌کنن و در نهایت پیش‌بینی خروجی داده می‌شه.

مشکلاتی که Pipeline حل می‌کنه

بیایید ببینیم بدون Pipeline چه بلاهایی سرمون میاد:

  • نشت داده (Data Leakage): یکی از رایج‌ترین اشتباهات در یادگیری ماشین اینه که مثلاً StandardScaler رو روی کل دیتاست fit کنید و بعد تقسیم به train و test انجام بدید. این باعث می‌شه اطلاعات مجموعه تست به مرحله آموزش نشت کنه و عملاً نتایج ارزیابی‌تون قابل اتکا نباشه.
  • کد نامرتب و غیرقابل نگهداری: وقتی مراحل پیش‌پردازش زیاد بشن، کد شلوغ و پراکنده می‌شه. Pipeline همه چیز رو در یک شیء مرتب نگه می‌داره.
  • مشکل بازتولید (Reproducibility): بدون Pipeline، ترتیب اجرای مراحل ممکنه فراموش بشه یا عوض بشه. هممون این تجربه رو داشتیم، نه؟
  • مشکل استقرار (Deployment): وقتی می‌خواید مدل رو به محیط تولید ببرید، باید همه مراحل پیش‌پردازش رو هم همراهش ببرید. Pipeline همه رو یکجا بسته‌بندی می‌کنه.

Pipeline چیست؟ مفهوم پایه

sklearn.pipeline.Pipeline یک ابزار در scikit-learn هست که بهتون اجازه می‌ده چندین مرحله پردازش داده و مدل‌سازی رو به صورت زنجیره‌ای به هم وصل کنید. هر مرحله (step) در Pipeline شامل یک نام و یک تبدیل‌کننده (transformer) یا تخمین‌زننده (estimator) هست.

یک Pipeline از دو نوع مرحله تشکیل شده:

  • Transformer: شیئی که متدهای fit() و transform() داره (مثل StandardScaler، OneHotEncoder)
  • Estimator: شیئی که متدهای fit() و predict() داره (مثل LogisticRegression، RandomForestClassifier)

یه نکته مهم: همه مراحل Pipeline به جز مرحله آخر باید transformer باشن. مرحله آخر می‌تونه transformer یا estimator باشه.

بیایید با یک مثال ساده شروع کنیم:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

# ساخت یک Pipeline ساده
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

# حالا می‌تونید با یک خط کد fit و predict کنید
# pipe.fit(X_train, y_train)
# pipe.predict(X_test)

همچنین scikit-learn یک تابع کمکی به نام make_pipeline داره که نام‌گذاری مراحل رو خودکار انجام می‌ده. خیلی وقت‌ها از این تابع استفاده می‌کنم چون سریع‌تره:

from sklearn.pipeline import make_pipeline

# نام‌گذاری خودکار مراحل
pipe = make_pipeline(StandardScaler(), LogisticRegression())

# مراحل به صورت خودکار نام‌گذاری می‌شن:
# 'standardscaler' و 'logisticregression'
print(pipe.named_steps)

ساخت اولین Pipeline: قدم به قدم

خب، بیایید یک Pipeline واقعی بسازیم و با دیتاست Iris کار کنیم. این مثال کامل و قابل اجراست، پس می‌تونید مستقیم کپی‌اش کنید و اجرا کنید:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score, classification_report

# بارگذاری دیتاست
iris = load_iris()
X, y = iris.data, iris.target

# تقسیم داده به آموزش و آزمون
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# ساخت Pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),        # مرحله 1: نرمال‌سازی
    ('pca', PCA(n_components=2)),         # مرحله 2: کاهش ابعاد
    ('classifier', LogisticRegression()), # مرحله 3: طبقه‌بندی
])

# آموزش Pipeline
pipe.fit(X_train, y_train)

# پیش‌بینی
y_pred = pipe.predict(X_test)

# ارزیابی
print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")

# می‌تونید مستقیماً score هم بگیرید
print(f"Score: {pipe.score(X_test, y_test):.4f}")

خروجی این کد چیزی شبیه این خواهد بود:

Accuracy: 0.9667

Classification Report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        10
           1       1.00      0.90      0.95        10
           2       0.91      1.00      0.95        10

    accuracy                           0.97        30
   macro avg       0.97      0.97      0.97        30
weighted avg       0.97      0.97      0.97        30

Score: 0.9667

دسترسی به مراحل Pipeline

بعد از آموزش Pipeline، می‌تونید به هر مرحله‌ش دسترسی داشته باشید. این قابلیت برای دیباگ کردن و فهمیدن اینکه هر مرحله چیکار کرده، فوق‌العاده‌ست:

# دسترسی به مراحل با نام
scaler = pipe.named_steps['scaler']
print(f"Mean values: {scaler.mean_}")

pca = pipe.named_steps['pca']
print(f"Explained variance ratio: {pca.explained_variance_ratio_}")

# دسترسی با ایندکس
first_step = pipe[0]  # StandardScaler
last_step = pipe[-1]  # LogisticRegression

# برش (slicing) Pipeline
preprocessing = pipe[:-1]  # فقط مراحل پیش‌پردازش
X_transformed = preprocessing.transform(X_test)
print(f"Transformed shape: {X_transformed.shape}")

ColumnTransformer برای داده‌های ناهمگن

در دنیای واقعی، دیتاست‌ها معمولاً شامل ستون‌های عددی و دسته‌ای هستن. اینجاست که ColumnTransformer به کمکمون میاد و بهمون اجازه می‌ده تبدیل‌های مختلف رو روی ستون‌های مختلف اعمال کنیم.

بیایید یک مثال واقع‌گرایانه ببینیم. فرض کنید دیتاستی داریم که شامل اطلاعات مشتریان هست و می‌خوایم پیش‌بینی کنیم آیا خرید می‌کنن یا نه:

import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# ساخت دیتاست نمونه
np.random.seed(42)
n_samples = 500

data = pd.DataFrame({
    'age': np.random.randint(18, 70, n_samples),
    'income': np.random.normal(50000, 15000, n_samples),
    'credit_score': np.random.randint(300, 850, n_samples),
    'education': np.random.choice(
        ['high_school', 'bachelor', 'master', 'phd'], n_samples
    ),
    'city': np.random.choice(
        ['Tehran', 'Isfahan', 'Shiraz', 'Tabriz', 'Mashhad'], n_samples
    ),
    'employment': np.random.choice(
        ['employed', 'self_employed', 'unemployed'], n_samples
    ),
    'purchased': np.random.randint(0, 2, n_samples)
})

# اضافه کردن مقادیر گمشده
data.loc[np.random.choice(n_samples, 30), 'income'] = np.nan
data.loc[np.random.choice(n_samples, 20), 'credit_score'] = np.nan
data.loc[np.random.choice(n_samples, 15), 'education'] = np.nan

print(data.head())
print(f"\nMissing values:\n{data.isnull().sum()}")

حالا بیایید Pipeline رو با ColumnTransformer بسازیم:

# تعریف ستون‌ها
numerical_features = ['age', 'income', 'credit_score']
categorical_features = ['education', 'city', 'employment']

# تعریف تبدیل‌های عددی
numerical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# تعریف تبدیل‌های دسته‌ای
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])

# ترکیب با ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numerical_transformer, numerical_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

# Pipeline نهایی
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# تقسیم داده و آموزش
X = data.drop('purchased', axis=1)
y = data['purchased']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

full_pipeline.fit(X_train, y_train)
print(f"Accuracy: {full_pipeline.score(X_test, y_test):.4f}")

استفاده از make_column_selector

اگه حوصله مشخص کردن دستی ستون‌ها رو ندارید (که خب، بعضی وقتا واقعاً ستون‌ها زیاد می‌شن)، می‌تونید از make_column_selector استفاده کنید:

from sklearn.compose import make_column_selector

# انتخاب خودکار ستون‌ها بر اساس نوع داده
preprocessor_auto = ColumnTransformer(
    transformers=[
        ('num', numerical_transformer,
         make_column_selector(dtype_include=np.number)),
        ('cat', categorical_transformer,
         make_column_selector(dtype_include=object))
    ]
)

auto_pipeline = Pipeline([
    ('preprocessor', preprocessor_auto),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

auto_pipeline.fit(X_train, y_train)
print(f"Auto Pipeline Accuracy: {auto_pipeline.score(X_test, y_test):.4f}")

استفاده از make_column_transformer

یک راه سریع‌تر هم وجود داره که من شخصاً توی پروتوتایپ‌ها ازش زیاد استفاده می‌کنم:

from sklearn.compose import make_column_transformer

# ساخت سریع ColumnTransformer
quick_preprocessor = make_column_transformer(
    (numerical_transformer, numerical_features),
    (categorical_transformer, categorical_features),
    remainder='drop'  # ستون‌های باقی‌مانده رو حذف کن
)

# remainder='passthrough' اگه بخواید ستون‌های باقی‌مانده بدون تغییر بمونن

ساخت Transformer سفارشی

خب، اینجا یکی از هیجان‌انگیزترین بخش‌های Pipeline رو داریم. گاهی اوقات Transformerهای آمادهٔ scikit-learn جوابگوی نیاز شما نیستن و باید خودتون دست به کار بشید. دو روش اصلی برای این کار وجود داره.

روش اول: استفاده از BaseEstimator و TransformerMixin

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

class LogTransformer(BaseEstimator, TransformerMixin):
    """تبدیل لگاریتمی برای ویژگی‌های با توزیع چوله"""

    def __init__(self, offset=1):
        self.offset = offset

    def fit(self, X, y=None):
        # در اینجا نیازی به یادگیری چیزی نیست
        return self

    def transform(self, X):
        return np.log1p(X + self.offset)


class OutlierRemover(BaseEstimator, TransformerMixin):
    """حذف outlierها بر اساس IQR"""

    def __init__(self, factor=1.5):
        self.factor = factor

    def fit(self, X, y=None):
        Q1 = np.percentile(X, 25, axis=0)
        Q3 = np.percentile(X, 75, axis=0)
        self.IQR_ = Q3 - Q1
        self.lower_ = Q1 - self.factor * self.IQR_
        self.upper_ = Q3 + self.factor * self.IQR_
        return self

    def transform(self, X):
        X_clipped = np.clip(X, self.lower_, self.upper_)
        return X_clipped


class FeatureInteraction(BaseEstimator, TransformerMixin):
    """ایجاد ویژگی‌های تعاملی (حاصل‌ضرب جفت ویژگی‌ها)"""

    def __init__(self, interaction_only=True):
        self.interaction_only = interaction_only

    def fit(self, X, y=None):
        self.n_features_in_ = X.shape[1]
        return self

    def transform(self, X):
        X = np.array(X)
        interactions = []
        for i in range(self.n_features_in_):
            for j in range(i + 1, self.n_features_in_):
                interactions.append(X[:, i] * X[:, j])

        interaction_matrix = np.column_stack(interactions)

        if not self.interaction_only:
            return np.hstack([X, interaction_matrix])
        return interaction_matrix


# استفاده در Pipeline
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

custom_pipe = Pipeline([
    ('outlier_remover', OutlierRemover(factor=2.0)),
    ('log_transform', LogTransformer(offset=0)),
    ('interactions', FeatureInteraction(interaction_only=False)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

custom_pipe.fit(X_train, y_train)
print(f"Custom Pipeline Accuracy: {custom_pipe.score(X_test, y_test):.4f}")

روش دوم: استفاده از FunctionTransformer

اگه تبدیل‌تون ساده‌ست و نمی‌خواید حتماً یک کلاس کامل بنویسید، FunctionTransformer گزینه خیلی خوبیه. سریعه و کار راه می‌ندازه:

from sklearn.preprocessing import FunctionTransformer
import numpy as np

# تبدیل لگاریتمی ساده
log_transformer = FunctionTransformer(
    func=np.log1p,
    inverse_func=np.expm1,
    validate=True
)

# تابع سفارشی
def add_squared_features(X):
    """اضافه کردن مربع ویژگی‌ها"""
    return np.hstack([X, X ** 2])

squared_transformer = FunctionTransformer(
    func=add_squared_features,
    validate=True
)

# تابع سفارشی با پارامتر
def clip_values(X, min_val=-3, max_val=3):
    return np.clip(X, min_val, max_val)

clip_transformer = FunctionTransformer(
    func=clip_values,
    kw_args={'min_val': -2, 'max_val': 2}
)

# استفاده در Pipeline
pipe_with_func = Pipeline([
    ('log', log_transformer),
    ('squared', squared_transformer),
    ('scaler', StandardScaler()),
    ('clip', clip_transformer),
    ('classifier', LogisticRegression(max_iter=1000))
])

pipe_with_func.fit(X_train, y_train)
print(f"FunctionTransformer Pipeline: {pipe_with_func.score(X_test, y_test):.4f}")

مهندسی ویژگی داخل Pipeline

یکی از بهترین کاربردهای Pipeline، انجام مهندسی ویژگی به صورت سازمان‌یافته هست. به نظر من، این بخش واقعاً جاییه که Pipeline ارزشش رو نشون می‌ده — چون مهندسی ویژگی معمولاً بخش آزمایشی‌تر کار هست و بدون Pipeline خیلی سریع شلوغ می‌شه.

PolynomialFeatures

from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split

# ساخت دیتاست رگرسیون
X, y = make_regression(
    n_samples=200, n_features=3, noise=10, random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Pipeline با ویژگی‌های چندجمله‌ای
poly_pipe = Pipeline([
    ('poly', PolynomialFeatures(degree=2, include_bias=False)),
    ('scaler', StandardScaler()),
    ('regressor', Ridge(alpha=1.0))
])

poly_pipe.fit(X_train, y_train)
print(f"R² Score: {poly_pipe.score(X_test, y_test):.4f}")

# مشاهده تعداد ویژگی‌ها بعد از تبدیل
poly = poly_pipe.named_steps['poly']
print(f"Original features: 3")
print(f"Polynomial features: {poly.n_output_features_}")
print(f"Feature names: {poly.get_feature_names_out()}")

Pipeline کامل با چندین مرحله مهندسی ویژگی

بیایید یک Pipeline جامع‌تر بسازیم. توی این مثال، یک سناریوی نزدیک به واقعیت داریم — پیش‌بینی قیمت ملک با انواع مختلف داده:

import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import (
    StandardScaler, OneHotEncoder, OrdinalEncoder,
    PolynomialFeatures
)
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split

# ساخت دیتاست نمونه
np.random.seed(42)
n = 1000

df = pd.DataFrame({
    'area': np.random.uniform(50, 300, n),
    'rooms': np.random.randint(1, 6, n),
    'floor': np.random.randint(1, 15, n),
    'age': np.random.uniform(0, 50, n),
    'district': np.random.choice(
        ['north', 'south', 'east', 'west', 'center'], n
    ),
    'building_type': np.random.choice(
        ['apartment', 'villa', 'tower'], n
    ),
    'condition': np.random.choice(
        ['poor', 'fair', 'good', 'excellent'], n
    ),
})

# ایجاد متغیر هدف (قیمت بالا یا پایین)
df['expensive'] = (
    (df['area'] * 10 + df['rooms'] * 5000 +
     np.random.normal(0, 3000, n)) > 2000
).astype(int)

# اضافه کردن مقادیر گمشده
for col in ['area', 'rooms', 'age']:
    df.loc[np.random.choice(n, 50), col] = np.nan
df.loc[np.random.choice(n, 30), 'condition'] = np.nan

X = df.drop('expensive', axis=1)
y = df['expensive']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# تعریف ستون‌ها
num_cols = ['area', 'rooms', 'floor', 'age']
nominal_cols = ['district', 'building_type']
ordinal_cols = ['condition']

# Pipeline عددی با مهندسی ویژگی
numerical_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('poly', PolynomialFeatures(degree=2, include_bias=False,
                                 interaction_only=True)),
    ('scaler', StandardScaler())
])

# Pipeline دسته‌ای اسمی
nominal_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])

# Pipeline دسته‌ای ترتیبی
ordinal_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OrdinalEncoder(
        categories=[['poor', 'fair', 'good', 'excellent']]
    ))
])

# ترکیب همه
preprocessor = ColumnTransformer([
    ('num', numerical_pipeline, num_cols),
    ('nom', nominal_pipeline, nominal_cols),
    ('ord', ordinal_pipeline, ordinal_cols)
])

# Pipeline نهایی با انتخاب ویژگی
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('feature_selection', SelectKBest(f_classif, k=10)),
    ('classifier', GradientBoostingClassifier(
        n_estimators=100, random_state=42
    ))
])

full_pipeline.fit(X_train, y_train)
print(f"Full Pipeline Accuracy: {full_pipeline.score(X_test, y_test):.4f}")

SimpleImputer: پر کردن مقادیر گمشده

مدیریت مقادیر گمشده یکی از مهم‌ترین مراحل پیش‌پردازش هست و تقریباً هیچ دیتاست واقعی‌ای بدون مقادیر گمشده پیدا نمی‌کنید. SimpleImputer چند استراتژی مختلف ارائه می‌ده:

from sklearn.impute import SimpleImputer

# استراتژی‌های مختلف
imputer_mean = SimpleImputer(strategy='mean')       # میانگین
imputer_median = SimpleImputer(strategy='median')   # میانه
imputer_mode = SimpleImputer(strategy='most_frequent')  # مد
imputer_const = SimpleImputer(strategy='constant', fill_value=0)  # مقدار ثابت

# در Pipeline
imputing_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
    ('model', LogisticRegression())
])

تنظیم هایپرپارامتر با Pipeline

خب، برسیم به بخشی که به نظرم جادوی واقعی Pipeline نمایان می‌شه! وقتی Pipeline دارید، می‌تونید هایپرپارامترهای همه مراحل رو یکجا با GridSearchCV یا RandomizedSearchCV تنظیم کنید. نکته کلیدی، استفاده از نشانه‌گذاری دو زیرخط (double underscore) هست.

نحوه نام‌گذاری پارامترها

فرمت نام پارامترها ساده‌ست:

# فرمت: step_name__parameter_name
# مثال‌ها:
# 'scaler__with_mean' -> پارامتر with_mean از مرحله scaler
# 'classifier__C' -> پارامتر C از مرحله classifier
# 'preprocessor__num__imputer__strategy' -> پارامتر strategy از imputer در num

مثال عملی با GridSearchCV

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_iris

# بارگذاری داده
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# ساخت Pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA()),
    ('svm', SVC())
])

# تعریف فضای جستجوی پارامترها
param_grid = {
    'pca__n_components': [2, 3, 4],
    'svm__C': [0.1, 1, 10, 100],
    'svm__kernel': ['linear', 'rbf', 'poly'],
    'svm__gamma': ['scale', 'auto']
}

# جستجوی شبکه‌ای
grid_search = GridSearchCV(
    pipe,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)

grid_search.fit(X_train, y_train)

print(f"Best parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
print(f"Test score: {grid_search.score(X_test, y_test):.4f}")

# دسترسی به بهترین Pipeline
best_pipe = grid_search.best_estimator_
print(f"\nBest Pipeline:\n{best_pipe}")

مثال با RandomizedSearchCV

وقتی فضای جستجو بزرگ باشه (و معمولاً هست!)، RandomizedSearchCV خیلی سریع‌تر و عملی‌تره. به جای امتحان کردن همه ترکیب‌ها، تعداد مشخصی ترکیب تصادفی رو بررسی می‌کنه:

from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier
from scipy.stats import randint, uniform

# Pipeline پیچیده‌تر
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(random_state=42))
])

# توزیع‌های پارامتری
param_distributions = {
    'classifier__n_estimators': randint(50, 500),
    'classifier__max_depth': [None, 5, 10, 20, 30],
    'classifier__min_samples_split': randint(2, 20),
    'classifier__min_samples_leaf': randint(1, 10),
    'classifier__max_features': ['sqrt', 'log2', None],
}

random_search = RandomizedSearchCV(
    pipe,
    param_distributions,
    n_iter=50,       # تعداد ترکیبات تصادفی
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    random_state=42,
    verbose=1
)

random_search.fit(X_train, y_train)

print(f"Best parameters: {random_search.best_params_}")
print(f"Best CV score: {random_search.best_score_:.4f}")
print(f"Test score: {random_search.score(X_test, y_test):.4f}")

تنظیم هایپرپارامتر Pipeline تو در تو

وقتی Pipeline شامل ColumnTransformer باشه، نام‌گذاری پارامترها یکم طولانی‌تر می‌شه. ولی نگران نباشید، منطقش همونه — فقط سطوح بیشتری از دو زیرخط دارید:

import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV, train_test_split

# ساخت دیتاست نمونه
np.random.seed(42)
n = 500
df = pd.DataFrame({
    'age': np.random.randint(18, 70, n).astype(float),
    'income': np.random.normal(50000, 15000, n),
    'city': np.random.choice(['A', 'B', 'C'], n),
    'target': np.random.randint(0, 2, n)
})
df.loc[np.random.choice(n, 30), 'age'] = np.nan

X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

num_cols = ['age', 'income']
cat_cols = ['city']

preprocessor = ColumnTransformer([
    ('num', Pipeline([
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler())
    ]), num_cols),
    ('cat', OneHotEncoder(handle_unknown='ignore'), cat_cols)
])

pipe = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', GradientBoostingClassifier(random_state=42))
])

# پارامترهای تو در تو
param_grid = {
    # پارامتر strategy از imputer داخل num در preprocessor
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    # پارامترهای classifier
    'classifier__n_estimators': [50, 100, 200],
    'classifier__max_depth': [3, 5, 7],
    'classifier__learning_rate': [0.01, 0.1, 0.2],
}

grid = GridSearchCV(pipe, param_grid, cv=3, n_jobs=-1, verbose=1)
grid.fit(X_train, y_train)

print(f"Best params: {grid.best_params_}")
print(f"Best score: {grid.best_score_:.4f}")
print(f"Test score: {grid.score(X_test, y_test):.4f}")

یه نکته کاربردی: برای مشاهده همه پارامترهای قابل تنظیم در Pipeline، کافیه از pipe.get_params() استفاده کنید:

# دیدن همه پارامترهای قابل تنظیم
for param_name in sorted(pipe.get_params().keys()):
    print(param_name)

ذخیره و بارگذاری Pipeline

یکی از بزرگ‌ترین مزیت‌های Pipeline اینه که می‌تونید کل فرآیند — از پیش‌پردازش تا مدل — رو در یک فایل ذخیره کنید. بهترین ابزار برای این کار joblib هست که مخصوص آبجکت‌های پایتون با آرایه‌های NumPy بزرگ بهینه‌سازی شده.

ذخیره Pipeline

import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# ساخت و آموزش Pipeline
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)

# ذخیره Pipeline
joblib.dump(pipe, 'iris_pipeline.joblib')
print("Pipeline saved successfully!")

# همچنین می‌تونید با فشرده‌سازی ذخیره کنید
joblib.dump(pipe, 'iris_pipeline_compressed.joblib', compress=3)
print("Compressed pipeline saved!")

بارگذاری Pipeline

import joblib
import numpy as np

# بارگذاری Pipeline
loaded_pipe = joblib.load('iris_pipeline.joblib')

# پیش‌بینی با Pipeline بارگذاری شده
new_data = np.array([[5.1, 3.5, 1.4, 0.2]])
prediction = loaded_pipe.predict(new_data)
proba = loaded_pipe.predict_proba(new_data)

print(f"Prediction: {prediction[0]}")
print(f"Probabilities: {proba[0]}")
print(f"Score on test data: {loaded_pipe.score(X_test, y_test):.4f}")

استفاده در یک API ساده

بیایید ببینیم چطور می‌شه Pipeline ذخیره‌شده رو توی یک وب‌سرویس واقعی استفاده کرد. این مثال با Flask هست:

# app.py - نمونه API با Flask
import joblib
import numpy as np
from flask import Flask, request, jsonify

app = Flask(__name__)

# بارگذاری Pipeline در زمان شروع اپلیکیشن
pipeline = joblib.load('iris_pipeline.joblib')

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    features = np.array(data['features']).reshape(1, -1)

    prediction = pipeline.predict(features)
    probability = pipeline.predict_proba(features)

    return jsonify({
        'prediction': int(prediction[0]),
        'probabilities': probability[0].tolist()
    })

if __name__ == '__main__':
    app.run(debug=True)

و نحوه فراخوانی‌ش:

# client.py - نمونه کلاینت
import requests

response = requests.post(
    'http://localhost:5000/predict',
    json={'features': [5.1, 3.5, 1.4, 0.2]}
)

print(response.json())
# خروجی: {'prediction': 0, 'probabilities': [0.97, 0.02, 0.01]}

مقایسه pickle و joblib

شاید براتون سوال پیش بیاد که چرا joblib و نه pickle؟ جواب کوتاه: هر دو کار می‌کنن، ولی joblib برای آرایه‌های بزرگ NumPy بهینه‌تره:

import pickle
import joblib
import os

# ذخیره با pickle
with open('pipe_pickle.pkl', 'wb') as f:
    pickle.dump(pipe, f)

# ذخیره با joblib
joblib.dump(pipe, 'pipe_joblib.joblib')

# مقایسه حجم فایل
pickle_size = os.path.getsize('pipe_pickle.pkl')
joblib_size = os.path.getsize('pipe_joblib.joblib')

print(f"Pickle size: {pickle_size:,} bytes")
print(f"Joblib size: {joblib_size:,} bytes")

ویژگی‌های جدید در scikit-learn نسخه‌های اخیر

scikit-learn مدام داره بهتر می‌شه و ویژگی‌های جدیدی اضافه می‌کنه. بیایید نگاهی به مهم‌ترین تغییرات مرتبط با Pipeline بندازیم.

پشتیبانی از Array API برای محاسبات GPU

از نسخه 1.6 به بعد، scikit-learn شروع به پشتیبانی از Array API کرده. به زبان ساده، این یعنی می‌تونید Pipeline رو روی GPU اجرا کنید! البته هنوز همه estimatorها پشتیبانی نمی‌شن، ولی داره گسترش پیدا می‌کنه.

# فعال‌سازی Array API (نیاز به PyTorch یا CuPy دارد)
from sklearn import config_context

# اگه PyTorch نصب باشه:
# import torch
# X_gpu = torch.tensor(X_train, device='cuda', dtype=torch.float32)

# استفاده از Pipeline با Array API
with config_context(array_api_dispatch=True):
    pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression())
    ])
    # pipe.fit(X_gpu, y_train)  # اجرا روی GPU!

پارامتر transform_input در Pipeline

از نسخه 1.6، Pipeline یک پارامتر جدید به نام transform_input داره که برای Metadata Routing مفیده. این پارامتر بهتون اجازه می‌ده مشخص کنید کدوم ورودی‌ها باید از طریق مراحل قبلی تبدیل بشن:

# مثال مفهومی از transform_input
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

# در نسخه‌های جدید scikit-learn
pipe = Pipeline(
    steps=[
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression())
    ],
    # transform_input می‌تواند برای metadata routing استفاده شود
)

خروجی DataFrame با set_output

این یکی از ویژگی‌های مورد علاقه‌ من هست — از نسخه 1.2 اضافه شده و واقعاً دیباگ کردن Pipeline رو راحت‌تر کرده:

import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.datasets import fetch_openml

# تنظیم خروجی به DataFrame
pipe = Pipeline([
    ('scaler', StandardScaler()),
]).set_output(transform="pandas")

# یا به صورت global
from sklearn import set_config
set_config(transform_output="pandas")

# حالا همه transformer ها DataFrame برمی‌گردونن
scaler = StandardScaler()
X_df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
result = scaler.fit_transform(X_df)
print(type(result))  # 
print(result)

# برگشت به حالت پیش‌فرض
set_config(transform_output="default")

بهبودهای عملکردی

در نسخه‌های اخیر scikit-learn (نسخه 1.6 و بالاتر) بهبودهای عملکردی خوبی در Pipeline اعمال شده:

  • بهینه‌سازی حافظه در زنجیره‌های طولانی Pipeline با استفاده از پارامتر memory
  • پشتیبانی بهتر از sparse arrays در طول Pipeline
  • سازگاری بهتر با Metadata Routing جدید
  • پشتیبانی از __sklearn_tags__ برای بهبود شناسایی قابلیت‌های estimator ها

کش کردن مراحل Pipeline با memory

وقتی مراحل پیش‌پردازش زمان‌بر هستن (مثلاً یه PCA سنگین یا تبدیل متنی پیچیده)، می‌تونید از پارامتر memory استفاده کنید تا نتایج کش بشن. این توی GridSearchCV معجزه می‌کنه:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from tempfile import mkdtemp

# ساخت Pipeline با کش
cachedir = mkdtemp()
cached_pipe = Pipeline(
    [
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=2)),
        ('svm', SVC())
    ],
    memory=cachedir  # مسیر کش
)

# حالا اگه Pipeline رو چند بار fit کنید (مثلاً در GridSearchCV)
# مراحل پیش‌پردازش فقط یک بار اجرا می‌شن
cached_pipe.fit(X_train, y_train)
print(f"Score: {cached_pipe.score(X_test, y_test):.4f}")

# پاکسازی کش
from shutil import rmtree
rmtree(cachedir)

بهترین شیوه‌ها و خطاهای رایج

حالا که با Pipeline آشنا شدید، وقتشه بریم سراغ نکات عملی. این بخش خلاصه تجربیاتیه که خودم و خیلی از توسعه‌دهنده‌ها با Pipeline داشتیم.

۱. جلوگیری از نشت داده

مهم‌ترین دلیل استفاده از Pipeline همینه. بیایید تفاوت روش اشتباه و درست رو ببینیم:

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_iris

X, y = load_iris(return_X_y=True)

# ❌ روش اشتباه - نشت داده!
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # fit روی کل داده!
wrong_scores = cross_val_score(
    LogisticRegression(), X_scaled, y, cv=5
)
print(f"Wrong way (data leakage): {wrong_scores.mean():.4f}")

# ✅ روش درست - با Pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
correct_scores = cross_val_score(pipe, X, y, cv=5)
print(f"Correct way (no leakage): {correct_scores.mean():.4f}")

۲. نام‌گذاری مناسب مراحل

این نکته کوچیک ولی مهمه. نام‌گذاری خوب، دیباگ و نگهداری کد رو خیلی راحت‌تر می‌کنه:

# ❌ نام‌گذاری نامناسب
bad_pipe = Pipeline([
    ('step1', StandardScaler()),
    ('step2', PCA()),
    ('step3', LogisticRegression())
])

# ✅ نام‌گذاری مناسب و توصیفی
good_pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('dim_reduction', PCA()),
    ('classifier', LogisticRegression())
])

۳. دیباگ کردن Pipeline

یکی از سوالات رایج اینه: "چطور بفهمم هر مرحله Pipeline چیکار می‌کنه؟" خب، چند راه وجود داره:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris

X, y = load_iris(return_X_y=True)

# Pipeline با verbose
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),
    ('classifier', LogisticRegression())
], verbose=True)

pipe.fit(X, y)

# بررسی مراحل میانی
print("\n--- Debug Info ---")

# دیدن خروجی هر مرحله
X_after_scaler = pipe.named_steps['scaler'].transform(X)
print(f"After scaler - shape: {X_after_scaler.shape}, "
      f"mean: {X_after_scaler.mean():.6f}")

X_after_pca = pipe.named_steps['pca'].transform(X_after_scaler)
print(f"After PCA - shape: {X_after_pca.shape}")
print(f"Explained variance: {pipe.named_steps['pca'].explained_variance_ratio_}")

# یا با استفاده از slicing
preprocessing = pipe[:-1]  # همه مراحل به جز آخری
X_preprocessed = preprocessing.transform(X)
print(f"Preprocessed shape: {X_preprocessed.shape}")

۴. استفاده از set_output برای خروجی DataFrame

برای دیباگ بهتر و فهم بهتر داده‌ها بعد از هر تبدیل، خروجی رو به DataFrame تبدیل کنید:

import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer

# فعال‌سازی خروجی pandas
pipe = Pipeline([
    ('preprocessor', ColumnTransformer([
        ('num', StandardScaler(), ['age', 'income']),
        ('cat', OneHotEncoder(), ['city'])
    ]))
]).set_output(transform="pandas")

# حالا خروجی DataFrame خواهد بود
sample_data = pd.DataFrame({
    'age': [25, 30, 35],
    'income': [50000, 60000, 70000],
    'city': ['Tehran', 'Isfahan', 'Shiraz']
})

result = pipe.fit_transform(sample_data)
print(type(result))
print(result)
# خروجی یک DataFrame با نام ستون‌های مشخص خواهد بود

۵. رد کردن مراحل Pipeline با passthrough و drop

بعضی وقتا می‌خواید یک مرحله رو موقتاً غیرفعال کنید — مثلاً وقتی می‌خواید تأثیر PCA رو بسنجید:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression

# می‌تونید مراحل رو غیرفعال کنید
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),
    ('classifier', LogisticRegression())
])

# غیرفعال کردن PCA
pipe.set_params(pca='passthrough')
# حالا PCA اجرا نمی‌شه و داده بدون تغییر عبور می‌کنه

# این در GridSearchCV خیلی مفیده:
param_grid = {
    'pca': [PCA(n_components=2), PCA(n_components=3), 'passthrough'],
    'classifier__C': [0.1, 1, 10]
}

۶. FeatureUnion برای ترکیب ویژگی‌ها

وقتی می‌خواید چندین تبدیل مختلف رو روی یک داده اعمال کنید و نتایج رو کنار هم بگذارید، FeatureUnion کار رو راحت می‌کنه:

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris

X, y = load_iris(return_X_y=True)

# ترکیب چند تبدیل مختلف
combined_features = FeatureUnion([
    ('pca', PCA(n_components=2)),
    ('poly', PolynomialFeatures(degree=2, include_bias=False))
])

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('features', combined_features),
    ('classifier', LogisticRegression(max_iter=1000))
])

pipe.fit(X, y)
print(f"Score: {pipe.score(X, y):.4f}")

# دیدن تعداد ویژگی‌ها
preprocessed = pipe[:-1].transform(X)
print(f"Combined features shape: {preprocessed.shape}")

۷. مدیریت خطاها در Pipeline

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.utils.validation import check_is_fitted

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

# بررسی آیا Pipeline آموزش دیده یا نه
try:
    check_is_fitted(pipe)
    print("Pipeline is fitted")
except Exception as e:
    print(f"Pipeline is NOT fitted: {e}")

# آموزش Pipeline
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
pipe.fit(X, y)

# حالا Pipeline آموزش دیده
check_is_fitted(pipe)
print("Pipeline is fitted successfully!")

۸. Pipeline در مقابل کد دستی: مقایسه عملی

بیایید یک مقایسه صادقانه ببینیم. خودتون قضاوت کنید کدوم رو ترجیح می‌دید:

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# ساخت دیتاست
np.random.seed(42)
n = 300
df = pd.DataFrame({
    'feature_1': np.random.normal(0, 1, n),
    'feature_2': np.random.uniform(0, 100, n),
    'category': np.random.choice(['A', 'B', 'C'], n),
    'target': np.random.randint(0, 2, n)
})
df.loc[np.random.choice(n, 20), 'feature_1'] = np.nan

X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# ======================================
# ❌ روش دستی (خطاپذیر و نامرتب)
# ======================================

# پیش‌پردازش عددی
num_imputer = SimpleImputer(strategy='mean')
X_train_num = X_train[['feature_1', 'feature_2']].copy()
X_test_num = X_test[['feature_1', 'feature_2']].copy()

X_train_num_imputed = num_imputer.fit_transform(X_train_num)
X_test_num_imputed = num_imputer.transform(X_test_num)

scaler = StandardScaler()
X_train_num_scaled = scaler.fit_transform(X_train_num_imputed)
X_test_num_scaled = scaler.transform(X_test_num_imputed)

# پیش‌پردازش دسته‌ای
encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
X_train_cat = encoder.fit_transform(X_train[['category']])
X_test_cat = encoder.transform(X_test[['category']])

# ترکیب
X_train_processed = np.hstack([X_train_num_scaled, X_train_cat])
X_test_processed = np.hstack([X_test_num_scaled, X_test_cat])

# مدل
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train_processed, y_train)
manual_score = accuracy_score(y_test, model.predict(X_test_processed))

# ======================================
# ✅ روش Pipeline (تمیز و ایمن)
# ======================================

pipe = Pipeline([
    ('preprocessor', ColumnTransformer([
        ('num', Pipeline([
            ('imputer', SimpleImputer(strategy='mean')),
            ('scaler', StandardScaler())
        ]), ['feature_1', 'feature_2']),
        ('cat', OneHotEncoder(handle_unknown='ignore', sparse_output=False),
         ['category'])
    ])),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

pipe.fit(X_train, y_train)
pipeline_score = pipe.score(X_test, y_test)

print(f"Manual approach score: {manual_score:.4f}")
print(f"Pipeline approach score: {pipeline_score:.4f}")
print(f"\nPipeline code is cleaner, safer, and easier to maintain!")

۹. مدیریت نسخه Pipeline

این نکته رو خیلی‌ها نادیده می‌گیرن ولی توی محیط تولید واقعاً حیاتیه. وقتی Pipeline رو ذخیره می‌کنید، حتماً اطلاعات نسخه scikit-learn رو هم کنارش ذخیره کنید:

import joblib
import json
from datetime import datetime
import sklearn

def save_pipeline_with_metadata(pipeline, filepath, metadata=None):
    """ذخیره Pipeline به همراه اطلاعات اضافی"""
    save_data = {
        'pipeline': pipeline,
        'metadata': {
            'sklearn_version': sklearn.__version__,
            'saved_at': datetime.now().isoformat(),
            'python_version': '3.11',
            **(metadata or {})
        }
    }
    joblib.dump(save_data, filepath)
    print(f"Pipeline saved to {filepath}")
    print(f"Metadata: {json.dumps(save_data['metadata'], indent=2)}")

def load_pipeline_with_metadata(filepath):
    """بارگذاری Pipeline به همراه بررسی نسخه"""
    save_data = joblib.load(filepath)
    metadata = save_data['metadata']

    current_version = sklearn.__version__
    saved_version = metadata['sklearn_version']

    if current_version != saved_version:
        print(f"WARNING: Pipeline was saved with sklearn {saved_version}, "
              f"but current version is {current_version}")

    print(f"Pipeline loaded. Saved at: {metadata['saved_at']}")
    return save_data['pipeline'], metadata

# استفاده
# save_pipeline_with_metadata(pipe, 'model_v1.joblib', {'accuracy': 0.95})
# loaded_pipe, meta = load_pipeline_with_metadata('model_v1.joblib')

۱۰. استفاده از Pipeline با Cross-Validation

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.datasets import load_iris

X, y = load_iris(return_X_y=True)

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('svm', SVC(kernel='rbf', C=1))
])

# اعتبارسنجی متقاطع
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(pipe, X, y, cv=cv, scoring='accuracy')

print(f"CV Scores: {scores}")
print(f"Mean: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")

مثال جامع: یک پروژه کامل از ابتدا تا انتها

خب، رسیدیم به بخش آخر و بهترین بخش! بیایید همه چیزی که تا اینجا یاد گرفتیم رو توی یک پروژه واقعی کنار هم بذاریم. قراره یک سیستم پیش‌بینی فروش بسازیم — از ساخت دیتاست تا استقرار:

import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import (
    StandardScaler, OneHotEncoder, OrdinalEncoder,
    PolynomialFeatures
)
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectPercentile, f_classif
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import (
    train_test_split, RandomizedSearchCV, cross_val_score
)
from sklearn.metrics import (
    classification_report, confusion_matrix, accuracy_score
)
from sklearn.base import BaseEstimator, TransformerMixin
import joblib
from scipy.stats import randint, uniform

# === مرحله ۱: ساخت دیتاست ===
np.random.seed(42)
n = 2000

data = pd.DataFrame({
    'customer_age': np.random.randint(18, 75, n).astype(float),
    'annual_income': np.random.lognormal(10.5, 0.5, n),
    'num_purchases_last_year': np.random.poisson(5, n),
    'avg_purchase_amount': np.random.uniform(10, 500, n),
    'days_since_last_purchase': np.random.exponential(30, n),
    'website_visits': np.random.poisson(10, n),
    'membership_level': np.random.choice(
        ['bronze', 'silver', 'gold', 'platinum'], n,
        p=[0.4, 0.3, 0.2, 0.1]
    ),
    'preferred_category': np.random.choice(
        ['electronics', 'clothing', 'food', 'books', 'sports'], n
    ),
    'region': np.random.choice(
        ['north', 'south', 'east', 'west'], n
    ),
})

# ایجاد متغیر هدف (آیا خرید می‌کنه؟)
purchase_prob = (
    0.3
    + 0.002 * data['num_purchases_last_year']
    + 0.0000001 * data['annual_income']
    - 0.003 * data['days_since_last_purchase']
    + 0.001 * data['website_visits']
    + np.where(data['membership_level'] == 'platinum', 0.15, 0)
    + np.where(data['membership_level'] == 'gold', 0.1, 0)
)
purchase_prob = np.clip(purchase_prob, 0.05, 0.95)
data['will_purchase'] = np.random.binomial(1, purchase_prob)

# اضافه کردن مقادیر گمشده
for col in ['customer_age', 'annual_income', 'avg_purchase_amount']:
    mask = np.random.choice(n, int(n * 0.05), replace=False)
    data.loc[mask, col] = np.nan

print(f"Dataset shape: {data.shape}")
print(f"Target distribution:\n{data['will_purchase'].value_counts()}")
print(f"\nMissing values:\n{data.isnull().sum()}")

# === مرحله ۲: تقسیم داده ===
X = data.drop('will_purchase', axis=1)
y = data['will_purchase']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# === مرحله ۳: تعریف Transformer سفارشی ===
class RatioFeatures(BaseEstimator, TransformerMixin):
    """ساخت ویژگی‌های نسبتی"""

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        # نسبت بازدید به خرید
        X['visit_purchase_ratio'] = (
            X['website_visits'] / (X['num_purchases_last_year'] + 1)
        )
        # میانگین هزینه نسبت به درآمد
        X['spending_ratio'] = (
            X['avg_purchase_amount'] / (X['annual_income'] + 1)
        )
        return X

# === مرحله ۴: ساخت Pipeline ===
# ستون‌ها
num_features = [
    'customer_age', 'annual_income', 'num_purchases_last_year',
    'avg_purchase_amount', 'days_since_last_purchase', 'website_visits'
]
ordinal_features = ['membership_level']
nominal_features = ['preferred_category', 'region']

# Pipeline عددی
numerical_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# Pipeline ترتیبی
ordinal_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OrdinalEncoder(
        categories=[['bronze', 'silver', 'gold', 'platinum']]
    ))
])

# Pipeline اسمی
nominal_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])

# ColumnTransformer
preprocessor = ColumnTransformer([
    ('num', numerical_pipeline, num_features),
    ('ord', ordinal_pipeline, ordinal_features),
    ('nom', nominal_pipeline, nominal_features)
])

# Pipeline نهایی
full_pipeline = Pipeline([
    ('ratio_features', RatioFeatures()),
    ('preprocessor', preprocessor),
    ('feature_selection', SelectPercentile(f_classif, percentile=80)),
    ('classifier', GradientBoostingClassifier(random_state=42))
])

# === مرحله ۵: تنظیم هایپرپارامتر ===
param_distributions = {
    'feature_selection__percentile': [60, 70, 80, 90, 100],
    'classifier__n_estimators': randint(50, 300),
    'classifier__max_depth': [3, 5, 7, 10],
    'classifier__learning_rate': uniform(0.01, 0.3),
    'classifier__min_samples_split': randint(2, 20),
    'classifier__subsample': uniform(0.6, 0.4),
}

search = RandomizedSearchCV(
    full_pipeline,
    param_distributions,
    n_iter=30,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    random_state=42,
    verbose=1
)

search.fit(X_train, y_train)

# === مرحله ۶: ارزیابی ===
print(f"\nBest CV Score: {search.best_score_:.4f}")
print(f"Best Parameters: {search.best_params_}")

best_pipe = search.best_estimator_
y_pred = best_pipe.predict(X_test)

print(f"\nTest Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")
print(f"\nConfusion Matrix:\n{confusion_matrix(y_test, y_pred)}")

# === مرحله ۷: ذخیره Pipeline نهایی ===
joblib.dump(best_pipe, 'purchase_prediction_pipeline.joblib')
print("\nPipeline saved successfully!")

# === مرحله ۸: شبیه‌سازی استفاده در تولید ===
loaded_pipe = joblib.load('purchase_prediction_pipeline.joblib')

# پیش‌بینی برای یک مشتری جدید
new_customer = pd.DataFrame({
    'customer_age': [35],
    'annual_income': [75000],
    'num_purchases_last_year': [8],
    'avg_purchase_amount': [150],
    'days_since_last_purchase': [5],
    'website_visits': [15],
    'membership_level': ['gold'],
    'preferred_category': ['electronics'],
    'region': ['north']
})

prediction = loaded_pipe.predict(new_customer)
probability = loaded_pipe.predict_proba(new_customer)

print(f"\nNew customer prediction: {'Will purchase' if prediction[0] else 'Will not purchase'}")
print(f"Purchase probability: {probability[0][1]:.2%}")

نتیجه‌گیری و قدم‌های بعدی

تو این مقاله، سعی کردم هر چیزی که برای کار حرفه‌ای با Pipeline در scikit-learn نیاز دارید رو پوشش بدم. از مفاهیم پایه گرفته تا ساخت Transformer سفارشی، تنظیم هایپرپارامتر و استقرار در محیط تولید.

بیایید نکات کلیدی رو مرور کنیم:

  • Pipeline ها نشت داده رو جلوگیری می‌کنن: با قرار دادن مراحل پیش‌پردازش داخل Pipeline، مطمئن می‌شیم که اطلاعات مجموعه آزمون به مرحله آموزش نشت نمی‌کنه.
  • کد تمیزتر و قابل نگهداری‌تر: به جای ده‌ها خط کد پراکنده، همه چیز در یک شیء مرتب نگهداری می‌شه.
  • بازتولید‌پذیری: Pipeline ترتیب دقیق مراحل رو حفظ می‌کنه.
  • استقرار آسان: با joblib کل Pipeline رو ذخیره و در محیط تولید بارگذاری کنید.
  • تنظیم هایپرپارامتر یکپارچه: با نشانه‌گذاری دو زیرخط، هایپرپارامترهای همه مراحل رو همزمان تنظیم کنید.
  • ColumnTransformer: برای مدیریت داده‌های ناهمگن ضروریه.
  • Transformer سفارشی: با BaseEstimator و TransformerMixin هر تبدیلی رو می‌تونید داخل Pipeline بگذارید.

قدم‌های بعدی

حالا که Pipeline رو یاد گرفتید، پیشنهاد می‌کنم این موارد رو هم بررسی کنید:

  • scikit-learn Metadata Routing: سیستم جدید مسیریابی ابرداده که انعطاف‌پذیری بیشتری به Pipeline اضافه می‌کنه.
  • ONNX: برای تبدیل Pipeline به فرمت ONNX و اجرا در زبان‌ها و محیط‌های مختلف.
  • MLflow یا DVC: برای مدیریت نسخه‌های Pipeline و آزمایش‌ها در پروژه‌های بزرگ‌تر.
  • Pipeline در ترکیب با GPU: استفاده از Array API برای سرعت بخشیدن به محاسبات سنگین.

یه توصیه آخر: هر زمان که پروژه یادگیری ماشین شروع می‌کنید، اول Pipeline رو طراحی کنید و بعد شروع به کدنویسی کنید. این عادت ساده، کیفیت کار شما رو به شکل محسوسی بالا می‌بره. موفق باشید!

درباره نویسنده Editorial Team

Our team of expert writers and editors.