Pipeline ETL in Python: Guida Pratica con pandas 3.0, SQLAlchemy e Pandera

Costruisci da zero una pipeline ETL completa in Python con pandas 3.0, SQLAlchemy 2.x e Pandera. Estrazione multi-sorgente, trasformazione dati, validazione e caricamento con codice pronto per la produzione.

Introduzione alle Pipeline ETL in Python

Se lavori con i dati — e nel 2026, chi non lo fa? — prima o poi ti ritrovi a dover spostare informazioni da un posto all'altro. File CSV che arrivano via email, API che sputano JSON, database che parlano linguaggi diversi. Il caos, insomma. Ed è qui che entrano in gioco le pipeline ETL (Extract, Transform, Load): il collante che tiene insieme qualsiasi architettura dati che si rispetti.

In questa guida costruiremo insieme una pipeline ETL completa partendo da zero. Useremo pandas 3.0 (uscito a gennaio 2026, finalmente!), SQLAlchemy 2.x per parlare con i database e Pandera per validare i dati prima che combinino guai. Il tutto con un progetto end-to-end realistico, logging, gestione errori e quelle best practice che in produzione fanno la differenza tra dormire tranquilli e ricevere chiamate alle 3 di notte.

Cos'è una Pipeline ETL e Perché Ti Serve

Una pipeline ETL è un processo automatizzato che muove i dati attraverso tre fasi:

  • Extract (Estrazione): recupero dei dati grezzi da una o più sorgenti — file CSV, API REST, database, web scraping
  • Transform (Trasformazione): pulizia, normalizzazione, arricchimento e validazione
  • Load (Caricamento): scrittura dei dati trasformati nella destinazione finale (database SQL, data warehouse, file Parquet)

Senza una pipeline ben strutturata, i dati restano sparsi ovunque in formati incompatibili. Analizzarli diventa un incubo. Una pipeline automatizzata ti garantisce ripetibilità, tracciabilità e qualità dei dati ad ogni esecuzione. Sembra banale, ma fidati: quando un report mostra numeri sbagliati al CEO, vuoi poter dire esattamente dove il problema è nato.

Configurazione dell'Ambiente di Sviluppo

Ok, partiamo dalle basi. Installiamo le dipendenze in un ambiente virtuale dedicato (mai inquinare il Python di sistema, per carità):

python -m venv etl-env
source etl-env/bin/activate   # Linux/macOS
# etl-env\Scripts\activate    # Windows

pip install pandas==3.0.1 sqlalchemy==2.0.38 pandera requests

Un rapido check per confermare che tutto sia a posto:

import pandas as pd
import sqlalchemy
import pandera

print(f"pandas: {pd.__version__}")       # 3.0.1
print(f"SQLAlchemy: {sqlalchemy.__version__}")  # 2.0.38
print(f"Pandera: {pandera.__version__}")

Novità di pandas 3.0 per le Pipeline ETL

pandas 3.0 non è un aggiornamento qualunque. Rilasciato il 21 gennaio 2026, introduce cambiamenti che impattano direttamente il modo in cui scriviamo le pipeline:

  • Copy-on-Write attivo di default: ogni operazione di indicizzazione restituisce una copia logica. Addio SettingWithCopyWarning — ora si ottiene un ChainedAssignmentError se si tenta un'assegnazione concatenata. Bisogna usare .loc[] per le modifiche in-place. All'inizio è un po' scomodo, ma alla lunga previene bug subdoli.
  • Nuovo dtype str per le stringhe: le colonne di testo vengono inferite come str (con backend PyArrow quando disponibile), non più come object. Performance e memoria ringraziano.
  • Risoluzione datetime in microsecondi: il default passa dai nanosecondi ai microsecondi, evitando quei fastidiosi errori con date prima del 1678 o dopo il 2262.
  • Richiede Python 3.11+

Struttura del Progetto

