Автоматизоване очищення даних у Python: pandas 3.0, pyjanitor та pandera

Як побудувати автоматизований пайплайн очищення даних у Python за допомогою pandas 3.0 (PyArrow, Copy-on-Write, .pipe()), pyjanitor для ланцюгового очищення та pandera для валідації якості. Покрокові приклади з кодом.

Вступ: чому автоматизоване очищення даних — це необхідність

Є така відома статистика: дата-сайєнтисти витрачають приблизно 80% свого часу на очищення та підготовку даних. Лише 20% лишається на моделювання, аналіз і власне отримання інсайтів — тобто на все те, заради чого ця робота й починалась. І чесно кажучи, ручне очищення — це не просто повільно. Воно не відтворюване, схильне до помилок і практично не масштабується.

Уявіть ситуацію. Ви вручну очистили набір даних, побудували модель, отримали результати. Все чудово. А через місяць приходять нові дані з тими самими проблемами — і все починається спочатку. Колега просить повторити ваші кроки? Без задокументованого пайплайну це перетворюється на детективне розслідування.

Рішення — побудова автоматизованих, відтворюваних пайплайнів очищення. У цій статті ми розберемо, як це зробити за допомогою трьох інструментів: pandas 3.0 з його новими можливостями, pyjanitor для елегантного ланцюгового очищення та pandera для валідації якості даних.

Отже, давайте розбиратися.

Що нового у pandas 3.0 для очищення даних

Pandas 3.0, випущений 21 січня 2026 року, приніс справді фундаментальні зміни. Це не косметичне оновлення — це переосмислення базових підходів до роботи з DataFrame. І для тих, хто займається очищенням даних, ці зміни особливо відчутні.

PyArrow-рядки як тип за замовчуванням

Мабуть, найбільш помітна зміна — новий рядковий тип даних. Раніше текстові колонки зберігалися як object dtype, що було повільно та неефективно з точки зору пам'яті. Тепер pandas 3.0 використовує PyArrow як бекенд для рядків за замовчуванням (якщо PyArrow встановлено). Результати вражають: рядкові операції працюють у 5-10 разів швидше, а споживання пам'яті зменшується до 50%.

# Порівняння старого та нового підходу до рядків
import pandas as pd

# pandas 2.x: рядки зберігалися як object
# df = pd.DataFrame({"name": ["Олена", "Іван", "Марія"]})
# df["name"].dtype  # object — повільно, багато пам'яті

# pandas 3.0: рядки автоматично використовують PyArrow
df = pd.DataFrame({"name": ["Олена", "Іван", "Марія"]})
print(df["name"].dtype)  # string[pyarrow] — швидко, ефективно

# Рядкові операції тепер значно швидші
df["name_upper"] = df["name"].str.upper()
df["name_len"] = df["name"].str.len()
print(df)

Для очищення даних це має величезне значення. Операції нормалізації тексту, пошук підрядків, заміна символів — все це тепер працює набагато швидше, причому без жодних змін у вашому коді.

Copy-on-Write як поведінка за замовчуванням

Друга революційна зміна — Copy-on-Write (CoW) тепер є єдиним режимом роботи. Це означає кінець епохи SettingWithCopyWarning. Знаєте, того дратівливого попередження, яке переслідувало буквально кожного, хто працював з pandas? Тепер ланцюгове присвоєння викликає помилку замість непередбачуваної поведінки, а зайві виклики .copy() більше не потрібні.

import pandas as pd

df = pd.DataFrame({
    "ціна": [100, 200, 300, 400],
    "категорія": ["A", "B", "A", "B"]
})

# pandas 2.x: непередбачувана поведінка, SettingWithCopyWarning
# df[df["категорія"] == "A"]["ціна"] = 999  # Чи змінить це оригінал?

# pandas 3.0: Copy-on-Write — безпечні трансформації
subset = df[df["категорія"] == "A"]
# subset["ціна"] = 999  # Помилка! Ланцюгове присвоєння заборонено

# Правильний спосіб — використовувати .loc або створити копію
df.loc[df["категорія"] == "A", "ціна"] = 999
print(df)

