בניית צינור ניקוי נתונים אוטומטי בפייתון עם pandas 3.0

מדריך מעשי לבניית צינור ניקוי נתונים אוטומטי בפייתון עם pandas 3.0. טיפול בערכים חסרים, ניקוי טקסט, זיהוי חריגים, הסרת כפילויות, הרכבה עם pipe() וולידציה עם pandera — כולל קוד מוכן להעתקה.

ניקוי נתונים אוטומטי pandas 3.0 — מדריך 2026

למה בכלל צריך צינור ניקוי נתונים אוטומטי?

בואו נודה באמת — כל מי שעבד עם נתונים אמיתיים מכיר את התחושה הזו. פותחים קובץ CSV חדש, מלאי תקוות, ואז מגלים שהנתונים מבולגנים לגמרי. רווחים מיותרים, תאריכים בפורמטים שונים, ערכים חסרים בכל מקום. סקרים בתעשייה מראים שמדעני נתונים מקדישים בין 60% ל-80% מזמנם לניקוי נתונים. שמונים אחוז! ובלי השלב הזה, כל מודל או ניתוח שתבנו — חסר ערך.

הבעיה מתגלה כשמנסים לעשות את זה ידנית. זה איטי, חשוף לטעויות, ולא ניתן לשחזור. וכשהנתונים מתעדכנים (וזה תמיד קורה) — צריך להתחיל הכל מחדש.

הפתרון? צינור ניקוי אוטומטי (Automated Data Cleaning Pipeline). מגדירים את שלבי הניקוי פעם אחת, ומריצים אותם באופן עקבי על כל אצווה חדשה של נתונים. פשוט ויעיל.

במדריך הזה נבנה יחד, שלב אחר שלב, צינור ניקוי נתונים מקצועי בפייתון עם pandas 3.0. נשתמש בתכונות החדשות כמו pipe(), Copy-on-Write, ו-pd.col() כדי לכתוב קוד נקי, מודולרי ויעיל.

הכנת סביבת העבודה

לפני שנצלול פנימה, נוודא שהכלים שלנו מותקנים ומעודכנים. נכון לאפריל 2026, pandas 3.0 כבר יצאה רשמית (ינואר 2026) ומביאה שינויים משמעותיים שנשתמש בהם לאורך כל המדריך.

pip install "pandas>=3.0" numpy pyarrow pandera regex

נוודא שהכל עובד כמו שצריך:

import pandas as pd
import numpy as np
import re

print(f"pandas: {pd.__version__}")
print(f"numpy: {np.__version__}")

# וידוא ש-Copy-on-Write פעיל (ברירת מחדל ב-pandas 3.0)
print(f"Copy-on-Write enabled: {pd.options.mode.copy_on_write}")

מה חדש ב-pandas 3.0 שרלוונטי לניקוי נתונים?

  • Copy-on-Write (CoW) כברירת מחדל — סוף סוף, אין יותר SettingWithCopyWarning. כל פעולת slice יוצרת "צפייה עצלנית" שמעתיקה נתונים רק כשמשנים אותם. זה מונע באגים שבעבר היו גורמים לכאבי ראש רציניים.
  • מחרוזות PyArrow — סוג str חדש מבוסס PyArrow מחליף את object. פעולות טקסט מהירות פי 5-10, וזה קריטי כשמנקים מחרוזות בדאטאסטים גדולים.
  • ביטוי pd.col() — מחליף פונקציות lambda בשרשור מתודות. הקוד נהיה קריא יותר ובטוח יותר.
  • ChainedAssignmentError — במקום אזהרה שקל להתעלם ממנה, pandas 3.0 זורקת שגיאה ממשית על השמה משורשרת. זה מאלץ אותנו לכתוב קוד נכון מההתחלה (וזה דבר טוב).

שלב 1: טעינת נתונים וסקירה ראשונית

כל צינור ניקוי מתחיל בטעינת הנתונים והבנה מה יש לנו. ניצור דאטאסט לדוגמה שמדמה בעיות אמיתיות — מהסוג שאני פוגש כמעט בכל פרויקט: ערכים חסרים, כפילויות, טקסט לא אחיד, וערכים חריגים.

import pandas as pd
import numpy as np

# יצירת דאטאסט עם בעיות אמיתיות
np.random.seed(42)

