مقدمه: چرا Pipeline در یادگیری ماشین اهمیت دارد؟
اگه تا حالا با پروژههای یادگیری ماشین کار کرده باشید، احتمالاً با این دردسرها آشنا هستید: کدهای پیشپردازش پراکندهای که هر جای پروژه ریخته شدن، نشت داده (Data Leakage) بین مجموعه آموزش و آزمون، و البته مشکل همیشگی بازتولید نتایج. صادقانه بگم، تا قبل از اینکه خودم Pipeline رو جدی بگیرم، کلی وقتم صرف دیباگ کردن همین مسائل میشد.
اینجاست که Pipeline در scikit-learn وارد میدان میشه و همه چیز رو مرتب میکنه.
تصور کنید یک خط تولید کارخانهای دارید. مواد اولیه از یک طرف وارد میشن، از مراحل مختلف پردازش رد میشن و محصول نهایی از طرف دیگه بیرون میاد. Pipeline در scikit-learn دقیقاً همین کار رو با دادههای شما انجام میده — دادههای خام وارد میشن، مراحل پیشپردازش، مهندسی ویژگی و مدلسازی رو طی میکنن و در نهایت پیشبینی خروجی داده میشه.
مشکلاتی که Pipeline حل میکنه
بیایید ببینیم بدون Pipeline چه بلاهایی سرمون میاد:
- نشت داده (Data Leakage): یکی از رایجترین اشتباهات در یادگیری ماشین اینه که مثلاً
StandardScalerرو روی کل دیتاست fit کنید و بعد تقسیم به train و test انجام بدید. این باعث میشه اطلاعات مجموعه تست به مرحله آموزش نشت کنه و عملاً نتایج ارزیابیتون قابل اتکا نباشه. - کد نامرتب و غیرقابل نگهداری: وقتی مراحل پیشپردازش زیاد بشن، کد شلوغ و پراکنده میشه. Pipeline همه چیز رو در یک شیء مرتب نگه میداره.
- مشکل بازتولید (Reproducibility): بدون Pipeline، ترتیب اجرای مراحل ممکنه فراموش بشه یا عوض بشه. هممون این تجربه رو داشتیم، نه؟
- مشکل استقرار (Deployment): وقتی میخواید مدل رو به محیط تولید ببرید، باید همه مراحل پیشپردازش رو هم همراهش ببرید. Pipeline همه رو یکجا بستهبندی میکنه.
Pipeline چیست؟ مفهوم پایه
sklearn.pipeline.Pipeline یک ابزار در scikit-learn هست که بهتون اجازه میده چندین مرحله پردازش داده و مدلسازی رو به صورت زنجیرهای به هم وصل کنید. هر مرحله (step) در Pipeline شامل یک نام و یک تبدیلکننده (transformer) یا تخمینزننده (estimator) هست.
یک Pipeline از دو نوع مرحله تشکیل شده:
- Transformer: شیئی که متدهای
fit()وtransform()داره (مثلStandardScaler،OneHotEncoder) - Estimator: شیئی که متدهای
fit()وpredict()داره (مثلLogisticRegression،RandomForestClassifier)
یه نکته مهم: همه مراحل Pipeline به جز مرحله آخر باید transformer باشن. مرحله آخر میتونه transformer یا estimator باشه.
بیایید با یک مثال ساده شروع کنیم:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
# ساخت یک Pipeline ساده
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression())
])
# حالا میتونید با یک خط کد fit و predict کنید
# pipe.fit(X_train, y_train)
# pipe.predict(X_test)
همچنین scikit-learn یک تابع کمکی به نام make_pipeline داره که نامگذاری مراحل رو خودکار انجام میده. خیلی وقتها از این تابع استفاده میکنم چون سریعتره:
from sklearn.pipeline import make_pipeline
# نامگذاری خودکار مراحل
pipe = make_pipeline(StandardScaler(), LogisticRegression())
# مراحل به صورت خودکار نامگذاری میشن:
# 'standardscaler' و 'logisticregression'
print(pipe.named_steps)
ساخت اولین Pipeline: قدم به قدم
خب، بیایید یک Pipeline واقعی بسازیم و با دیتاست Iris کار کنیم. این مثال کامل و قابل اجراست، پس میتونید مستقیم کپیاش کنید و اجرا کنید:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score, classification_report
# بارگذاری دیتاست
iris = load_iris()
X, y = iris.data, iris.target
# تقسیم داده به آموزش و آزمون
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# ساخت Pipeline
pipe = Pipeline([
('scaler', StandardScaler()), # مرحله 1: نرمالسازی
('pca', PCA(n_components=2)), # مرحله 2: کاهش ابعاد
('classifier', LogisticRegression()), # مرحله 3: طبقهبندی
])
# آموزش Pipeline
pipe.fit(X_train, y_train)
# پیشبینی
y_pred = pipe.predict(X_test)
# ارزیابی
print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")
# میتونید مستقیماً score هم بگیرید
print(f"Score: {pipe.score(X_test, y_test):.4f}")
خروجی این کد چیزی شبیه این خواهد بود:
Accuracy: 0.9667
Classification Report:
precision recall f1-score support
0 1.00 1.00 1.00 10
1 1.00 0.90 0.95 10
2 0.91 1.00 0.95 10
accuracy 0.97 30
macro avg 0.97 0.97 0.97 30
weighted avg 0.97 0.97 0.97 30
Score: 0.9667
دسترسی به مراحل Pipeline
بعد از آموزش Pipeline، میتونید به هر مرحلهش دسترسی داشته باشید. این قابلیت برای دیباگ کردن و فهمیدن اینکه هر مرحله چیکار کرده، فوقالعادهست:
# دسترسی به مراحل با نام
scaler = pipe.named_steps['scaler']
print(f"Mean values: {scaler.mean_}")
pca = pipe.named_steps['pca']
print(f"Explained variance ratio: {pca.explained_variance_ratio_}")
# دسترسی با ایندکس
first_step = pipe[0] # StandardScaler
last_step = pipe[-1] # LogisticRegression
# برش (slicing) Pipeline
preprocessing = pipe[:-1] # فقط مراحل پیشپردازش
X_transformed = preprocessing.transform(X_test)
print(f"Transformed shape: {X_transformed.shape}")
ColumnTransformer برای دادههای ناهمگن
در دنیای واقعی، دیتاستها معمولاً شامل ستونهای عددی و دستهای هستن. اینجاست که ColumnTransformer به کمکمون میاد و بهمون اجازه میده تبدیلهای مختلف رو روی ستونهای مختلف اعمال کنیم.
بیایید یک مثال واقعگرایانه ببینیم. فرض کنید دیتاستی داریم که شامل اطلاعات مشتریان هست و میخوایم پیشبینی کنیم آیا خرید میکنن یا نه:
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ساخت دیتاست نمونه
np.random.seed(42)
n_samples = 500
data = pd.DataFrame({
'age': np.random.randint(18, 70, n_samples),
'income': np.random.normal(50000, 15000, n_samples),
'credit_score': np.random.randint(300, 850, n_samples),
'education': np.random.choice(
['high_school', 'bachelor', 'master', 'phd'], n_samples
),
'city': np.random.choice(
['Tehran', 'Isfahan', 'Shiraz', 'Tabriz', 'Mashhad'], n_samples
),
'employment': np.random.choice(
['employed', 'self_employed', 'unemployed'], n_samples
),
'purchased': np.random.randint(0, 2, n_samples)
})
# اضافه کردن مقادیر گمشده
data.loc[np.random.choice(n_samples, 30), 'income'] = np.nan
data.loc[np.random.choice(n_samples, 20), 'credit_score'] = np.nan
data.loc[np.random.choice(n_samples, 15), 'education'] = np.nan
print(data.head())
print(f"\nMissing values:\n{data.isnull().sum()}")
حالا بیایید Pipeline رو با ColumnTransformer بسازیم:
# تعریف ستونها
numerical_features = ['age', 'income', 'credit_score']
categorical_features = ['education', 'city', 'employment']
# تعریف تبدیلهای عددی
numerical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
# تعریف تبدیلهای دستهای
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# ترکیب با ColumnTransformer
preprocessor = ColumnTransformer(
transformers=[
('num', numerical_transformer, numerical_features),
('cat', categorical_transformer, categorical_features)
]
)
# Pipeline نهایی
full_pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
# تقسیم داده و آموزش
X = data.drop('purchased', axis=1)
y = data['purchased']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
full_pipeline.fit(X_train, y_train)
print(f"Accuracy: {full_pipeline.score(X_test, y_test):.4f}")
استفاده از make_column_selector
اگه حوصله مشخص کردن دستی ستونها رو ندارید (که خب، بعضی وقتا واقعاً ستونها زیاد میشن)، میتونید از make_column_selector استفاده کنید:
from sklearn.compose import make_column_selector
# انتخاب خودکار ستونها بر اساس نوع داده
preprocessor_auto = ColumnTransformer(
transformers=[
('num', numerical_transformer,
make_column_selector(dtype_include=np.number)),
('cat', categorical_transformer,
make_column_selector(dtype_include=object))
]
)
auto_pipeline = Pipeline([
('preprocessor', preprocessor_auto),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
auto_pipeline.fit(X_train, y_train)
print(f"Auto Pipeline Accuracy: {auto_pipeline.score(X_test, y_test):.4f}")
استفاده از make_column_transformer
یک راه سریعتر هم وجود داره که من شخصاً توی پروتوتایپها ازش زیاد استفاده میکنم:
from sklearn.compose import make_column_transformer
# ساخت سریع ColumnTransformer
quick_preprocessor = make_column_transformer(
(numerical_transformer, numerical_features),
(categorical_transformer, categorical_features),
remainder='drop' # ستونهای باقیمانده رو حذف کن
)
# remainder='passthrough' اگه بخواید ستونهای باقیمانده بدون تغییر بمونن
ساخت Transformer سفارشی
خب، اینجا یکی از هیجانانگیزترین بخشهای Pipeline رو داریم. گاهی اوقات Transformerهای آمادهٔ scikit-learn جوابگوی نیاز شما نیستن و باید خودتون دست به کار بشید. دو روش اصلی برای این کار وجود داره.
روش اول: استفاده از BaseEstimator و TransformerMixin
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
class LogTransformer(BaseEstimator, TransformerMixin):
"""تبدیل لگاریتمی برای ویژگیهای با توزیع چوله"""
def __init__(self, offset=1):
self.offset = offset
def fit(self, X, y=None):
# در اینجا نیازی به یادگیری چیزی نیست
return self
def transform(self, X):
return np.log1p(X + self.offset)
class OutlierRemover(BaseEstimator, TransformerMixin):
"""حذف outlierها بر اساس IQR"""
def __init__(self, factor=1.5):
self.factor = factor
def fit(self, X, y=None):
Q1 = np.percentile(X, 25, axis=0)
Q3 = np.percentile(X, 75, axis=0)
self.IQR_ = Q3 - Q1
self.lower_ = Q1 - self.factor * self.IQR_
self.upper_ = Q3 + self.factor * self.IQR_
return self
def transform(self, X):
X_clipped = np.clip(X, self.lower_, self.upper_)
return X_clipped
class FeatureInteraction(BaseEstimator, TransformerMixin):
"""ایجاد ویژگیهای تعاملی (حاصلضرب جفت ویژگیها)"""
def __init__(self, interaction_only=True):
self.interaction_only = interaction_only
def fit(self, X, y=None):
self.n_features_in_ = X.shape[1]
return self
def transform(self, X):
X = np.array(X)
interactions = []
for i in range(self.n_features_in_):
for j in range(i + 1, self.n_features_in_):
interactions.append(X[:, i] * X[:, j])
interaction_matrix = np.column_stack(interactions)
if not self.interaction_only:
return np.hstack([X, interaction_matrix])
return interaction_matrix
# استفاده در Pipeline
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
custom_pipe = Pipeline([
('outlier_remover', OutlierRemover(factor=2.0)),
('log_transform', LogTransformer(offset=0)),
('interactions', FeatureInteraction(interaction_only=False)),
('scaler', StandardScaler()),
('classifier', LogisticRegression(max_iter=1000))
])
custom_pipe.fit(X_train, y_train)
print(f"Custom Pipeline Accuracy: {custom_pipe.score(X_test, y_test):.4f}")
روش دوم: استفاده از FunctionTransformer
اگه تبدیلتون سادهست و نمیخواید حتماً یک کلاس کامل بنویسید، FunctionTransformer گزینه خیلی خوبیه. سریعه و کار راه میندازه:
from sklearn.preprocessing import FunctionTransformer
import numpy as np
# تبدیل لگاریتمی ساده
log_transformer = FunctionTransformer(
func=np.log1p,
inverse_func=np.expm1,
validate=True
)
# تابع سفارشی
def add_squared_features(X):
"""اضافه کردن مربع ویژگیها"""
return np.hstack([X, X ** 2])
squared_transformer = FunctionTransformer(
func=add_squared_features,
validate=True
)
# تابع سفارشی با پارامتر
def clip_values(X, min_val=-3, max_val=3):
return np.clip(X, min_val, max_val)
clip_transformer = FunctionTransformer(
func=clip_values,
kw_args={'min_val': -2, 'max_val': 2}
)
# استفاده در Pipeline
pipe_with_func = Pipeline([
('log', log_transformer),
('squared', squared_transformer),
('scaler', StandardScaler()),
('clip', clip_transformer),
('classifier', LogisticRegression(max_iter=1000))
])
pipe_with_func.fit(X_train, y_train)
print(f"FunctionTransformer Pipeline: {pipe_with_func.score(X_test, y_test):.4f}")
مهندسی ویژگی داخل Pipeline
یکی از بهترین کاربردهای Pipeline، انجام مهندسی ویژگی به صورت سازمانیافته هست. به نظر من، این بخش واقعاً جاییه که Pipeline ارزشش رو نشون میده — چون مهندسی ویژگی معمولاً بخش آزمایشیتر کار هست و بدون Pipeline خیلی سریع شلوغ میشه.
PolynomialFeatures
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
# ساخت دیتاست رگرسیون
X, y = make_regression(
n_samples=200, n_features=3, noise=10, random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Pipeline با ویژگیهای چندجملهای
poly_pipe = Pipeline([
('poly', PolynomialFeatures(degree=2, include_bias=False)),
('scaler', StandardScaler()),
('regressor', Ridge(alpha=1.0))
])
poly_pipe.fit(X_train, y_train)
print(f"R² Score: {poly_pipe.score(X_test, y_test):.4f}")
# مشاهده تعداد ویژگیها بعد از تبدیل
poly = poly_pipe.named_steps['poly']
print(f"Original features: 3")
print(f"Polynomial features: {poly.n_output_features_}")
print(f"Feature names: {poly.get_feature_names_out()}")
Pipeline کامل با چندین مرحله مهندسی ویژگی
بیایید یک Pipeline جامعتر بسازیم. توی این مثال، یک سناریوی نزدیک به واقعیت داریم — پیشبینی قیمت ملک با انواع مختلف داده:
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import (
StandardScaler, OneHotEncoder, OrdinalEncoder,
PolynomialFeatures
)
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
# ساخت دیتاست نمونه
np.random.seed(42)
n = 1000
df = pd.DataFrame({
'area': np.random.uniform(50, 300, n),
'rooms': np.random.randint(1, 6, n),
'floor': np.random.randint(1, 15, n),
'age': np.random.uniform(0, 50, n),
'district': np.random.choice(
['north', 'south', 'east', 'west', 'center'], n
),
'building_type': np.random.choice(
['apartment', 'villa', 'tower'], n
),
'condition': np.random.choice(
['poor', 'fair', 'good', 'excellent'], n
),
})
# ایجاد متغیر هدف (قیمت بالا یا پایین)
df['expensive'] = (
(df['area'] * 10 + df['rooms'] * 5000 +
np.random.normal(0, 3000, n)) > 2000
).astype(int)
# اضافه کردن مقادیر گمشده
for col in ['area', 'rooms', 'age']:
df.loc[np.random.choice(n, 50), col] = np.nan
df.loc[np.random.choice(n, 30), 'condition'] = np.nan
X = df.drop('expensive', axis=1)
y = df['expensive']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# تعریف ستونها
num_cols = ['area', 'rooms', 'floor', 'age']
nominal_cols = ['district', 'building_type']
ordinal_cols = ['condition']
# Pipeline عددی با مهندسی ویژگی
numerical_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('poly', PolynomialFeatures(degree=2, include_bias=False,
interaction_only=True)),
('scaler', StandardScaler())
])
# Pipeline دستهای اسمی
nominal_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# Pipeline دستهای ترتیبی
ordinal_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OrdinalEncoder(
categories=[['poor', 'fair', 'good', 'excellent']]
))
])
# ترکیب همه
preprocessor = ColumnTransformer([
('num', numerical_pipeline, num_cols),
('nom', nominal_pipeline, nominal_cols),
('ord', ordinal_pipeline, ordinal_cols)
])
# Pipeline نهایی با انتخاب ویژگی
full_pipeline = Pipeline([
('preprocessor', preprocessor),
('feature_selection', SelectKBest(f_classif, k=10)),
('classifier', GradientBoostingClassifier(
n_estimators=100, random_state=42
))
])
full_pipeline.fit(X_train, y_train)
print(f"Full Pipeline Accuracy: {full_pipeline.score(X_test, y_test):.4f}")
SimpleImputer: پر کردن مقادیر گمشده
مدیریت مقادیر گمشده یکی از مهمترین مراحل پیشپردازش هست و تقریباً هیچ دیتاست واقعیای بدون مقادیر گمشده پیدا نمیکنید. SimpleImputer چند استراتژی مختلف ارائه میده:
from sklearn.impute import SimpleImputer
# استراتژیهای مختلف
imputer_mean = SimpleImputer(strategy='mean') # میانگین
imputer_median = SimpleImputer(strategy='median') # میانه
imputer_mode = SimpleImputer(strategy='most_frequent') # مد
imputer_const = SimpleImputer(strategy='constant', fill_value=0) # مقدار ثابت
# در Pipeline
imputing_pipe = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
('model', LogisticRegression())
])
تنظیم هایپرپارامتر با Pipeline
خب، برسیم به بخشی که به نظرم جادوی واقعی Pipeline نمایان میشه! وقتی Pipeline دارید، میتونید هایپرپارامترهای همه مراحل رو یکجا با GridSearchCV یا RandomizedSearchCV تنظیم کنید. نکته کلیدی، استفاده از نشانهگذاری دو زیرخط (double underscore) هست.
نحوه نامگذاری پارامترها
فرمت نام پارامترها سادهست:
# فرمت: step_name__parameter_name
# مثالها:
# 'scaler__with_mean' -> پارامتر with_mean از مرحله scaler
# 'classifier__C' -> پارامتر C از مرحله classifier
# 'preprocessor__num__imputer__strategy' -> پارامتر strategy از imputer در num
مثال عملی با GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_iris
# بارگذاری داده
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# ساخت Pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('pca', PCA()),
('svm', SVC())
])
# تعریف فضای جستجوی پارامترها
param_grid = {
'pca__n_components': [2, 3, 4],
'svm__C': [0.1, 1, 10, 100],
'svm__kernel': ['linear', 'rbf', 'poly'],
'svm__gamma': ['scale', 'auto']
}
# جستجوی شبکهای
grid_search = GridSearchCV(
pipe,
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
grid_search.fit(X_train, y_train)
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
print(f"Test score: {grid_search.score(X_test, y_test):.4f}")
# دسترسی به بهترین Pipeline
best_pipe = grid_search.best_estimator_
print(f"\nBest Pipeline:\n{best_pipe}")
مثال با RandomizedSearchCV
وقتی فضای جستجو بزرگ باشه (و معمولاً هست!)، RandomizedSearchCV خیلی سریعتر و عملیتره. به جای امتحان کردن همه ترکیبها، تعداد مشخصی ترکیب تصادفی رو بررسی میکنه:
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier
from scipy.stats import randint, uniform
# Pipeline پیچیدهتر
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(random_state=42))
])
# توزیعهای پارامتری
param_distributions = {
'classifier__n_estimators': randint(50, 500),
'classifier__max_depth': [None, 5, 10, 20, 30],
'classifier__min_samples_split': randint(2, 20),
'classifier__min_samples_leaf': randint(1, 10),
'classifier__max_features': ['sqrt', 'log2', None],
}
random_search = RandomizedSearchCV(
pipe,
param_distributions,
n_iter=50, # تعداد ترکیبات تصادفی
cv=5,
scoring='accuracy',
n_jobs=-1,
random_state=42,
verbose=1
)
random_search.fit(X_train, y_train)
print(f"Best parameters: {random_search.best_params_}")
print(f"Best CV score: {random_search.best_score_:.4f}")
print(f"Test score: {random_search.score(X_test, y_test):.4f}")
تنظیم هایپرپارامتر Pipeline تو در تو
وقتی Pipeline شامل ColumnTransformer باشه، نامگذاری پارامترها یکم طولانیتر میشه. ولی نگران نباشید، منطقش همونه — فقط سطوح بیشتری از دو زیرخط دارید:
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV, train_test_split
# ساخت دیتاست نمونه
np.random.seed(42)
n = 500
df = pd.DataFrame({
'age': np.random.randint(18, 70, n).astype(float),
'income': np.random.normal(50000, 15000, n),
'city': np.random.choice(['A', 'B', 'C'], n),
'target': np.random.randint(0, 2, n)
})
df.loc[np.random.choice(n, 30), 'age'] = np.nan
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
num_cols = ['age', 'income']
cat_cols = ['city']
preprocessor = ColumnTransformer([
('num', Pipeline([
('imputer', SimpleImputer(strategy='mean')),
('scaler', StandardScaler())
]), num_cols),
('cat', OneHotEncoder(handle_unknown='ignore'), cat_cols)
])
pipe = Pipeline([
('preprocessor', preprocessor),
('classifier', GradientBoostingClassifier(random_state=42))
])
# پارامترهای تو در تو
param_grid = {
# پارامتر strategy از imputer داخل num در preprocessor
'preprocessor__num__imputer__strategy': ['mean', 'median'],
# پارامترهای classifier
'classifier__n_estimators': [50, 100, 200],
'classifier__max_depth': [3, 5, 7],
'classifier__learning_rate': [0.01, 0.1, 0.2],
}
grid = GridSearchCV(pipe, param_grid, cv=3, n_jobs=-1, verbose=1)
grid.fit(X_train, y_train)
print(f"Best params: {grid.best_params_}")
print(f"Best score: {grid.best_score_:.4f}")
print(f"Test score: {grid.score(X_test, y_test):.4f}")
یه نکته کاربردی: برای مشاهده همه پارامترهای قابل تنظیم در Pipeline، کافیه از pipe.get_params() استفاده کنید:
# دیدن همه پارامترهای قابل تنظیم
for param_name in sorted(pipe.get_params().keys()):
print(param_name)
ذخیره و بارگذاری Pipeline
یکی از بزرگترین مزیتهای Pipeline اینه که میتونید کل فرآیند — از پیشپردازش تا مدل — رو در یک فایل ذخیره کنید. بهترین ابزار برای این کار joblib هست که مخصوص آبجکتهای پایتون با آرایههای NumPy بزرگ بهینهسازی شده.
ذخیره Pipeline
import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
# ساخت و آموزش Pipeline
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)
# ذخیره Pipeline
joblib.dump(pipe, 'iris_pipeline.joblib')
print("Pipeline saved successfully!")
# همچنین میتونید با فشردهسازی ذخیره کنید
joblib.dump(pipe, 'iris_pipeline_compressed.joblib', compress=3)
print("Compressed pipeline saved!")
بارگذاری Pipeline
import joblib
import numpy as np
# بارگذاری Pipeline
loaded_pipe = joblib.load('iris_pipeline.joblib')
# پیشبینی با Pipeline بارگذاری شده
new_data = np.array([[5.1, 3.5, 1.4, 0.2]])
prediction = loaded_pipe.predict(new_data)
proba = loaded_pipe.predict_proba(new_data)
print(f"Prediction: {prediction[0]}")
print(f"Probabilities: {proba[0]}")
print(f"Score on test data: {loaded_pipe.score(X_test, y_test):.4f}")
استفاده در یک API ساده
بیایید ببینیم چطور میشه Pipeline ذخیرهشده رو توی یک وبسرویس واقعی استفاده کرد. این مثال با Flask هست:
# app.py - نمونه API با Flask
import joblib
import numpy as np
from flask import Flask, request, jsonify
app = Flask(__name__)
# بارگذاری Pipeline در زمان شروع اپلیکیشن
pipeline = joblib.load('iris_pipeline.joblib')
@app.route('/predict', methods=['POST'])
def predict():
data = request.json
features = np.array(data['features']).reshape(1, -1)
prediction = pipeline.predict(features)
probability = pipeline.predict_proba(features)
return jsonify({
'prediction': int(prediction[0]),
'probabilities': probability[0].tolist()
})
if __name__ == '__main__':
app.run(debug=True)
و نحوه فراخوانیش:
# client.py - نمونه کلاینت
import requests
response = requests.post(
'http://localhost:5000/predict',
json={'features': [5.1, 3.5, 1.4, 0.2]}
)
print(response.json())
# خروجی: {'prediction': 0, 'probabilities': [0.97, 0.02, 0.01]}
مقایسه pickle و joblib
شاید براتون سوال پیش بیاد که چرا joblib و نه pickle؟ جواب کوتاه: هر دو کار میکنن، ولی joblib برای آرایههای بزرگ NumPy بهینهتره:
import pickle
import joblib
import os
# ذخیره با pickle
with open('pipe_pickle.pkl', 'wb') as f:
pickle.dump(pipe, f)
# ذخیره با joblib
joblib.dump(pipe, 'pipe_joblib.joblib')
# مقایسه حجم فایل
pickle_size = os.path.getsize('pipe_pickle.pkl')
joblib_size = os.path.getsize('pipe_joblib.joblib')
print(f"Pickle size: {pickle_size:,} bytes")
print(f"Joblib size: {joblib_size:,} bytes")
ویژگیهای جدید در scikit-learn نسخههای اخیر
scikit-learn مدام داره بهتر میشه و ویژگیهای جدیدی اضافه میکنه. بیایید نگاهی به مهمترین تغییرات مرتبط با Pipeline بندازیم.
پشتیبانی از Array API برای محاسبات GPU
از نسخه 1.6 به بعد، scikit-learn شروع به پشتیبانی از Array API کرده. به زبان ساده، این یعنی میتونید Pipeline رو روی GPU اجرا کنید! البته هنوز همه estimatorها پشتیبانی نمیشن، ولی داره گسترش پیدا میکنه.
# فعالسازی Array API (نیاز به PyTorch یا CuPy دارد)
from sklearn import config_context
# اگه PyTorch نصب باشه:
# import torch
# X_gpu = torch.tensor(X_train, device='cuda', dtype=torch.float32)
# استفاده از Pipeline با Array API
with config_context(array_api_dispatch=True):
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression())
])
# pipe.fit(X_gpu, y_train) # اجرا روی GPU!
پارامتر transform_input در Pipeline
از نسخه 1.6، Pipeline یک پارامتر جدید به نام transform_input داره که برای Metadata Routing مفیده. این پارامتر بهتون اجازه میده مشخص کنید کدوم ورودیها باید از طریق مراحل قبلی تبدیل بشن:
# مثال مفهومی از transform_input
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
# در نسخههای جدید scikit-learn
pipe = Pipeline(
steps=[
('scaler', StandardScaler()),
('classifier', LogisticRegression())
],
# transform_input میتواند برای metadata routing استفاده شود
)
خروجی DataFrame با set_output
این یکی از ویژگیهای مورد علاقه من هست — از نسخه 1.2 اضافه شده و واقعاً دیباگ کردن Pipeline رو راحتتر کرده:
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.datasets import fetch_openml
# تنظیم خروجی به DataFrame
pipe = Pipeline([
('scaler', StandardScaler()),
]).set_output(transform="pandas")
# یا به صورت global
from sklearn import set_config
set_config(transform_output="pandas")
# حالا همه transformer ها DataFrame برمیگردونن
scaler = StandardScaler()
X_df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
result = scaler.fit_transform(X_df)
print(type(result)) #
print(result)
# برگشت به حالت پیشفرض
set_config(transform_output="default")
بهبودهای عملکردی
در نسخههای اخیر scikit-learn (نسخه 1.6 و بالاتر) بهبودهای عملکردی خوبی در Pipeline اعمال شده:
- بهینهسازی حافظه در زنجیرههای طولانی Pipeline با استفاده از پارامتر
memory - پشتیبانی بهتر از
sparsearrays در طول Pipeline - سازگاری بهتر با Metadata Routing جدید
- پشتیبانی از
__sklearn_tags__برای بهبود شناسایی قابلیتهای estimator ها
کش کردن مراحل Pipeline با memory
وقتی مراحل پیشپردازش زمانبر هستن (مثلاً یه PCA سنگین یا تبدیل متنی پیچیده)، میتونید از پارامتر memory استفاده کنید تا نتایج کش بشن. این توی GridSearchCV معجزه میکنه:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from tempfile import mkdtemp
# ساخت Pipeline با کش
cachedir = mkdtemp()
cached_pipe = Pipeline(
[
('scaler', StandardScaler()),
('pca', PCA(n_components=2)),
('svm', SVC())
],
memory=cachedir # مسیر کش
)
# حالا اگه Pipeline رو چند بار fit کنید (مثلاً در GridSearchCV)
# مراحل پیشپردازش فقط یک بار اجرا میشن
cached_pipe.fit(X_train, y_train)
print(f"Score: {cached_pipe.score(X_test, y_test):.4f}")
# پاکسازی کش
from shutil import rmtree
rmtree(cachedir)
بهترین شیوهها و خطاهای رایج
حالا که با Pipeline آشنا شدید، وقتشه بریم سراغ نکات عملی. این بخش خلاصه تجربیاتیه که خودم و خیلی از توسعهدهندهها با Pipeline داشتیم.
۱. جلوگیری از نشت داده
مهمترین دلیل استفاده از Pipeline همینه. بیایید تفاوت روش اشتباه و درست رو ببینیم:
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
# ❌ روش اشتباه - نشت داده!
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) # fit روی کل داده!
wrong_scores = cross_val_score(
LogisticRegression(), X_scaled, y, cv=5
)
print(f"Wrong way (data leakage): {wrong_scores.mean():.4f}")
# ✅ روش درست - با Pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression())
])
correct_scores = cross_val_score(pipe, X, y, cv=5)
print(f"Correct way (no leakage): {correct_scores.mean():.4f}")
۲. نامگذاری مناسب مراحل
این نکته کوچیک ولی مهمه. نامگذاری خوب، دیباگ و نگهداری کد رو خیلی راحتتر میکنه:
# ❌ نامگذاری نامناسب
bad_pipe = Pipeline([
('step1', StandardScaler()),
('step2', PCA()),
('step3', LogisticRegression())
])
# ✅ نامگذاری مناسب و توصیفی
good_pipe = Pipeline([
('scaler', StandardScaler()),
('dim_reduction', PCA()),
('classifier', LogisticRegression())
])
۳. دیباگ کردن Pipeline
یکی از سوالات رایج اینه: "چطور بفهمم هر مرحله Pipeline چیکار میکنه؟" خب، چند راه وجود داره:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
# Pipeline با verbose
pipe = Pipeline([
('scaler', StandardScaler()),
('pca', PCA(n_components=2)),
('classifier', LogisticRegression())
], verbose=True)
pipe.fit(X, y)
# بررسی مراحل میانی
print("\n--- Debug Info ---")
# دیدن خروجی هر مرحله
X_after_scaler = pipe.named_steps['scaler'].transform(X)
print(f"After scaler - shape: {X_after_scaler.shape}, "
f"mean: {X_after_scaler.mean():.6f}")
X_after_pca = pipe.named_steps['pca'].transform(X_after_scaler)
print(f"After PCA - shape: {X_after_pca.shape}")
print(f"Explained variance: {pipe.named_steps['pca'].explained_variance_ratio_}")
# یا با استفاده از slicing
preprocessing = pipe[:-1] # همه مراحل به جز آخری
X_preprocessed = preprocessing.transform(X)
print(f"Preprocessed shape: {X_preprocessed.shape}")
۴. استفاده از set_output برای خروجی DataFrame
برای دیباگ بهتر و فهم بهتر دادهها بعد از هر تبدیل، خروجی رو به DataFrame تبدیل کنید:
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
# فعالسازی خروجی pandas
pipe = Pipeline([
('preprocessor', ColumnTransformer([
('num', StandardScaler(), ['age', 'income']),
('cat', OneHotEncoder(), ['city'])
]))
]).set_output(transform="pandas")
# حالا خروجی DataFrame خواهد بود
sample_data = pd.DataFrame({
'age': [25, 30, 35],
'income': [50000, 60000, 70000],
'city': ['Tehran', 'Isfahan', 'Shiraz']
})
result = pipe.fit_transform(sample_data)
print(type(result))
print(result)
# خروجی یک DataFrame با نام ستونهای مشخص خواهد بود
۵. رد کردن مراحل Pipeline با passthrough و drop
بعضی وقتا میخواید یک مرحله رو موقتاً غیرفعال کنید — مثلاً وقتی میخواید تأثیر PCA رو بسنجید:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
# میتونید مراحل رو غیرفعال کنید
pipe = Pipeline([
('scaler', StandardScaler()),
('pca', PCA(n_components=2)),
('classifier', LogisticRegression())
])
# غیرفعال کردن PCA
pipe.set_params(pca='passthrough')
# حالا PCA اجرا نمیشه و داده بدون تغییر عبور میکنه
# این در GridSearchCV خیلی مفیده:
param_grid = {
'pca': [PCA(n_components=2), PCA(n_components=3), 'passthrough'],
'classifier__C': [0.1, 1, 10]
}
۶. FeatureUnion برای ترکیب ویژگیها
وقتی میخواید چندین تبدیل مختلف رو روی یک داده اعمال کنید و نتایج رو کنار هم بگذارید، FeatureUnion کار رو راحت میکنه:
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
# ترکیب چند تبدیل مختلف
combined_features = FeatureUnion([
('pca', PCA(n_components=2)),
('poly', PolynomialFeatures(degree=2, include_bias=False))
])
pipe = Pipeline([
('scaler', StandardScaler()),
('features', combined_features),
('classifier', LogisticRegression(max_iter=1000))
])
pipe.fit(X, y)
print(f"Score: {pipe.score(X, y):.4f}")
# دیدن تعداد ویژگیها
preprocessed = pipe[:-1].transform(X)
print(f"Combined features shape: {preprocessed.shape}")
۷. مدیریت خطاها در Pipeline
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.utils.validation import check_is_fitted
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression())
])
# بررسی آیا Pipeline آموزش دیده یا نه
try:
check_is_fitted(pipe)
print("Pipeline is fitted")
except Exception as e:
print(f"Pipeline is NOT fitted: {e}")
# آموزش Pipeline
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
pipe.fit(X, y)
# حالا Pipeline آموزش دیده
check_is_fitted(pipe)
print("Pipeline is fitted successfully!")
۸. Pipeline در مقابل کد دستی: مقایسه عملی
بیایید یک مقایسه صادقانه ببینیم. خودتون قضاوت کنید کدوم رو ترجیح میدید:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
# ساخت دیتاست
np.random.seed(42)
n = 300
df = pd.DataFrame({
'feature_1': np.random.normal(0, 1, n),
'feature_2': np.random.uniform(0, 100, n),
'category': np.random.choice(['A', 'B', 'C'], n),
'target': np.random.randint(0, 2, n)
})
df.loc[np.random.choice(n, 20), 'feature_1'] = np.nan
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# ======================================
# ❌ روش دستی (خطاپذیر و نامرتب)
# ======================================
# پیشپردازش عددی
num_imputer = SimpleImputer(strategy='mean')
X_train_num = X_train[['feature_1', 'feature_2']].copy()
X_test_num = X_test[['feature_1', 'feature_2']].copy()
X_train_num_imputed = num_imputer.fit_transform(X_train_num)
X_test_num_imputed = num_imputer.transform(X_test_num)
scaler = StandardScaler()
X_train_num_scaled = scaler.fit_transform(X_train_num_imputed)
X_test_num_scaled = scaler.transform(X_test_num_imputed)
# پیشپردازش دستهای
encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
X_train_cat = encoder.fit_transform(X_train[['category']])
X_test_cat = encoder.transform(X_test[['category']])
# ترکیب
X_train_processed = np.hstack([X_train_num_scaled, X_train_cat])
X_test_processed = np.hstack([X_test_num_scaled, X_test_cat])
# مدل
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train_processed, y_train)
manual_score = accuracy_score(y_test, model.predict(X_test_processed))
# ======================================
# ✅ روش Pipeline (تمیز و ایمن)
# ======================================
pipe = Pipeline([
('preprocessor', ColumnTransformer([
('num', Pipeline([
('imputer', SimpleImputer(strategy='mean')),
('scaler', StandardScaler())
]), ['feature_1', 'feature_2']),
('cat', OneHotEncoder(handle_unknown='ignore', sparse_output=False),
['category'])
])),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)
pipeline_score = pipe.score(X_test, y_test)
print(f"Manual approach score: {manual_score:.4f}")
print(f"Pipeline approach score: {pipeline_score:.4f}")
print(f"\nPipeline code is cleaner, safer, and easier to maintain!")
۹. مدیریت نسخه Pipeline
این نکته رو خیلیها نادیده میگیرن ولی توی محیط تولید واقعاً حیاتیه. وقتی Pipeline رو ذخیره میکنید، حتماً اطلاعات نسخه scikit-learn رو هم کنارش ذخیره کنید:
import joblib
import json
from datetime import datetime
import sklearn
def save_pipeline_with_metadata(pipeline, filepath, metadata=None):
"""ذخیره Pipeline به همراه اطلاعات اضافی"""
save_data = {
'pipeline': pipeline,
'metadata': {
'sklearn_version': sklearn.__version__,
'saved_at': datetime.now().isoformat(),
'python_version': '3.11',
**(metadata or {})
}
}
joblib.dump(save_data, filepath)
print(f"Pipeline saved to {filepath}")
print(f"Metadata: {json.dumps(save_data['metadata'], indent=2)}")
def load_pipeline_with_metadata(filepath):
"""بارگذاری Pipeline به همراه بررسی نسخه"""
save_data = joblib.load(filepath)
metadata = save_data['metadata']
current_version = sklearn.__version__
saved_version = metadata['sklearn_version']
if current_version != saved_version:
print(f"WARNING: Pipeline was saved with sklearn {saved_version}, "
f"but current version is {current_version}")
print(f"Pipeline loaded. Saved at: {metadata['saved_at']}")
return save_data['pipeline'], metadata
# استفاده
# save_pipeline_with_metadata(pipe, 'model_v1.joblib', {'accuracy': 0.95})
# loaded_pipe, meta = load_pipeline_with_metadata('model_v1.joblib')
۱۰. استفاده از Pipeline با Cross-Validation
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
pipe = Pipeline([
('scaler', StandardScaler()),
('svm', SVC(kernel='rbf', C=1))
])
# اعتبارسنجی متقاطع
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(pipe, X, y, cv=cv, scoring='accuracy')
print(f"CV Scores: {scores}")
print(f"Mean: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")
مثال جامع: یک پروژه کامل از ابتدا تا انتها
خب، رسیدیم به بخش آخر و بهترین بخش! بیایید همه چیزی که تا اینجا یاد گرفتیم رو توی یک پروژه واقعی کنار هم بذاریم. قراره یک سیستم پیشبینی فروش بسازیم — از ساخت دیتاست تا استقرار:
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import (
StandardScaler, OneHotEncoder, OrdinalEncoder,
PolynomialFeatures
)
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectPercentile, f_classif
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import (
train_test_split, RandomizedSearchCV, cross_val_score
)
from sklearn.metrics import (
classification_report, confusion_matrix, accuracy_score
)
from sklearn.base import BaseEstimator, TransformerMixin
import joblib
from scipy.stats import randint, uniform
# === مرحله ۱: ساخت دیتاست ===
np.random.seed(42)
n = 2000
data = pd.DataFrame({
'customer_age': np.random.randint(18, 75, n).astype(float),
'annual_income': np.random.lognormal(10.5, 0.5, n),
'num_purchases_last_year': np.random.poisson(5, n),
'avg_purchase_amount': np.random.uniform(10, 500, n),
'days_since_last_purchase': np.random.exponential(30, n),
'website_visits': np.random.poisson(10, n),
'membership_level': np.random.choice(
['bronze', 'silver', 'gold', 'platinum'], n,
p=[0.4, 0.3, 0.2, 0.1]
),
'preferred_category': np.random.choice(
['electronics', 'clothing', 'food', 'books', 'sports'], n
),
'region': np.random.choice(
['north', 'south', 'east', 'west'], n
),
})
# ایجاد متغیر هدف (آیا خرید میکنه؟)
purchase_prob = (
0.3
+ 0.002 * data['num_purchases_last_year']
+ 0.0000001 * data['annual_income']
- 0.003 * data['days_since_last_purchase']
+ 0.001 * data['website_visits']
+ np.where(data['membership_level'] == 'platinum', 0.15, 0)
+ np.where(data['membership_level'] == 'gold', 0.1, 0)
)
purchase_prob = np.clip(purchase_prob, 0.05, 0.95)
data['will_purchase'] = np.random.binomial(1, purchase_prob)
# اضافه کردن مقادیر گمشده
for col in ['customer_age', 'annual_income', 'avg_purchase_amount']:
mask = np.random.choice(n, int(n * 0.05), replace=False)
data.loc[mask, col] = np.nan
print(f"Dataset shape: {data.shape}")
print(f"Target distribution:\n{data['will_purchase'].value_counts()}")
print(f"\nMissing values:\n{data.isnull().sum()}")
# === مرحله ۲: تقسیم داده ===
X = data.drop('will_purchase', axis=1)
y = data['will_purchase']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# === مرحله ۳: تعریف Transformer سفارشی ===
class RatioFeatures(BaseEstimator, TransformerMixin):
"""ساخت ویژگیهای نسبتی"""
def fit(self, X, y=None):
return self
def transform(self, X):
X = X.copy()
# نسبت بازدید به خرید
X['visit_purchase_ratio'] = (
X['website_visits'] / (X['num_purchases_last_year'] + 1)
)
# میانگین هزینه نسبت به درآمد
X['spending_ratio'] = (
X['avg_purchase_amount'] / (X['annual_income'] + 1)
)
return X
# === مرحله ۴: ساخت Pipeline ===
# ستونها
num_features = [
'customer_age', 'annual_income', 'num_purchases_last_year',
'avg_purchase_amount', 'days_since_last_purchase', 'website_visits'
]
ordinal_features = ['membership_level']
nominal_features = ['preferred_category', 'region']
# Pipeline عددی
numerical_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
# Pipeline ترتیبی
ordinal_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OrdinalEncoder(
categories=[['bronze', 'silver', 'gold', 'platinum']]
))
])
# Pipeline اسمی
nominal_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# ColumnTransformer
preprocessor = ColumnTransformer([
('num', numerical_pipeline, num_features),
('ord', ordinal_pipeline, ordinal_features),
('nom', nominal_pipeline, nominal_features)
])
# Pipeline نهایی
full_pipeline = Pipeline([
('ratio_features', RatioFeatures()),
('preprocessor', preprocessor),
('feature_selection', SelectPercentile(f_classif, percentile=80)),
('classifier', GradientBoostingClassifier(random_state=42))
])
# === مرحله ۵: تنظیم هایپرپارامتر ===
param_distributions = {
'feature_selection__percentile': [60, 70, 80, 90, 100],
'classifier__n_estimators': randint(50, 300),
'classifier__max_depth': [3, 5, 7, 10],
'classifier__learning_rate': uniform(0.01, 0.3),
'classifier__min_samples_split': randint(2, 20),
'classifier__subsample': uniform(0.6, 0.4),
}
search = RandomizedSearchCV(
full_pipeline,
param_distributions,
n_iter=30,
cv=5,
scoring='accuracy',
n_jobs=-1,
random_state=42,
verbose=1
)
search.fit(X_train, y_train)
# === مرحله ۶: ارزیابی ===
print(f"\nBest CV Score: {search.best_score_:.4f}")
print(f"Best Parameters: {search.best_params_}")
best_pipe = search.best_estimator_
y_pred = best_pipe.predict(X_test)
print(f"\nTest Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")
print(f"\nConfusion Matrix:\n{confusion_matrix(y_test, y_pred)}")
# === مرحله ۷: ذخیره Pipeline نهایی ===
joblib.dump(best_pipe, 'purchase_prediction_pipeline.joblib')
print("\nPipeline saved successfully!")
# === مرحله ۸: شبیهسازی استفاده در تولید ===
loaded_pipe = joblib.load('purchase_prediction_pipeline.joblib')
# پیشبینی برای یک مشتری جدید
new_customer = pd.DataFrame({
'customer_age': [35],
'annual_income': [75000],
'num_purchases_last_year': [8],
'avg_purchase_amount': [150],
'days_since_last_purchase': [5],
'website_visits': [15],
'membership_level': ['gold'],
'preferred_category': ['electronics'],
'region': ['north']
})
prediction = loaded_pipe.predict(new_customer)
probability = loaded_pipe.predict_proba(new_customer)
print(f"\nNew customer prediction: {'Will purchase' if prediction[0] else 'Will not purchase'}")
print(f"Purchase probability: {probability[0][1]:.2%}")
نتیجهگیری و قدمهای بعدی
تو این مقاله، سعی کردم هر چیزی که برای کار حرفهای با Pipeline در scikit-learn نیاز دارید رو پوشش بدم. از مفاهیم پایه گرفته تا ساخت Transformer سفارشی، تنظیم هایپرپارامتر و استقرار در محیط تولید.
بیایید نکات کلیدی رو مرور کنیم:
- Pipeline ها نشت داده رو جلوگیری میکنن: با قرار دادن مراحل پیشپردازش داخل Pipeline، مطمئن میشیم که اطلاعات مجموعه آزمون به مرحله آموزش نشت نمیکنه.
- کد تمیزتر و قابل نگهداریتر: به جای دهها خط کد پراکنده، همه چیز در یک شیء مرتب نگهداری میشه.
- بازتولیدپذیری: Pipeline ترتیب دقیق مراحل رو حفظ میکنه.
- استقرار آسان: با
joblibکل Pipeline رو ذخیره و در محیط تولید بارگذاری کنید. - تنظیم هایپرپارامتر یکپارچه: با نشانهگذاری دو زیرخط، هایپرپارامترهای همه مراحل رو همزمان تنظیم کنید.
- ColumnTransformer: برای مدیریت دادههای ناهمگن ضروریه.
- Transformer سفارشی: با
BaseEstimatorوTransformerMixinهر تبدیلی رو میتونید داخل Pipeline بگذارید.
قدمهای بعدی
حالا که Pipeline رو یاد گرفتید، پیشنهاد میکنم این موارد رو هم بررسی کنید:
- scikit-learn Metadata Routing: سیستم جدید مسیریابی ابرداده که انعطافپذیری بیشتری به Pipeline اضافه میکنه.
- ONNX: برای تبدیل Pipeline به فرمت ONNX و اجرا در زبانها و محیطهای مختلف.
- MLflow یا DVC: برای مدیریت نسخههای Pipeline و آزمایشها در پروژههای بزرگتر.
- Pipeline در ترکیب با GPU: استفاده از Array API برای سرعت بخشیدن به محاسبات سنگین.
یه توصیه آخر: هر زمان که پروژه یادگیری ماشین شروع میکنید، اول Pipeline رو طراحی کنید و بعد شروع به کدنویسی کنید. این عادت ساده، کیفیت کار شما رو به شکل محسوسی بالا میبره. موفق باشید!