Метод .pipe() для побудови чистих пайплайнів

Метод .pipe() існував і раніше, але в контексті pandas 3.0 він набуває нового значення. Copy-on-Write робить кожен крок пайплайну безпечним, а .pipe() дозволяє організувати ці кроки у читабельний ланцюг — замість вкладених функцій, де легко заплутатися.

import pandas as pd

# Визначаємо функції очищення
def видалити_дублікати(df):
    """Видаляє повні дублікати рядків"""
    return df.drop_duplicates()

def нормалізувати_текст(df, колонки):
    """Приводить текстові колонки до нижнього регістру"""
    for col in колонки:
        df[col] = df[col].str.lower().str.strip()
    return df

def заповнити_пропуски(df, стратегія):
    """Заповнює числові пропуски за обраною стратегією"""
    числові = df.select_dtypes(include="number").columns
    if стратегія == "медіана":
        df[числові] = df[числові].fillna(df[числові].median())
    return df

# Вкладені функції — складно читати
# результат = заповнити_пропуски(нормалізувати_текст(видалити_дублікати(df), ["місто"]), "медіана")

# .pipe() — чистий пайплайн
результат = (
    df
    .pipe(видалити_дублікати)
    .pipe(нормалізувати_текст, колонки=["місто"])
    .pipe(заповнити_пропуски, стратегія="медіана")
)
print(результат)

Порівняйте обидва підходи — з .pipe() код читається згори вниз рівно в тому порядку, в якому відбуваються трансформації. Для підтримки та документування пайплайнів це критично важливо.

Ланцюгове очищення з pyjanitor

Якщо .pipe() у pandas — це фундамент для побудови пайплайнів, то pyjanitor — це вже повноцінний набір інструментів. Pyjanitor (актуальна версія v0.32.19, лютий 2026) — бібліотека, натхненна R-пакетом janitor, яка розширює pandas API десятками зручних методів для очищення даних. Усі вони підтримують ланцюгове виконання (method chaining), тож код виходить виразний і читабельний.

clean_names() — стандартизація назв колонок

Брудні назви колонок — одна з найчастіших проблем реальних даних. Пробіли, великі літери, спеціальні символи, різна транслітерація — все це ускладнює подальшу роботу. І clean_names() вирішує цю проблему одним викликом.

import pandas as pd
import janitor

# Типові "брудні" назви колонок з реальних джерел
df = pd.DataFrame({
    "Ім'я Клієнта": ["Олена", "Іван"],
    "  Дата Замовлення  ": ["2026-01-15", "2026-01-16"],
    "Сума (грн)": [1500, 2300],
    "Категорія___Товару": ["Електроніка", "Одяг"],
    "% Знижки": [10, 15]
})

# Одним викликом приводимо до стандартного формату
df_clean = df.clean_names()
print(df_clean.columns.tolist())
# ['ім_я_клієнта', 'дата_замовлення', 'сума_грн', 'категорія_товару', '_знижки']

# Усі пробіли, дужки, спецсимволи замінені на підкреслення
# Усе у нижньому регістрі — ідеально для подальшої роботи

remove_empty() — видалення порожніх рядків і колонок

Реальні дані часто містять повністю порожні рядки або колонки. Це артефакти імпорту з Excel, CSV з зайвими роздільниками тощо. Метод remove_empty() автоматично їх знаходить та прибирає — просто і елегантно.

import pandas as pd
import numpy as np
import janitor

# Дані з порожніми рядками та колонками
df = pd.DataFrame({
    "товар": ["Ноутбук", None, "Телефон", None],
    "ціна": [25000, None, 12000, None],
    "порожня_колонка": [None, None, None, None],
    "кількість": [2, None, 5, None]
})

# Видаляємо порожні рядки та колонки
df_clean = df.remove_empty()
print(df_clean)
# Залишаться лише рядки та колонки з реальними даними

also() — побічні ефекти без розриву ланцюга

А ось це, на мою думку, одна з найкорисніших фішок pyjanitor — метод also(). Він дозволяє виконувати побічні ефекти (логування, збереження проміжних результатів, виведення діагностики) прямо всередині ланцюга методів, не перериваючи його. Для дебагінгу та моніторингу пайплайнів це незамінна річ.