Organizziamo tutto in modo modulare. Niente script monolitici da 500 righe — ogni fase ha il suo file:

etl_project/
├── main.py              # Orchestratore della pipeline
├── extract.py           # Funzioni di estrazione
├── transform.py         # Funzioni di trasformazione
├── load.py              # Funzioni di caricamento
├── validate.py          # Schema di validazione Pandera
├── config.py            # Configurazione e costanti
├── data/
│   └── raw/             # Dati grezzi scaricati
└── logs/
    └── etl.log          # File di log

Questa struttura sembra eccessiva per un progetto piccolo? Forse. Ma quando la pipeline cresce (e cresce sempre), ti ringrazierai da solo.

Fase 1: Extract — Estrazione dei Dati

La fase di estrazione è dove raccogliamo i dati grezzi. In un progetto reale le sorgenti sono quasi sempre multiple: CSV locali, API REST, database remoti, a volte persino fogli Excel condivisi (sì, succede ancora). Vediamo i casi più comuni.

Estrazione da File CSV

import pandas as pd
import logging

logger = logging.getLogger(__name__)

def extract_csv(file_path: str) -> pd.DataFrame:
    """Estrae dati da un file CSV con gestione errori."""
    logger.info(f"Estrazione dati da {file_path}")
    try:
        df = pd.read_csv(
            file_path,
            dtype_backend="pyarrow",  # Backend ottimizzato in pandas 3.0
            parse_dates=["order_date"]
        )
        logger.info(f"Estratte {len(df)} righe da {file_path}")
        return df
    except FileNotFoundError:
        logger.error(f"File non trovato: {file_path}")
        raise
    except pd.errors.ParserError as e:
        logger.error(f"Errore di parsing: {e}")
        raise

Nota il parametro dtype_backend="pyarrow": è una delle piccole gemme di pandas 3.0 che fa una differenza concreta sulle performance, specialmente con file grandi.

Estrazione da API REST

import requests

def extract_api(url: str, params: dict = None) -> pd.DataFrame:
    """Estrae dati da un endpoint API REST."""
    logger.info(f"Chiamata API: {url}")
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()

    data = response.json()
    df = pd.DataFrame(data)
    logger.info(f"Estratte {len(df)} righe dall'API")
    return df

Estrazione da Database con SQLAlchemy

from sqlalchemy import create_engine, text

def extract_database(connection_string: str, query: str) -> pd.DataFrame:
    """Estrae dati da un database SQL."""
    engine = create_engine(connection_string)

    with engine.connect() as conn:
        df = pd.read_sql(text(query), conn)
        logger.info(f"Estratte {len(df)} righe dal database")

    engine.dispose()
    return df

Gestione di Sorgenti Multiple

Nella pratica, le sorgenti sono quasi sempre più di una. Ecco come orchestrare l'estrazione:

def extract_all() -> dict[str, pd.DataFrame]:
    """Estrae dati da tutte le sorgenti configurate."""
    datasets = {}

    # Ordini da CSV
    datasets["orders"] = extract_csv("data/raw/orders.csv")

    # Prodotti da API
    datasets["products"] = extract_api(
        "https://api.example.com/products",
        params={"status": "active"}
    )

    # Clienti da database
    datasets["customers"] = extract_database(
        "postgresql://user:pass@localhost:5432/crm",
        "SELECT id, name, email, region FROM customers WHERE active = true"
    )

    return datasets

Fase 2: Transform — Trasformazione e Pulizia

Qui si fa sul serio. La trasformazione è il cuore della pipeline: pulizia, normalizzazione, arricchimento, validazione. Con pandas 3.0, ricorda che il Copy-on-Write è attivo di default — tutte le modifiche vanno fatte con .loc[] o creando nuove colonne tramite .assign().

Pulizia e Normalizzazione