data = {
    "customer_name": [
        "יוסי כהן", "  שרה לוי  ", "YOSSI COHEN", "דוד מזרחי",
        "שרה לוי", "רחל אברהם", None, "יוסי כהן", "מיכל דוד",
        "אבי גולן", "  רחל  אברהם", "דוד מזרחי", "נועה בן-דוד",
        "יוסי כהן", "שרה לוי"
    ],
    "email": [
        "[email protected]", "[email protected]", "[email protected]",
        "[email protected]", "[email protected]", "[email protected]",
        "unknown", "[email protected]", "[email protected]",
        "[email protected]", "[email protected]", "[email protected]",
        "[email protected]", "invalid-email", "[email protected]"
    ],
    "age": [
        32, 28, 32, 45, 28, -5, 38, 32, 250, 41, 36, 45, 29, 32, 28
    ],
    "purchase_amount": [
        150.0, 230.5, 150.0, None, 230.5, 89.0, 420.0, 150.0,
        175.0, None, 89.0, 310.0, 55.0, 150.0, 99999.0
    ],
    "signup_date": [
        "2025-01-15", "2025/02/20", "15-01-2025", "2025-03-10",
        "2025-02-20", "2025-04-05", "2025-05-12", "2025-01-15",
        "2025-06-30", "not a date", "2025-04-05", "2025-03-10",
        "2025-07-18", "2025-01-15", "2025-02-20"
    ],
    "city": [
        "תל אביב", "ירושלים", "תל-אביב", "חיפה",
        "ירושלים", "באר שבע", "חיפה", "תל אביב",
        "ת\"א", "חיפה", "באר-שבע", "חיפה",
        "ירושלים", "תל אביב", "ירושלים"
    ]
}

df = pd.DataFrame(data)
print(f"גודל הדאטאסט: {df.shape}")
print(f"\nסוגי נתונים:\n{df.dtypes}")
print(f"\nערכים חסרים:\n{df.isnull().sum()}")
print(f"\nכפילויות: {df.duplicated().sum()} שורות")

כבר בשלב הזה אפשר לראות שיש לנו בעיות מכל הסוגים — רווחים מיותרים, אימיילים לא תקינים, גילאים בלתי אפשריים (מינוס 5? 250?!), תאריכים בפורמטים שונים, ושמות ערים לא אחידים. אם זה נשמע מוכר, כנראה שעבדתם עם נתונים אמיתיים.

שלב 2: טיפול בערכים חסרים — מעבר לבסיסי

טיפול בערכים חסרים זה הרבה יותר מסתם fillna() או dropna(). הגישה הנכונה תלויה בסוג הנתונים, בכמות הערכים החסרים, ובמה שאתם מתכוונים לעשות עם הנתונים אחר כך. אין פתרון אחד שמתאים לכולם.

ניתוח דפוסי חוסר

def analyze_missing(df: pd.DataFrame) -> pd.DataFrame:
    """ניתוח מפורט של ערכים חסרים בכל עמודה."""
    missing = pd.DataFrame({
        "missing_count": df.isnull().sum(),
        "missing_pct": (df.isnull().sum() / len(df) * 100).round(2),
        "dtype": df.dtypes
    })
    missing = missing[missing["missing_count"] > 0]
    return missing.sort_values("missing_pct", ascending=False)

print(analyze_missing(df))

אסטרטגיות מילוי לפי סוג נתונים

def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """טיפול חכם בערכים חסרים לפי סוג העמודה."""
    df = df.copy()

    # עמודות מספריות — מילוי בחציון (עמיד לערכים קיצוניים)
    numeric_cols = df.select_dtypes(include=["number"]).columns
    for col in numeric_cols:
        if df[col].isnull().any():
            median_val = df[col].median()
            df[col] = df[col].fillna(median_val)
            print(f"  [{col}] מולא ב-median: {median_val}")

    # עמודות טקסט — מילוי בערך הנפוץ ביותר או סימון
    text_cols = df.select_dtypes(include=["string", "object"]).columns
    for col in text_cols:
        if df[col].isnull().any():
            mode_val = df[col].mode()
            if len(mode_val) > 0:
                df[col] = df[col].fillna(mode_val[0])
                print(f"  [{col}] מולא ב-mode: {mode_val[0]}")
            else:
                df[col] = df[col].fillna("לא ידוע")
                print(f"  [{col}] מולא ב: לא ידוע")

    return df