import pandas as pd
import janitor
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")

df = pd.read_csv("sales_data.csv")

# Ланцюг очищення з логуванням на кожному кроці
df_clean = (
    df
    .clean_names()
    .also(lambda df: logging.info(f"Після clean_names: {df.shape[1]} колонок"))
    .remove_empty()
    .also(lambda df: logging.info(f"Після remove_empty: {df.shape[0]} рядків"))
    .dropna(subset=["ціна", "кількість"])
    .also(lambda df: logging.info(f"Після dropna: {df.shape[0]} рядків"))
    .also(lambda df: df.to_csv("checkpoint_cleaned.csv", index=False))
    .rename_column("ід_замовлення", "order_id")
)
# Логи покажуть, скільки даних залишилося після кожного кроку
# Проміжний результат збережено у checkpoint_cleaned.csv

Повний приклад ланцюгового очищення

Давайте об'єднаємо все разом у реалістичний приклад — очищення даних продажів:

import pandas as pd
import numpy as np
import janitor

# Симуляція брудних даних
raw_data = pd.DataFrame({
    "ID Замовлення": [1001, 1002, 1003, 1004, 1005, 1005, None],
    "  Назва Товару  ": ["Ноутбук", "ТЕЛЕФОН", "  планшет  ", "ноутбук", "Телефон", "Телефон", None],
    "Ціна (грн)": [25000, 12000, 8000, 26000, -500, 12000, None],
    "Кількість Штук": [2, 1, 3, 1, 1, 1, None],
    "Дата Замовлення": ["2026-01-15", "2026-01-16", "2026-13-01", "2026-01-18", "2026-01-19", "2026-01-19", None],
    "Пуста Колонка": [None, None, None, None, None, None, None]
})

# Повний ланцюг очищення
df_clean = (
    raw_data
    .clean_names()                                          # Стандартизація назв
    .remove_empty()                                         # Видалення порожніх рядків/колонок
    .drop_duplicates(subset=["id_замовлення"])              # Видалення дублікатів
    .rename_column("ціна_грн", "ціна")                     # Перейменування колонки
    .rename_column("кількість_штук", "кількість")          # Перейменування колонки
    .transform_column("назва_товару", lambda x: x.str.strip().str.lower())  # Нормалізація тексту
    .filter_on("ціна > 0")                                 # Видалення некоректних цін
)

print(df_clean)
print(f"\nОчищено: {len(raw_data)} → {len(df_clean)} рядків")

Реєстрація власних функцій очищення

Pyjanitor дозволяє реєструвати власні функції як методи DataFrame через декоратор @pf.register_dataframe_method. Це дає змогу створювати доменно-специфічні методи очищення, які інтегруються у ланцюг так само органічно, як і вбудовані. По суті, ви розширюєте pandas під свої потреби.

import pandas as pd
import pandas_flavor as pf
import janitor

@pf.register_dataframe_method
def видалити_викиди(df, колонка, множник=1.5):
    """
    Видаляє викиди за методом IQR.
    Реєструється як метод DataFrame для використання у ланцюгу.
    """
    Q1 = df[колонка].quantile(0.25)
    Q3 = df[колонка].quantile(0.75)
    IQR = Q3 - Q1
    нижня_межа = Q1 - множник * IQR
    верхня_межа = Q3 + множник * IQR
    return df[(df[колонка] >= нижня_межа) & (df[колонка] <= верхня_межа)]

@pf.register_dataframe_method
def привести_типи(df, схема_типів):
    """
    Приводить колонки до зазначених типів.
    """
    for колонка, тип in схема_типів.items():
        if колонка in df.columns:
            df[колонка] = df[колонка].astype(тип)
    return df

# Тепер власні функції — частина ланцюга
df_final = (
    df_clean
    .видалити_викиди("ціна", множник=2.0)
    .привести_типи({"кількість": "int32", "ціна": "float64"})
)
print(df_final.dtypes)

Валідація даних з pandera