def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    """Trasforma e pulisce il DataFrame degli ordini."""
    logger.info("Inizio trasformazione ordini")

    # Rimuovi duplicati
    df = df.drop_duplicates(subset=["order_id"])

    # Gestione valori mancanti
    df = df.dropna(subset=["order_id", "customer_id"])  # Campi obbligatori
    df = df.fillna({"discount": 0.0, "notes": ""})      # Default per opzionali

    # Normalizzazione stringhe (pandas 3.0 usa dtype str nativo)
    df = df.assign(
        product_name=df["product_name"].str.strip().str.lower(),
        customer_email=df["customer_email"].str.strip().str.lower()
    )

    # Colonne calcolate
    df = df.assign(
        total_with_tax=df["amount"] * 1.22,              # IVA italiana 22%
        net_amount=df["amount"] - df["discount"],
        order_year=df["order_date"].dt.year,
        order_month=df["order_date"].dt.month
    )

    # Filtra ordini con importo valido
    df = df.loc[df["amount"] > 0]

    logger.info(f"Trasformazione completata: {len(df)} righe valide")
    return df

Un dettaglio che mi piace sottolineare: l'uso di .assign() per creare nuove colonne è perfettamente compatibile con il Copy-on-Write. Niente warning, niente comportamenti strani.

Arricchimento con Join tra DataFrame

def enrich_orders(
    orders: pd.DataFrame,
    customers: pd.DataFrame,
    products: pd.DataFrame
) -> pd.DataFrame:
    """Arricchisce gli ordini con dati clienti e prodotti."""
    enriched = (
        orders
        .merge(customers, on="customer_id", how="left", suffixes=("", "_cust"))
        .merge(products, on="product_id", how="left", suffixes=("", "_prod"))
    )

    # Segnala join non riusciti
    missing_customers = enriched["name"].isna().sum()
    missing_products = enriched["product_name_prod"].isna().sum()

    if missing_customers > 0:
        logger.warning(f"{missing_customers} ordini senza cliente corrispondente")
    if missing_products > 0:
        logger.warning(f"{missing_products} ordini senza prodotto corrispondente")

    return enriched

I left join con controllo dei match mancanti sono una di quelle cose che fanno la differenza tra una pipeline "che funziona" e una pipeline affidabile. Quei warning nel log ti salvano ore di debug.

Aggregazioni e Pivot con pandas 3.0