df_clean = handle_missing_values(df)
print(f"\nערכים חסרים אחרי טיפול: {df_clean.isnull().sum().sum()}")

שימו לב שאנחנו משתמשים ב-חציון (median) ולא בממוצע לעמודות מספריות. למה? כי החציון עמיד לערכים קיצוניים. אם רוב הגילאים בין 20 ל-60 אבל יש ערך אחד של 250 (שזו כמובן שגיאה), הממוצע יעלה באופן מלאכותי — אבל החציון כמעט לא יזוז. טריק קטן שחוסך בעיות גדולות.

שלב 3: ניקוי טקסט ותקנון מחרוזות

אז, ניקוי מחרוזות. זה אחד השלבים שלוקחים הכי הרבה זמן בפרויקטים אמיתיים. החדשות הטובות? עם מחרוזות PyArrow של pandas 3.0, הוא גם הרבה יותר מהיר מבעבר.

ניקוי בסיסי: רווחים, רישיות, תווים מיוחדים

def clean_text_column(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """ניקוי עמודת טקסט: רווחים, רישיות, ותווים מיוחדים."""
    df = df.copy()
    # הסרת רווחים מיותרים מתחילה וסוף
    df[col] = df[col].str.strip()
    # החלפת רווחים כפולים ברווח בודד
    df[col] = df[col].str.replace(r"\s+", " ", regex=True)
    return df

# ניקוי שמות לקוחות
df_clean = clean_text_column(df_clean, "customer_name")
print(df_clean["customer_name"].unique())

תקנון אימיילים עם ביטויים רגולריים

import re

def validate_and_clean_emails(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """תקנון ואימות כתובות אימייל."""
    df = df.copy()

    # המרה לאותיות קטנות
    df[col] = df[col].str.lower().str.strip()

    # תבנית regex לאימייל תקין
    email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"

    # סימון אימיילים לא תקינים
    is_valid = df[col].str.match(email_pattern, na=False)
    invalid_count = (~is_valid).sum()

    if invalid_count > 0:
        print(f"  נמצאו {invalid_count} אימיילים לא תקינים:")
        print(f"  {df.loc[~is_valid, col].tolist()}")
        # החלפה ב-None לטיפול עתידי
        df.loc[~is_valid, col] = None

    return df

df_clean = validate_and_clean_emails(df_clean, "email")
print(f"\nאימיילים אחרי ניקוי:\n{df_clean['email'].value_counts()}")

תקנון שמות ערים

הנה בעיה קלאסית שכל מי שעובד עם נתונים בעברית מכיר — אותה עיר מופיעה בצורות שונות לגמרי. "תל אביב", "תל-אביב", "ת"א" — כולם אותו מקום, אבל המחשב לא יודע את זה. לפעמים זו הבעיה שהכי מעצבנת לפתור (ולרוב אין מנוס ממילון מיפוי ידני):

def standardize_city_names(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """תקנון שמות ערים לפורמט אחיד."""
    df = df.copy()

    # מילון מיפוי: כל הווריאציות -> שם סטנדרטי
    city_mapping = {
        "תל אביב": "תל אביב-יפו",
        "תל-אביב": "תל אביב-יפו",
        "ת\"א": "תל אביב-יפו",
        "ת'א": "תל אביב-יפו",
        "באר שבע": "באר שבע",
        "באר-שבע": "באר שבע",
        "ב\"ש": "באר שבע",
    }

    # ניקוי ראשוני
    df[col] = df[col].str.strip()

    # החלפה לפי מילון
    df[col] = df[col].replace(city_mapping)

    return df

df_clean = standardize_city_names(df_clean, "city")
print(f"ערים אחרי תקנון:\n{df_clean['city'].value_counts()}")

שלב 4: המרת סוגי נתונים ותקנון תאריכים

תאריכים שמגיעים ממקורות שונים תמיד נראים שונה: 2025-01-15, 15/01/2025, 2025/02/20. צריך להמיר את כולם לפורמט אחיד, ו-pandas 3.0 הופכת את זה לקצת יותר פשוט עם ברירת מחדל של datetime64[us] (במקום datetime64[ns]):

def standardize_dates(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """המרת תאריכים לפורמט אחיד, עם טיפול בפורמטים שונים."""
    df = df.copy()

    # ניסיון ראשון: המרה אוטומטית
    converted = pd.to_datetime(df[col], errors="coerce", dayfirst=False)

    # בדיקה כמה נכשלו
    failed = converted.isnull() & df[col].notnull()
    if failed.any():
        print(f"  {failed.sum()} ערכים לא הומרו בנסיון הראשון")
        # ניסיון שני עם dayfirst=True לפורמטים אירופאיים
        converted_retry = pd.to_datetime(
            df.loc[failed, col], errors="coerce", dayfirst=True
        )
        converted = converted.fillna(converted_retry)

    # בדיקה סופית
    still_failed = converted.isnull() & df[col].notnull()
    if still_failed.any():
        print(f"  {still_failed.sum()} ערכים נותרו ללא המרה:")
        print(f"  {df.loc[still_failed, col].tolist()}")

    df[col] = converted
    # ב-pandas 3.0, הסוג כבר datetime64[us]
    print(f"  סוג הנתונים: {df[col].dtype}")

    return df

df_clean = standardize_dates(df_clean, "signup_date")

שלב 5: זיהוי וטיפול בערכים חריגים (Outliers)

ערכים חריגים הם עניין מסובך. הם יכולים להיות טעויות הקלדה (גיל 250), שגיאות מדידה, או ערכים אמיתיים שפשוט קיצוניים. הגישה לטיפול בהם תלויה לחלוטין בהקשר — ואין כאן תשובה אחת נכונה.

שיטת IQR (טווח בין-רבעוני)

def detect_outliers_iqr(
    df: pd.DataFrame,
    col: str,
    factor: float = 1.5
) -> pd.Series:
    """זיהוי ערכים חריגים בשיטת IQR."""
    Q1 = df[col].quantile(0.25)
    Q3 = df[col].quantile(0.75)
    IQR = Q3 - Q1
    lower = Q1 - factor * IQR
    upper = Q3 + factor * IQR

    outliers = (df[col] < lower) | (df[col] > upper)
    print(f"  [{col}] IQR: {IQR:.2f}, טווח: [{lower:.2f}, {upper:.2f}]")
    print(f"  נמצאו {outliers.sum()} ערכים חריגים")

    return outliers

שיטת Z-Score

def detect_outliers_zscore(
    df: pd.DataFrame,
    col: str,
    threshold: float = 3.0
) -> pd.Series:
    """זיהוי ערכים חריגים בשיטת Z-Score."""
    mean = df[col].mean()
    std = df[col].std()

    if std == 0:
        return pd.Series(False, index=df.index)

    z_scores = ((df[col] - mean) / std).abs()
    outliers = z_scores > threshold
    print(f"  [{col}] mean: {mean:.2f}, std: {std:.2f}")
    print(f"  נמצאו {outliers.sum()} ערכים חריגים (z > {threshold})")

    return outliers

טיפול בערכים חריגים עם כללים עסקיים

def handle_outliers(df: pd.DataFrame) -> pd.DataFrame:
    """טיפול בערכים חריגים בעמודות מספריות עם כללים עסקיים."""
    df = df.copy()

    # גיל: חייב להיות בין 0 ל-120
    invalid_age = (df["age"] < 0) | (df["age"] > 120)
    if invalid_age.any():
        print(f"  גילאים לא תקינים: {df.loc[invalid_age, 'age'].tolist()}")
        df.loc[invalid_age, "age"] = np.nan
        df["age"] = df["age"].fillna(df["age"].median())

    # סכום רכישה: זיהוי חריגים עם IQR
    outliers = detect_outliers_iqr(df, "purchase_amount")
    if outliers.any():
        # גזירה (capping) במקום מחיקה
        Q1 = df["purchase_amount"].quantile(0.25)
        Q3 = df["purchase_amount"].quantile(0.75)
        IQR = Q3 - Q1
        upper = Q3 + 1.5 * IQR
        df.loc[df["purchase_amount"] > upper, "purchase_amount"] = upper
        print(f"  ערכי רכישה גולמו ל-{upper:.2f}")

    return df

df_clean = handle_outliers(df_clean)

שימו לב להבדל בגישה כאן: לגבי גיל, השתמשנו בכללים עסקיים קשיחים (גיל שלילי הוא תמיד שגיאה, אין על מה לדון). לגבי סכום רכישה, בחרנו IQR עם גזירה — כי סכום גבוה לא בהכרח שגיאה, אולי יש לנו לקוח VIP שקונה בכמויות. הגישה חייבת להתאים להקשר.

שלב 6: הסרת כפילויות — כולל כפילויות "כמעט"

כפילויות מדויקות? קל. אבל מה עם שורות שהן כמעט זהות? "יוסי כהן" ו-"YOSSI COHEN" עם אותו אימייל — ברור שזה אותו אדם, אבל drop_duplicates רגיל לא יתפוס את זה.

הסרת כפילויות מדויקות

def remove_exact_duplicates(df: pd.DataFrame) -> pd.DataFrame:
    """הסרת כפילויות מדויקות, שמירה על הרשומה האחרונה."""
    before = len(df)
    df = df.drop_duplicates(keep="last")
    after = len(df)
    print(f"  הוסרו {before - after} כפילויות מדויקות")
    return df.reset_index(drop=True)

זיהוי כפילויות על בסיס מפתח עסקי

def remove_business_duplicates(
    df: pd.DataFrame,
    key_cols: list[str]
) -> pd.DataFrame:
    """הסרת כפילויות על בסיס עמודות מפתח."""
    before = len(df)
    # סימון כפילויות על בסיס עמודות מפתח
    df = df.drop_duplicates(subset=key_cols, keep="last")
    after = len(df)
    print(f"  הוסרו {before - after} כפילויות (מפתח: {key_cols})")
    return df.reset_index(drop=True)

# הסרת כפילויות לפי אימייל — כי שני לקוחות עם אותו אימייל הם כנראה אותו אדם
df_clean = remove_business_duplicates(df_clean, ["email"])

שלב 7: הרכבת הצינור המלא עם pipe()

עכשיו מגיע החלק שאני הכי אוהב — שרשור כל השלבים יחד באמצעות pipe() של pandas. השיטה הזו מאפשרת לבנות צינור קריא ומודולרי, שבו כל פונקציה מקבלת DataFrame ומחזירה DataFrame. הלוגיקה פשוטה, הקוד אלגנטי:

def clean_text_columns(df: pd.DataFrame) -> pd.DataFrame:
    """ניקוי כל עמודות הטקסט בדאטאסט."""
    df = df.copy()
    text_cols = df.select_dtypes(include=["string", "object"]).columns
    for col in text_cols:
        df[col] = df[col].str.strip()
        df[col] = df[col].str.replace(r"\s+", " ", regex=True)
    return df

def validate_emails(df: pd.DataFrame) -> pd.DataFrame:
    """אימות ותקנון אימיילים."""
    df = df.copy()
    df["email"] = df["email"].str.lower().str.strip()
    pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    invalid = ~df["email"].str.match(pattern, na=False)
    df.loc[invalid, "email"] = None
    return df

def standardize_cities(df: pd.DataFrame) -> pd.DataFrame:
    """תקנון שמות ערים."""
    df = df.copy()
    mapping = {
        "תל אביב": "תל אביב-יפו",
        "תל-אביב": "תל אביב-יפו",
        "ת\"א": "תל אביב-יפו",
        "באר-שבע": "באר שבע",
    }
    df["city"] = df["city"].replace(mapping)
    return df

def fix_dates(df: pd.DataFrame) -> pd.DataFrame:
    """המרת תאריכים לפורמט אחיד."""
    df = df.copy()
    df["signup_date"] = pd.to_datetime(
        df["signup_date"], errors="coerce", dayfirst=False
    )
    return df

def fix_outliers(df: pd.DataFrame) -> pd.DataFrame:
    """טיפול בערכים חריגים."""
    df = df.copy()
    # כללים עסקיים לגיל
    df.loc[(df["age"] < 0) | (df["age"] > 120), "age"] = np.nan
    df["age"] = df["age"].fillna(df["age"].median())
    # גזירת סכומי רכישה
    Q3 = df["purchase_amount"].quantile(0.75)
    Q1 = df["purchase_amount"].quantile(0.25)
    upper = Q3 + 1.5 * (Q3 - Q1)
    df.loc[df["purchase_amount"] > upper, "purchase_amount"] = upper
    return df

def deduplicate(df: pd.DataFrame) -> pd.DataFrame:
    """הסרת כפילויות."""
    return df.drop_duplicates(subset=["email"], keep="last").reset_index(drop=True)

# הצינור המלא — שורה אחת קריאה ומודולרית!
df_final = (
    df
    .pipe(handle_missing_values)
    .pipe(clean_text_columns)
    .pipe(validate_emails)
    .pipe(standardize_cities)
    .pipe(fix_dates)
    .pipe(fix_outliers)
    .pipe(deduplicate)
)

print(f"\nלפני ניקוי: {len(df)} שורות")
print(f"אחרי ניקוי: {len(df_final)} שורות")
print(f"\nערכים חסרים: {df_final.isnull().sum().sum()}")
print(f"\nתוצאה:\n{df_final.head(10)}")

היופי של pipe() הוא שאפשר בקלות להוסיף או להסיר שלבים, לשנות את הסדר, או להחליף פונקציה אחת באחרת — בלי לפרק שום דבר. כל פונקציה אחראית על דבר אחד בלבד, וכל אחת ניתנת לבדיקה בנפרד. מניסיון, כשמגיע דאטאסט חדש עם בעיות שלא ציפיתם להן, פשוט מוסיפים עוד שלב לצינור.

שלב 8: שימוש ב-pd.col() לניקוי חכם יותר

אחת התכונות שהכי ריגשו אותי ב-pandas 3.0 היא pd.col(). זה ביטוי שמחליף פונקציות lambda בשרשור מתודות. וזה לא רק עניין קוסמטי — הקוד באמת בטוח יותר כי pd.col() לא לוכד משתנים לפי הפניה (reference):

# לפני pandas 3.0 — עם lambda
df_old_style = (
    df
    .assign(age_valid=lambda x: x["age"].between(0, 120))
    .query("age_valid == True")
)

# pandas 3.0 — עם pd.col()
df_new_style = (
    df
    .assign(age_valid=pd.col("age").between(0, 120))
    [pd.col("age_valid")]
)

# דוגמה נוספת: יצירת עמודה מנורמלת
df_with_norm = df.assign(
    purchase_normalized=(
        pd.col("purchase_amount") - pd.col("purchase_amount").mean()
    ) / pd.col("purchase_amount").std()
)

pd.col() תומך בכל פעולות ה-Series הרגילות — שיטות מחרוזות, חישובים מספריים, השוואות. זו פשוט דרך הרבה יותר קריאה לכתוב טרנספורמציות מורכבות בתוך שרשרת מתודות, ואחרי שמתרגלים קשה לחזור ל-lambda.

שלב 9: ולידציה אוטומטית עם pandera

אוקיי, אז ניקינו את הנתונים. אבל איך באמת בודקים שהם נקיים? כאן נכנסת ספריית pandera — כלי שמגדיר "חוזה נתונים" שהתוצאה חייבת לעמוד בו. תחשבו על זה כמו טסטים ליחידה, אבל לנתונים:

import pandera as pa

# הגדרת סכימה — "חוזה" שהנתונים הנקיים חייבים לעמוד בו
clean_data_schema = pa.DataFrameSchema({
    "customer_name": pa.Column(
        str,
        nullable=True,
        checks=[
            pa.Check.str_length(min_value=2, max_value=100),
        ]
    ),
    "email": pa.Column(
        str,
        nullable=True,
        checks=[
            pa.Check.str_matches(
                r"^[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}$"
            ),
        ]
    ),
    "age": pa.Column(
        float,
        checks=[
            pa.Check.in_range(0, 120),
        ]
    ),
    "purchase_amount": pa.Column(
        float,
        nullable=True,
        checks=[
            pa.Check.greater_than_or_equal_to(0),
        ]
    ),
    "signup_date": pa.Column(
        "datetime64[us]",
        nullable=True,
    ),
    "city": pa.Column(
        str,
        nullable=True,
    ),
})

# הרצת הולידציה
try:
    clean_data_schema.validate(df_final, lazy=True)
    print("הנתונים עברו ולידציה בהצלחה!")
except pa.errors.SchemaErrors as e:
    print(f"נמצאו {len(e.failure_cases)} בעיות:")
    print(e.failure_cases)

הפרמטר lazy=True כאן הוא קריטי. הוא אומר ל-pandera לאסוף את כל הבעיות ולא לעצור בראשונה. ככה מקבלים תמונה מלאה של מצב הנתונים בהרצה אחת, במקום לתקן שגיאה אחת ולגלות שיש עוד עשר.

שלב 10: יצירת מחלקת Pipeline מלאה

לסיום, בואו נעטוף הכל במחלקה שאפשר להשתמש בה שוב ושוב. זו הגרסה שאני ממליץ לקחת לפרויקטים אמיתיים:

from dataclasses import dataclass, field
from typing import Callable
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class CleaningStep:
    """שלב בודד בצינור הניקוי."""
    name: str
    func: Callable[[pd.DataFrame], pd.DataFrame]
    description: str = ""


class DataCleaningPipeline:
    """צינור ניקוי נתונים מודולרי וניתן להרחבה."""

    def __init__(self, name: str = "default"):
        self.name = name
        self.steps: list[CleaningStep] = []
        self.history: list[dict] = []

    def add_step(
        self,
        name: str,
        func: Callable,
        description: str = ""
    ) -> "DataCleaningPipeline":
        """הוספת שלב לצינור (תומך בשרשור)."""
        self.steps.append(CleaningStep(name, func, description))
        return self

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        """הרצת כל שלבי הצינור."""
        logger.info(f"מתחיל צינור '{self.name}' — {len(df)} שורות")
        result = df.copy()

        for step in self.steps:
            rows_before = len(result)
            nulls_before = result.isnull().sum().sum()

            result = step.func(result)

            rows_after = len(result)
            nulls_after = result.isnull().sum().sum()

            log_entry = {
                "step": step.name,
                "rows_before": rows_before,
                "rows_after": rows_after,
                "rows_removed": rows_before - rows_after,
                "nulls_before": nulls_before,
                "nulls_after": nulls_after,
            }
            self.history.append(log_entry)

            logger.info(
                f"  [{step.name}] "
                f"{rows_before}→{rows_after} שורות, "
                f"{nulls_before}→{nulls_after} ערכים חסרים"
            )

        logger.info(f"צינור '{self.name}' הושלם — {len(result)} שורות")
        return result

    def report(self) -> pd.DataFrame:
        """דוח סיכום של כל השלבים שהורצו."""
        return pd.DataFrame(self.history)


# שימוש בצינור
pipeline = (
    DataCleaningPipeline("customer_data")
    .add_step("missing_values", handle_missing_values, "טיפול בערכים חסרים")
    .add_step("clean_text", clean_text_columns, "ניקוי טקסט")
    .add_step("validate_emails", validate_emails, "אימות אימיילים")
    .add_step("standardize_cities", standardize_cities, "תקנון ערים")
    .add_step("fix_dates", fix_dates, "תקנון תאריכים")
    .add_step("fix_outliers", fix_outliers, "טיפול בחריגים")
    .add_step("deduplicate", deduplicate, "הסרת כפילויות")
)

df_result = pipeline.run(df)

# הצגת דוח הביצוע
print("\n--- דוח ביצוע הצינור ---")
print(pipeline.report().to_string(index=False))

המחלקה הזו נותנת לנו כמה יתרונות שכדאי להכיר: לוג מפורט של מה שקרה בכל שלב, דוח סיכום שמראה כמה שורות הוסרו ואיפה, ו-API פשוט להוספת שלבים חדשים. אפשר לשמור את הצינור ולהריץ אותו על כל אצווה חדשה — ולדעת בדיוק מה השתנה.

טיפים מתקדמים לייצור (Production)

שמירת קונפיגורציה בקובץ חיצוני

טיפ חשוב מניסיון: אל תקמפלו כללי ניקוי בתוך הקוד. שמרו אותם בקובץ YAML חיצוני שקל לעדכן בלי לגעת בקוד עצמו:

# cleaning_config.yaml
pipeline:
  name: customer_data_v2
  steps:
    - name: age_validation
      column: age
      min_value: 0
      max_value: 120
      on_invalid: set_null_and_fill_median

    - name: email_validation
      column: email
      pattern: "^[a-z0-9._%+-]+@[a-z0-9.-]+\\.[a-z]{2,}$"
      on_invalid: set_null

    - name: city_standardization
      column: city
      mapping_file: city_mappings.json
import yaml

def load_pipeline_config(path: str) -> dict:
    """טעינת קונפיגורציה מקובץ YAML."""
    with open(path) as f:
        return yaml.safe_load(f)

config = load_pipeline_config("cleaning_config.yaml")
print(f"צינור: {config['pipeline']['name']}")
print(f"מספר שלבים: {len(config['pipeline']['steps'])}")

תיעוד אוטומטי של שינויים

def log_changes(
    df_before: pd.DataFrame,
    df_after: pd.DataFrame,
    step_name: str
) -> dict:
    """תיעוד השינויים שבוצעו בכל שלב."""
    return {
        "step": step_name,
        "rows_removed": len(df_before) - len(df_after),
        "nulls_filled": (
            df_before.isnull().sum().sum()
            - df_after.isnull().sum().sum()
        ),
        "columns_changed": [
            col for col in df_before.columns
            if col in df_after.columns
            and not df_before[col].equals(df_after[col])
        ],
    }

שאלות נפוצות

מה ההבדל בין dropna() לבין fillna() ומתי להשתמש בכל אחד?

dropna() מוחקת שורות או עמודות עם ערכים חסרים, ומתאימה כשאחוז החוסר קטן (פחות מ-5%) ואובדן השורות לא משמעותי. fillna() ממלאה ערכים חסרים בערך חלופי — ממוצע, חציון, מוד, או ערך קבוע — ומתאימה כשרוצים לשמר את גודל הדאטאסט. הכלל שעובד לי: אם העמודה חסרה ביותר מ-50%, שקלו למחוק אותה. פחות מ-5% חסרים? מילוי בחציון או מוד הוא בדרך כלל בטוח.

איך Copy-on-Write ב-pandas 3.0 משפיע על צינורות ניקוי נתונים?

Copy-on-Write מונע תופעת לוואי נפוצה ומעצבנת: שינוי בטעות של הנתונים המקוריים. לפני pandas 3.0, פעולות כמו df["col"][mask] = value יכלו לשנות את ה-DataFrame המקורי בלי שתשימו לב. עכשיו, pandas 3.0 זורקת ChainedAssignmentError ומכריחה להשתמש ב-.loc[]. התוצאה? קוד ניקוי בטוח ויעיל יותר, בלי הצורך בקריאות .copy() מיותרות.

האם כדאי להסיר ערכים חריגים או לגזור אותם?

תלוי. הסרה מתאימה כשהערך הוא בבירור שגיאה — גיל שלילי, מחיר אפס על מוצר. גזירה (capping) מתאימה כשהערך אולי אמיתי אבל קיצוני, כמו לקוח שקנה בכמות גדולה במיוחד. כלל אצבע: השתמשו בכללים עסקיים ברורים כשאפשר (טווח גיל 0-120), ובשיטות סטטיסטיות כמו IQR או Z-Score כשאין כלל ברור.

איך בודקים שהצינור עובד נכון לאורך זמן?

השתמשו ב-pandera להגדרת סכימה שהנתונים הנקיים חייבים לעמוד בה, והריצו את הולידציה בסוף כל הרצה של הצינור. בנוסף, תעדו את מספר השורות שנמחקו, הערכים שמולאו, והשינויים שבוצעו. שינוי חד באחת המטריקות האלה? זה בדרך כלל סימן לבעיה בנתונים הגולמיים או בלוגיקת הניקוי.

מה הביצועים של pandas 3.0 בניקוי טקסט לעומת גרסאות קודמות?

שיפור משמעותי — פי 5-10 בפעולות מחרוזות, בזכות המעבר ל-PyArrow כ-backend. במקום סוג object האיטי, מחרוזות נשמרות כ-str מבוסס Arrow עם ייצוג יעיל בזיכרון. בפרויקט עם מיליוני שורות ההבדל מורגש מאוד — פעולות כמו .str.strip(), .str.lower(), ו-regex שרצו דקות יכולות לרדת לשניות בודדות.

אודות הכותב Editorial Team

Our team of expert writers and editors.