Очистити дані — це лише половина справи. Як переконатися, що після очищення дані дійсно відповідають вашим очікуванням? Чи всі ціни додатні? Чи немає пропущених значень у критичних колонках? Чи дати мають правильний формат?

Ось тут і з'являється pandera (актуальна версія v0.29.0) — бібліотека для статистичної валідації DataFrame.

Новий рекомендований імпорт

Починаючи з версії 0.29.0, pandera рекомендує використовувати новий модуль pandera.pandas замість імпорту з верхнього рівня. Старий спосіб через import pandera as pa для pandas-об'єктів є застарілим і генерує FutureWarning. Зверніть на це увагу — зміна невелика, але важлива.

# Старий спосіб (застарілий з v0.29.0)
# import pandera as pa  # FutureWarning!

# Новий рекомендований спосіб
import pandera.pandas as pa

# Решта API залишається без змін — змінюється лише імпорт

DataFrameSchema — визначення схеми колонок

DataFrameSchema — це декларативний спосіб описати, як мають виглядати ваші дані. Ви визначаєте очікувані колонки, їхні типи та обмеження, а pandera перевіряє DataFrame на відповідність.

import pandas as pd
import pandera.pandas as pa

# Визначаємо схему для очищених даних продажів
schema = pa.DataFrameSchema(
    columns={
        "id_замовлення": pa.Column(
            int,
            nullable=False,
            unique=True,
            description="Унікальний ідентифікатор замовлення"
        ),
        "назва_товару": pa.Column(
            str,
            checks=pa.Check.str_length(min_value=1, max_value=100),
            nullable=False,
            description="Назва товару у нижньому регістрі"
        ),
        "ціна": pa.Column(
            float,
            checks=[
                pa.Check.greater_than(0),
                pa.Check.less_than(1_000_000)
            ],
            nullable=False,
            description="Ціна у гривнях, додатне число"
        ),
        "кількість": pa.Column(
            int,
            checks=pa.Check.in_range(min_value=1, max_value=10000),
            nullable=False,
            description="Кількість одиниць товару"
        ),
    },
    strict=False,  # Дозволяємо додаткові колонки
    coerce=True    # Автоматично приводимо типи
)

# Валідація
df_validated = schema.validate(df_clean)
print("Дані пройшли валідацію!")

DataFrameModel — API у стилі Pydantic

Якщо вам ближче класовий підхід (як у Pydantic), pandera пропонує DataFrameModel. Це елегантний спосіб визначити схему через анотації типів — і виглядає це дуже знайомо для тих, хто працював із Pydantic.

import pandas as pd
import pandera.pandas as pa
from pandera.typing import Series

class СхемаПродажів(pa.DataFrameModel):
    """Схема для валідації очищених даних продажів."""

    id_замовлення: Series[int] = pa.Field(nullable=False, unique=True)
    назва_товару: Series[str] = pa.Field(
        nullable=False,
        str_length={"min_value": 1, "max_value": 100}
    )
    ціна: Series[float] = pa.Field(gt=0, lt=1_000_000)
    кількість: Series[int] = pa.Field(ge=1, le=10000)

    class Config:
        strict = False
        coerce = True

# Використання — ідентичне DataFrameSchema
df_validated = СхемаПродажів.validate(df_clean)
print("Валідація через DataFrameModel пройшла успішно!")

Ледача валідація — збір усіх помилок одразу

За замовчуванням pandera зупиняється на першій помилці. Це незручно, коли треба побачити повну картину проблем. Ледача валідація (lazy=True) збирає всі помилки і віддає їх у вигляді структурованого звіту — що неймовірно корисно під час розробки.

import pandas as pd
import pandera.pandas as pa

# Дані з кількома проблемами одночасно
df_з_помилками = pd.DataFrame({
    "id_замовлення": [1, 2, 2, 4],         # Дублікат ID
    "назва_товару": ["ноутбук", "", None, "телефон"],  # Порожній рядок та None
    "ціна": [25000, -100, 8000, 0],        # Від'ємна ціна та нуль
    "кількість": [2, 1, 3, -1]             # Від'ємна кількість
})