def create_monthly_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Crea un riepilogo mensile delle vendite."""
    summary = (
        df
        .groupby(["order_year", "order_month", "region"], as_index=False)
        .agg(
            total_revenue=("net_amount", "sum"),
            order_count=("order_id", "count"),
            avg_order_value=("net_amount", "mean"),
            unique_customers=("customer_id", "nunique")
        )
    )

    summary = summary.sort_values(
        ["order_year", "order_month", "total_revenue"],
        ascending=[True, True, False]
    )
    return summary

Validazione dei Dati con Pandera

Ecco una cosa che ho imparato a mie spese: senza validazione, dati corrotti si propagano silenziosamente lungo tutta la pipeline. Li scopri solo quando il report finale mostra numeri assurdi e il tuo manager ti chiede spiegazioni. Pandera risolve esattamente questo problema, permettendoti di definire schemi di validazione dichiarativi per i DataFrame.

Definire uno Schema di Validazione

import pandera as pa

order_schema = pa.DataFrameSchema(
    columns={
        "order_id": pa.Column(
            int,
            checks=pa.Check.gt(0),
            unique=True,
            description="ID univoco dell'ordine"
        ),
        "customer_id": pa.Column(
            int,
            checks=pa.Check.gt(0),
            nullable=False
        ),
        "amount": pa.Column(
            float,
            checks=[
                pa.Check.gt(0, error="L'importo deve essere positivo"),
                pa.Check.lt(1_000_000, error="Importo sospetto: superiore a 1M")
            ]
        ),
        "discount": pa.Column(
            float,
            checks=pa.Check.in_range(0, 1),
            nullable=True
        ),
        "order_date": pa.Column(
            "datetime64[us]",  # Microsecondi: default pandas 3.0
            nullable=False
        ),
        "customer_email": pa.Column(
            str,
            checks=pa.Check.str_matches(
                r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$",
                error="Formato email non valido"
            ),
            nullable=True
        ),
    },
    checks=[
        pa.Check(
            lambda df: df["discount"].fillna(0) <= df["amount"],
            error="Lo sconto non può superare l'importo"
        )
    ],
    coerce=True
)

Quel check cross-colonna alla fine (sconto ≤ importo) è il tipo di regola business che spesso sfugge. Pandera rende queste validazioni eleganti da scrivere.

Applicare la Validazione nella Pipeline

def validate_data(df: pd.DataFrame, schema: pa.DataFrameSchema) -> pd.DataFrame:
    """Valida un DataFrame e restituisce solo le righe valide."""
    try:
        validated_df = schema.validate(df, lazy=True)
        logger.info("Validazione completata: tutti i controlli superati")
        return validated_df
    except pa.errors.SchemaErrors as err:
        logger.warning(
            f"Trovate {len(err.failure_cases)} violazioni di validazione"
        )
        # Log delle violazioni per analisi
        for _, row in err.failure_cases.iterrows():
            logger.warning(
                f"  Colonna: {row['column']}, "
                f"Check: {row['check']}, "
                f"Indice: {row['index']}"
            )
        # Restituisci solo le righe valide
        invalid_indices = err.failure_cases["index"].unique()
        valid_df = df.drop(index=invalid_indices, errors="ignore")
        logger.info(
            f"Righe valide: {len(valid_df)}/{len(df)} "
            f"({len(valid_df)/len(df)*100:.1f}%)"
        )
        return valid_df

L'opzione lazy=True è fondamentale: raccoglie tutti gli errori invece di fermarsi al primo. Così puoi avere un quadro completo di cosa non va nei dati.

Fase 3: Load — Caricamento nel Database

L'ultima tappa: caricare i dati trasformati e validati nella destinazione. Usiamo SQLAlchemy 2.x con le best practice attuali.

Caricamento in SQLite (Sviluppo)

from sqlalchemy import create_engine

def load_to_sqlite(df: pd.DataFrame, table_name: str, db_path: str) -> int:
    """Carica un DataFrame in un database SQLite."""
    engine = create_engine(f"sqlite:///{db_path}")

    with engine.begin() as conn:
        rows = df.to_sql(
            name=table_name,
            con=conn,
            if_exists="replace",
            index=False,
            method="multi",
            chunksize=500
        )

    engine.dispose()
    logger.info(f"Caricate {len(df)} righe nella tabella '{table_name}'")
    return len(df)

Caricamento in PostgreSQL (Produzione)

from sqlalchemy import create_engine, inspect

def load_to_postgres(
    df: pd.DataFrame,
    table_name: str,
    connection_string: str,
    if_exists: str = "append"
) -> int:
    """Carica un DataFrame in PostgreSQL con gestione transazionale."""
    engine = create_engine(connection_string)

    with engine.begin() as conn:
        # Verifica se la tabella esiste
        inspector = inspect(engine)
        table_exists = table_name in inspector.get_table_names()

        if not table_exists:
            logger.info(f"Creazione tabella '{table_name}'")

        df.to_sql(
            name=table_name,
            con=conn,
            if_exists=if_exists,
            index=False,
            method="multi",
            chunksize=1000
        )

    engine.dispose()
    logger.info(f"Caricate {len(df)} righe in '{table_name}'")
    return len(df)

Caricamento Incrementale con Upsert

In produzione, quasi mai vuoi rimpiazzare l'intera tabella. Spesso servono solo gli aggiornamenti ai record nuovi o modificati. L'upsert con PostgreSQL è la soluzione:

from sqlalchemy.dialects.postgresql import insert as pg_insert

def upsert_to_postgres(
    df: pd.DataFrame,
    table_name: str,
    engine,
    conflict_columns: list[str]
):
    """Esegue un upsert (INSERT ... ON CONFLICT UPDATE)."""
    from sqlalchemy import Table, MetaData

    metadata = MetaData()
    metadata.reflect(bind=engine)
    table = metadata.tables[table_name]

    records = df.to_dict(orient="records")

    with engine.begin() as conn:
        for chunk_start in range(0, len(records), 1000):
            chunk = records[chunk_start:chunk_start + 1000]
            stmt = pg_insert(table).values(chunk)
            update_cols = {
                c.name: stmt.excluded[c.name]
                for c in table.columns
                if c.name not in conflict_columns
            }
            stmt = stmt.on_conflict_do_update(
                index_elements=conflict_columns,
                set_=update_cols
            )
            conn.execute(stmt)

    logger.info(f"Upsert completato: {len(records)} record processati")

Orchestrare la Pipeline Completa

Bene, abbiamo tutti i pezzi. Assembliamoli in un orchestratore che gestisce logging e errori in modo serio:

import logging
from datetime import datetime

def setup_logging():
    """Configura il logging per la pipeline."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[
            logging.FileHandler("logs/etl.log"),
            logging.StreamHandler()
        ]
    )