schema = pa.DataFrameSchema(
    columns={
        "id_замовлення": pa.Column(int, nullable=False, unique=True),
        "назва_товару": pa.Column(str, checks=pa.Check.str_length(min_value=1), nullable=False),
        "ціна": pa.Column(float, checks=pa.Check.greater_than(0), nullable=False),
        "кількість": pa.Column(int, checks=pa.Check.greater_than(0), nullable=False),
    },
    coerce=True
)

# Ледача валідація — збираємо ВСІ помилки
try:
    schema.validate(df_з_помилками, lazy=True)
except pa.errors.SchemaErrors as exc:
    print("Знайдені проблеми у даних:")
    print(exc.failure_cases)
    # Виводить таблицю з усіма помилками:
    # - яка колонка
    # - яка перевірка не пройшла
    # - які конкретно значення є проблемними
    # - у яких рядках знайдені помилки

Ледача валідація незамінна при розробці пайплайнів: замість виправлення помилок по одній, ви бачите повну картину одразу й можете спланувати всі необхідні кроки очищення.

Побудова повного автоматизованого пайплайну

Тепер — найцікавіше. Об'єднаємо всі три інструменти в один наскрізний пайплайн. Ми побудуємо повний цикл: завантаження брудних даних CSV, очищення через pyjanitor, трансформація за допомогою pandas 3.0 .pipe(), валідація з pandera і збереження результату.

Вхідні дані

Уявімо, що ми отримали CSV-файл з даними продажів інтернет-магазину. Дані мають типові проблеми — непослідовні назви колонок, пропущені значення, дублікати, викиди, невірні типи та помилки. Загалом, класика жанру.

import pandas as pd
import numpy as np
import janitor
import pandas_flavor as pf
import pandera.pandas as pa
from pandera.typing import Series
import logging
from datetime import datetime

# Налаштування логування
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)

# ============================================================
# КРОК 1: Завантаження брудних даних
# ============================================================

# Симуляція реалістичних брудних даних
np.random.seed(42)
n = 500

raw_df = pd.DataFrame({
    "Order ID": range(1000, 1000 + n),
    "  Product Name  ": np.random.choice(
        ["  Ноутбук ASUS  ", "ТЕЛЕФОН Samsung", "планшет Apple",
         "навушники Sony", "Клавіатура Logitech", None],
        size=n
    ),
    "Price (UAH)": np.where(
        np.random.random(n) > 0.95,
        np.random.choice([-100, 0, 999999], size=n),   # 5% некоректних цін
        np.random.uniform(500, 50000, n).round(2)
    ),
    "Quantity  ": np.where(
        np.random.random(n) > 0.97,
        np.random.choice([-1, 0], size=n),              # 3% некоректних кількостей
        np.random.randint(1, 20, n)
    ),
    "Order Date": pd.date_range("2025-06-01", periods=n, freq="4h").astype(str),
    "Customer Email": np.random.choice(
        ["[email protected]", "[email protected]", "[email protected]",
         "invalid-email", None, "[email protected]"],
        size=n
    ),
    "Region": np.random.choice(
        ["Київ", "Львів", "Одеса", "Харків", " київ ", "ЛЬВІВ", None],
        size=n
    ),
    "Empty Column": None,
    "Notes  ": None
})

# Додаємо кілька повних дублікатів
duplicates = raw_df.sample(20, random_state=42)
raw_df = pd.concat([raw_df, duplicates], ignore_index=True)

logger.info(f"Завантажено {len(raw_df)} рядків з {len(raw_df.columns)} колонками")
print(f"Типи даних:\n{raw_df.dtypes}\n")
print(f"Пропущені значення:\n{raw_df.isnull().sum()}\n")

Реєстрація власних методів очищення

# ============================================================
# КРОК 2: Визначення власних функцій очищення
# ============================================================

@pf.register_dataframe_method
def нормалізувати_регіони(df, колонка="region"):
    """Стандартизує назви регіонів: видаляє пробіли, приводить до заголовного регістру."""
    df[колонка] = (
        df[колонка]
        .str.strip()
        .str.lower()
        .str.capitalize()
    )
    return df

@pf.register_dataframe_method
def валідувати_email(df, колонка="customer_email"):
    """Позначає невалідні email як None."""
    # Проста перевірка: email повинен містити @ та крапку після @
    маска = df[колонка].str.contains(r'^[^@]+@[^@]+\.[^@]+$', na=False)
    df.loc[~маска, колонка] = None
    return df

@pf.register_dataframe_method
def видалити_цінові_викиди(df, колонка="price_uah", множник=1.5):
    """Видаляє рядки з цінами-викидами за методом IQR."""
    Q1 = df[колонка].quantile(0.25)
    Q3 = df[колонка].quantile(0.75)
    IQR = Q3 - Q1
    нижня = Q1 - множник * IQR
    верхня = Q3 + множник * IQR
    return df[(df[колонка] >= нижня) & (df[колонка] <= верхня)]

@pf.register_dataframe_method
def додати_обчислювані_колонки(df):
    """Додає колонки, обчислені на основі існуючих даних."""
    df["total_uah"] = df["price_uah"] * df["quantity"]
    df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
    df["order_month"] = df["order_date"].dt.to_period("M").astype(str)
    return df

Визначення схеми валідації

# ============================================================
# КРОК 3: Визначення схеми валідації з pandera
# ============================================================

class СхемаОчищенихПродажів(pa.DataFrameModel):
    """
    Схема валідації для очищених даних продажів.
    Визначає очікувані типи, обмеження та перевірки.
    """
    order_id: Series[int] = pa.Field(nullable=False, unique=True, ge=1000)
    product_name: Series[str] = pa.Field(
        nullable=False,
        str_length={"min_value": 2, "max_value": 100}
    )
    price_uah: Series[float] = pa.Field(
        nullable=False,
        gt=0,
        lt=100_000,
        description="Ціна товару у гривнях"
    )
    quantity: Series[int] = pa.Field(
        nullable=False,
        ge=1,
        le=1000,
        description="Кількість одиниць"
    )
    order_date: Series[pa.DateTime] = pa.Field(nullable=False)
    region: Series[str] = pa.Field(nullable=False, str_length={"min_value": 2})
    total_uah: Series[float] = pa.Field(gt=0, description="Загальна сума")

    class Config:
        strict = False   # Дозволяємо додаткові колонки
        coerce = True    # Автоматичне приведення типів
        name = "Очищені продажі"

Запуск пайплайну

# ============================================================
# КРОК 4: Побудова та запуск повного пайплайну
# ============================================================

def запустити_пайплайн(raw_df):
    """
    Повний пайплайн: очищення → трансформація → валідація → збереження.
    """
    logger.info("=" * 60)
    logger.info("СТАРТ ПАЙПЛАЙНУ ОЧИЩЕННЯ ДАНИХ")
    logger.info("=" * 60)

    # Фаза 1: Очищення з pyjanitor
    df_clean = (
        raw_df
        .clean_names()
        .also(lambda df: logger.info(
            f"[clean_names] Колонки: {df.columns.tolist()}"
        ))
        .remove_empty()
        .also(lambda df: logger.info(
            f"[remove_empty] Залишилось колонок: {df.shape[1]}, рядків: {df.shape[0]}"
        ))
        .drop_duplicates(subset=["order_id"])
        .also(lambda df: logger.info(
            f"[drop_duplicates] Рядків після видалення дублікатів: {df.shape[0]}"
        ))
        .dropna(subset=["product_name", "region"])
        .also(lambda df: logger.info(
            f"[dropna] Рядків після видалення пропусків: {df.shape[0]}"
        ))
        .transform_column(
            "product_name",
            lambda x: x.str.strip().str.lower()
        )
        .нормалізувати_регіони("region")
        .валідувати_email("customer_email")
        .also(lambda df: logger.info(
            f"[валідація email] Валідних email: {df['customer_email'].notna().sum()}"
        ))
    )

    # Фаза 2: Трансформація з pandas .pipe()
    df_transformed = (
        df_clean
        .pipe(lambda df: df[df["price_uah"] > 0])
        .pipe(lambda df: df[df["quantity"] > 0])
        .видалити_цінові_викиди("price_uah", множник=2.0)
        .додати_обчислювані_колонки()
        .also(lambda df: logger.info(
            f"[трансформація] Фінальна кількість рядків: {df.shape[0]}"
        ))
        .also(lambda df: logger.info(
            f"[трансформація] Загальна сума продажів: {df['total_uah'].sum():,.2f} грн"
        ))
    )

    # Фаза 3: Валідація з pandera
    logger.info("Запуск валідації даних...")
    try:
        df_validated = СхемаОчищенихПродажів.validate(df_transformed, lazy=True)
        logger.info("Валідація пройшла успішно!")
    except pa.errors.SchemaErrors as exc:
        logger.error(f"Валідація виявила {len(exc.failure_cases)} помилок:")
        logger.error(f"\n{exc.failure_cases}")
        # У продакшні тут можна зберегти звіт про помилки
        raise

    # Фаза 4: Збереження результату
    output_path = "sales_cleaned.parquet"
    df_validated.to_parquet(output_path, index=False)
    logger.info(f"Очищені дані збережено у {output_path}")

    logger.info("=" * 60)
    logger.info("ПАЙПЛАЙН ЗАВЕРШЕНО УСПІШНО")
    logger.info("=" * 60)

    return df_validated