def run_pipeline():
    """Esegue la pipeline ETL completa."""
    setup_logging()
    logger = logging.getLogger("etl_pipeline")
    start_time = datetime.now()

    logger.info("=" * 60)
    logger.info("INIZIO PIPELINE ETL")
    logger.info("=" * 60)

    try:
        # --- EXTRACT ---
        logger.info("FASE 1: Estrazione")
        datasets = extract_all()

        # --- TRANSFORM ---
        logger.info("FASE 2: Trasformazione")
        orders = transform_orders(datasets["orders"])
        enriched = enrich_orders(
            orders, datasets["customers"], datasets["products"]
        )

        # --- VALIDATE ---
        logger.info("FASE 2b: Validazione")
        validated = validate_data(enriched, order_schema)

        # --- AGGREGATE ---
        logger.info("FASE 2c: Aggregazione")
        monthly = create_monthly_summary(validated)

        # --- LOAD ---
        logger.info("FASE 3: Caricamento")
        load_to_sqlite(validated, "orders_clean", "data/warehouse.db")
        load_to_sqlite(monthly, "monthly_summary", "data/warehouse.db")

        elapsed = (datetime.now() - start_time).total_seconds()
        logger.info(f"Pipeline completata in {elapsed:.2f} secondi")
        logger.info(f"Righe caricate: {len(validated)} ordini, {len(monthly)} righe riepilogo")

    except Exception as e:
        logger.error(f"Pipeline fallita: {e}", exc_info=True)
        raise

if __name__ == "__main__":
    run_pipeline()

Onestamente, il pattern try/except globale è il minimo indispensabile. In produzione, potresti voler wrappare ogni singola fase per avere un controllo più granulare — ma per iniziare questo approccio è più che sufficiente.

Ottimizzazione delle Performance

Quando i dataset crescono, le performance contano eccome. Ecco le strategie che uso più spesso.

Elaborazione a Chunk per File Grandi

def extract_large_csv(file_path: str, chunksize: int = 50_000) -> pd.DataFrame:
    """Elabora un CSV di grandi dimensioni a chunk."""
    chunks = []

    for i, chunk in enumerate(pd.read_csv(file_path, chunksize=chunksize)):
        logger.info(f"Elaborazione chunk {i + 1}: {len(chunk)} righe")
        # Trasforma ogni chunk singolarmente per risparmiare memoria
        transformed = transform_orders(chunk)
        chunks.append(transformed)

    result = pd.concat(chunks, ignore_index=True)
    logger.info(f"Totale righe elaborate: {len(result)}")
    return result