# Запуск
df_result = запустити_пайплайн(raw_df)
print(f"\nРезультат: {df_result.shape[0]} рядків, {df_result.shape[1]} колонок")
print(f"Типи даних:\n{df_result.dtypes}")
print(f"\nПерші 5 рядків:")
print(df_result.head())

Цей пайплайн демонструє головну перевагу автоматизованого підходу: кожен крок задокументований, логується і його можна повторити. Якщо завтра прийдуть нові дані з тими самими проблемами — просто запускаєте запустити_пайплайн(new_data) і йдете пити каву.

Найкращі практики побудови пайплайнів очищення

Побудова автоматизованих пайплайнів — це не тільки про код. Є набір практик, які допоможуть створювати надійні й підтримувані рішення. Ось основні з них.

Завжди зберігайте оригінальні дані недоторканими

Це золоте правило. Ніколи не модифікуйте вхідний файл. Завжди працюйте з копією або зберігайте результат окремо. У pandas 3.0 Copy-on-Write робить це простіше — кожна операція створює нову версію даних, не чіпаючи оригінал.

# Правильний підхід: оригінал залишається недоторканим
raw_df = pd.read_csv("data/raw/sales_2026.csv")    # Вхідні дані
clean_df = запустити_пайплайн(raw_df)               # Очищення
clean_df.to_parquet("data/processed/sales_2026_clean.parquet")  # Збереження окремо

# Ніколи не робіть так:
# df = pd.read_csv("data/sales.csv")
# df.to_csv("data/sales.csv")  # Перезапис оригіналу!

Документуйте кроки очищення

Кожна функція у пайплайні повинна мати docstring, що пояснює чому виконується саме цей крок. Не що робить функція (це й так видно з коду), а чому — яку бізнес-проблему вона вирішує. Повірте, через півроку ви подякуєте собі за це.

Використовуйте контроль версій для коду пайплайну

Код очищення — це такий самий код, як і решта проєкту. Він має бути в Git, мати осмислені коміти та проходити код-рев'ю. Якщо хтось змінює логіку очищення — це повинно бути видно в історії.

Тестуйте граничні випадки

Що станеться, якщо вхідний файл порожній? Якщо всі значення в колонці — None? Якщо є лише один рядок? Автоматичні тести для таких сценаріїв — це не зайвий перфекціонізм, а реальна страховка від збоїв у продакшні.

# Приклад тесту граничного випадку
def test_пайплайн_з_порожніми_даними():
    """Перевірка, що пайплайн коректно обробляє порожній DataFrame."""
    empty_df = pd.DataFrame(columns=["Order ID", "  Product Name  ",
                                      "Price (UAH)", "Quantity  ",
                                      "Order Date", "Customer Email",
                                      "Region", "Empty Column", "Notes  "])
    result = запустити_пайплайн(empty_df)
    assert len(result) == 0
    assert "order_id" in result.columns

Логуйте проміжні результати з .also()

Як ми вже бачили, метод .also() з pyjanitor — ідеальний інструмент для цього. Використовуйте його на кожному значущому кроці: після видалення рядків, після фільтрації, після трансформацій. Коли кінцевий результат виглядає неочікувано, логи допоможуть швидко зрозуміти, де саме «зникають» дані.

Профілюйте дані до та після очищення

Порівняння статистик до та після очищення — важливий етап контролю якості. Потрібно переконатися, що ви не видалили занадто багато даних і не внесли зміщення.

def профілювати_дані(df, назва="DataFrame"):
    """Виводить базовий профіль DataFrame для контролю якості."""
    print(f"\n{'=' * 40}")
    print(f"Профіль: {назва}")
    print(f"{'=' * 40}")
    print(f"Рядків: {len(df)}")
    print(f"Колонок: {len(df.columns)}")
    print(f"Пропущені значення (загалом): {df.isnull().sum().sum()}")
    print(f"Дублікатів: {df.duplicated().sum()}")
    print(f"Пам'ять: {df.memory_usage(deep=True).sum() / 1024:.1f} КБ")

    числові = df.select_dtypes(include="number")
    if not числові.empty:
        print(f"\nЧислові колонки — статистика:")
        print(числові.describe().round(2))

# Використання
профілювати_дані(raw_df, "Вхідні дані")
профілювати_дані(df_result, "Очищені дані")

Часті запитання (FAQ)

Чим pandas 3.0 краще для очищення даних порівняно з pandas 2.x?

Тут три ключові моменти. По-перше, PyArrow-рядки за замовчуванням прискорюють усі текстові операції (нормалізація, пошук, заміна) у 5-10 разів — а текстові операції це ж значна частина будь-якого очищення. По-друге, Copy-on-Write як єдиний режим роботи усуває цілий клас помилок, пов'язаних із випадковою модифікацією даних. І по-третє, вимога Python 3.11+ означає доступ до покращеного GC та швидшого інтерпретатора, що загалом прискорює роботу пайплайнів.

Чи потрібен pyjanitor, якщо є .pipe() у pandas?

Хороше запитання. Метод .pipe() — це механізм для вбудовування функцій у ланцюг. Pyjanitor — це готова бібліотека з десятками спеціалізованих функцій (clean_names(), remove_empty(), also(), filter_on() тощо), які вже реалізовані та протестовані. Це як порівнювати молоток із цілим набором інструментів. Вони доповнюють одне одного, а не конкурують.

Як pandera допомагає у продакшн-пайплайнах?

У продакшні дані змінюються непередбачувано: джерело може почати надсилати порожні колонки, змінити формат дат або ввести нові категорії. Pandera діє як «контракт» на ваші дані — якщо щось не відповідає очікуванням, пайплайн негайно зупиняється з чітким повідомленням замість того, щоб тихо генерувати некоректні результати. Ледача валідація (lazy=True) особливо корисна: вона збирає всі помилки одразу, тож можна діагностувати проблему за один запуск.

Чи сумісний pyjanitor з pandas 3.0?

Так. Pyjanitor v0.32.19 (лютий 2026) повністю сумісний із pandas 3.0. Бібліотека працює через стандартний DataFrame API і реєструє методи через pandas_flavor, що забезпечує сумісність із новими версіями. Copy-on-Write не створює проблем, оскільки функції pyjanitor повертають нові DataFrame, а не модифікують існуючі через ланцюгове індексування.

Яка різниця між pandera та Great Expectations?

Обидва інструменти вирішують задачу валідації даних, але підходять для різних масштабів. Pandera — це легка бібліотека, яка інтегрується безпосередньо у Python-код і чудово працює з pandas DataFrame. Вона ідеальна для валідації в скриптах та ML-проєктах. Great Expectations — це платформа корпоративного рівня з веб-інтерфейсом, інтеграцією з оркестраторами (Airflow, Dagster) і підтримкою різних сховищ. Якщо ваш проєкт — Python-скрипт або невеликий сервіс, pandera буде простішим вибором. Для великих команд із десятками пайплайнів Great Expectations може бути виправданішим.

Про Автора Editorial Team

Our team of expert writers and editors.