Usare dtype Categorici per Risparmiare Memoria

def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Ottimizza i tipi di dato per ridurre il consumo di memoria."""
    mem_before = df.memory_usage(deep=True).sum() / 1024**2

    # Converti colonne con pochi valori unici in categoriche
    for col in df.select_dtypes(include=["str", "object"]).columns:
        if df[col].nunique() / len(df) < 0.5:  # Meno del 50% di valori unici
            df[col] = df[col].astype("category")

    # Downcast numeri interi e float
    for col in df.select_dtypes(include=["int64"]).columns:
        df[col] = pd.to_numeric(df[col], downcast="integer")
    for col in df.select_dtypes(include=["float64"]).columns:
        df[col] = pd.to_numeric(df[col], downcast="float")

    mem_after = df.memory_usage(deep=True).sum() / 1024**2
    logger.info(f"Memoria: {mem_before:.1f} MB -> {mem_after:.1f} MB "
                f"(-{(1 - mem_after/mem_before)*100:.0f}%)")
    return df

Ho visto riduzioni del 60-70% di memoria con questa tecnica. Sembra un trucchetto, ma su dataset da milioni di righe può fare la differenza tra "funziona" e "out of memory".

Quando Considerare Polars

Se i dataset superano i 5-10 GB, pandas potrebbe non bastare. Polars, scritto in Rust, offre performance da 10 a 50 volte superiori grazie alla lazy evaluation e all'esecuzione parallela su tutti i core. La sintassi è abbastanza simile a pandas:

import polars as pl

# Equivalente Polars di una pipeline di trasformazione
df = (
    pl.scan_csv("data/raw/orders.csv")  # Lazy: non carica subito in memoria
    .filter(pl.col("amount") > 0)
    .with_columns(
        total_with_tax=pl.col("amount") * 1.22,
        order_month=pl.col("order_date").str.to_date().dt.month()
    )
    .group_by(["order_month", "region"])
    .agg(
        pl.col("amount").sum().alias("total_revenue"),
        pl.col("order_id").count().alias("order_count")
    )
    .collect()  # Esegue tutto il piano ottimizzato
)

Detto questo, per la maggior parte dei progetti, pandas 3.0 con il backend PyArrow è più che sufficiente. Polars ha senso quando arrivi a volumi davvero importanti.

Testing della Pipeline

Una pipeline ETL senza test è una bomba a orologeria. Non è questione di se qualcosa si romperà, ma di quando. Ecco come strutturare i test con pytest:

import pytest
import pandas as pd
from transform import transform_orders
from validate import order_schema

@pytest.fixture
def sample_orders():
    """Crea un DataFrame di esempio per i test."""
    return pd.DataFrame({
        "order_id": [1, 2, 3, 4],
        "customer_id": [101, 102, 103, 104],
        "product_name": [" Widget A ", "widget b", " WIDGET C", "widget d"],
        "customer_email": ["[email protected]", "[email protected]", "[email protected]", "[email protected]"],
        "amount": [100.0, 250.0, -50.0, 75.0],
        "discount": [0.0, 10.0, 0.0, 5.0],
        "order_date": pd.to_datetime(["2026-01-15", "2026-01-16", "2026-01-17", "2026-01-18"]),
        "notes": [None, "Urgente", None, ""]
    })

def test_transform_removes_negative_amounts(sample_orders):
    result = transform_orders(sample_orders)
    assert (result["amount"] > 0).all()

def test_transform_normalizes_product_names(sample_orders):
    result = transform_orders(sample_orders)
    for name in result["product_name"]:
        assert name == name.strip().lower()

def test_transform_calculates_tax(sample_orders):
    result = transform_orders(sample_orders)
    expected = result["amount"] * 1.22
    pd.testing.assert_series_equal(
        result["total_with_tax"], expected, check_names=False
    )

Best Practice per Pipeline ETL in Produzione

Passare da uno script locale a una pipeline production-ready richiede attenzione su diversi fronti. Ecco le cose che contano di più.

1. Idempotenza

Ogni esecuzione della pipeline deve produrre lo stesso risultato, indipendentemente da quante volte viene lanciata. Usa if_exists='replace' per le tabelle aggregate oppure implementa un upsert per i dati transazionali.

2. Logging Strutturato

Basta con i print() sparsi ovunque. Il modulo logging con livelli appropriati (INFO per il flusso normale, WARNING per anomalie, ERROR per fallimenti) è il minimo. Includi sempre metriche chiave: righe estratte, righe scartate, tempo di esecuzione per fase.

3. Gestione degli Errori Granulare

Ogni fase dovrebbe avere il proprio blocco try/except. Se l'estrazione da una sorgente fallisce, la pipeline deve poter continuare con le altre sorgenti e segnalare l'errore.

4. Configurazione Esternalizzata

Mai, mai hardcodare percorsi file, connection string o parametri nel codice. Variabili d'ambiente o file di configurazione:

import os

DB_CONNECTION = os.environ.get(
    "ETL_DB_URL",
    "sqlite:///data/warehouse.db"
)
RAW_DATA_PATH = os.environ.get("ETL_RAW_DATA", "data/raw/")
LOG_LEVEL = os.environ.get("ETL_LOG_LEVEL", "INFO")

5. Orchestrazione con Strumenti Professionali

Per pipeline in produzione, strumenti come Apache Airflow, Prefect o Dagster gestiscono schedulazione, monitoraggio e dipendenze tra task. Retry automatici, alerting, UI per il monitoraggio — tutto quello che serve per non impazzire quando qualcosa va storto alle 2 di notte.

Domande Frequenti (FAQ)

Quando usare pandas e quando passare a Polars o PySpark?

pandas va benissimo per dataset che entrano in RAM (indicativamente fino a 2-5 GB). Oltre quella soglia, Polars è eccellente su una singola macchina grazie a lazy evaluation e parallelismo automatico. PySpark serve quando i dati sono distribuiti su un cluster e superano la capacità di un singolo nodo.

Qual è la differenza tra ETL e ELT?

Nell'ETL tradizionale, i dati vengono trasformati prima del caricamento. Nell'ELT (Extract, Load, Transform), i dati grezzi vengono caricati direttamente nel data warehouse e le trasformazioni avvengono dentro il database con SQL. L'ELT è molto comune con warehouse cloud come BigQuery, Snowflake e Redshift, dove la potenza di calcolo per trasformare non manca.

Come si automatizza l'esecuzione di una pipeline ETL?

Per pipeline semplici, un cron job su Linux o il Task Scheduler di Windows bastano. Per pipeline complesse con dipendenze tra task, Apache Airflow permette di definire DAG (Directed Acyclic Graph) che gestiscono ordine di esecuzione, retry e monitoraggio. Prefect e Dagster sono alternative più moderne con una curva di apprendimento meno ripida.

Pandera o Great Expectations: quale scegliere?

Pandera è perfetto per validazioni leggere integrate nel codice, ideale per progetti piccoli e medi. Great Expectations è più adatto a contesti enterprise dove servono Data Docs condivise e integrazione CI/CD. Se usi solo pandas, Pandera è la scelta naturale — meno overhead, più praticità.

Come gestire i valori mancanti durante la trasformazione?

Dipende dal contesto. Per campi obbligatori (come gli ID), rimuovi le righe con dropna(). Per numeri opzionali, riempi con zero o con la mediana usando fillna(). Per le stringhe, un valore vuoto "" è spesso meglio di NaN. In pandas 3.0, le colonne stringa usano il nuovo dtype str con NaN come valore mancante predefinito.

Sull'Autore Editorial Team

Our team of expert writers and editors.