Introduzione alle Pipeline ETL in Python
Se lavori con i dati — e nel 2026, chi non lo fa? — prima o poi ti ritrovi a dover spostare informazioni da un posto all'altro. File CSV che arrivano via email, API che sputano JSON, database che parlano linguaggi diversi. Il caos, insomma. Ed è qui che entrano in gioco le pipeline ETL (Extract, Transform, Load): il collante che tiene insieme qualsiasi architettura dati che si rispetti.
In questa guida costruiremo insieme una pipeline ETL completa partendo da zero. Useremo pandas 3.0 (uscito a gennaio 2026, finalmente!), SQLAlchemy 2.x per parlare con i database e Pandera per validare i dati prima che combinino guai. Il tutto con un progetto end-to-end realistico, logging, gestione errori e quelle best practice che in produzione fanno la differenza tra dormire tranquilli e ricevere chiamate alle 3 di notte.
Cos'è una Pipeline ETL e Perché Ti Serve
Una pipeline ETL è un processo automatizzato che muove i dati attraverso tre fasi:
- Extract (Estrazione): recupero dei dati grezzi da una o più sorgenti — file CSV, API REST, database, web scraping
- Transform (Trasformazione): pulizia, normalizzazione, arricchimento e validazione
- Load (Caricamento): scrittura dei dati trasformati nella destinazione finale (database SQL, data warehouse, file Parquet)
Senza una pipeline ben strutturata, i dati restano sparsi ovunque in formati incompatibili. Analizzarli diventa un incubo. Una pipeline automatizzata ti garantisce ripetibilità, tracciabilità e qualità dei dati ad ogni esecuzione. Sembra banale, ma fidati: quando un report mostra numeri sbagliati al CEO, vuoi poter dire esattamente dove il problema è nato.
Configurazione dell'Ambiente di Sviluppo
Ok, partiamo dalle basi. Installiamo le dipendenze in un ambiente virtuale dedicato (mai inquinare il Python di sistema, per carità):
python -m venv etl-env
source etl-env/bin/activate # Linux/macOS
# etl-env\Scripts\activate # Windows
pip install pandas==3.0.1 sqlalchemy==2.0.38 pandera requests
Un rapido check per confermare che tutto sia a posto:
import pandas as pd
import sqlalchemy
import pandera
print(f"pandas: {pd.__version__}") # 3.0.1
print(f"SQLAlchemy: {sqlalchemy.__version__}") # 2.0.38
print(f"Pandera: {pandera.__version__}")
Novità di pandas 3.0 per le Pipeline ETL
pandas 3.0 non è un aggiornamento qualunque. Rilasciato il 21 gennaio 2026, introduce cambiamenti che impattano direttamente il modo in cui scriviamo le pipeline:
- Copy-on-Write attivo di default: ogni operazione di indicizzazione restituisce una copia logica. Addio
SettingWithCopyWarning— ora si ottiene unChainedAssignmentErrorse si tenta un'assegnazione concatenata. Bisogna usare.loc[]per le modifiche in-place. All'inizio è un po' scomodo, ma alla lunga previene bug subdoli. - Nuovo dtype
strper le stringhe: le colonne di testo vengono inferite comestr(con backend PyArrow quando disponibile), non più comeobject. Performance e memoria ringraziano. - Risoluzione datetime in microsecondi: il default passa dai nanosecondi ai microsecondi, evitando quei fastidiosi errori con date prima del 1678 o dopo il 2262.
- Richiede Python 3.11+
Struttura del Progetto
Organizziamo tutto in modo modulare. Niente script monolitici da 500 righe — ogni fase ha il suo file:
etl_project/
├── main.py # Orchestratore della pipeline
├── extract.py # Funzioni di estrazione
├── transform.py # Funzioni di trasformazione
├── load.py # Funzioni di caricamento
├── validate.py # Schema di validazione Pandera
├── config.py # Configurazione e costanti
├── data/
│ └── raw/ # Dati grezzi scaricati
└── logs/
└── etl.log # File di log
Questa struttura sembra eccessiva per un progetto piccolo? Forse. Ma quando la pipeline cresce (e cresce sempre), ti ringrazierai da solo.
Fase 1: Extract — Estrazione dei Dati
La fase di estrazione è dove raccogliamo i dati grezzi. In un progetto reale le sorgenti sono quasi sempre multiple: CSV locali, API REST, database remoti, a volte persino fogli Excel condivisi (sì, succede ancora). Vediamo i casi più comuni.
Estrazione da File CSV
import pandas as pd
import logging
logger = logging.getLogger(__name__)
def extract_csv(file_path: str) -> pd.DataFrame:
"""Estrae dati da un file CSV con gestione errori."""
logger.info(f"Estrazione dati da {file_path}")
try:
df = pd.read_csv(
file_path,
dtype_backend="pyarrow", # Backend ottimizzato in pandas 3.0
parse_dates=["order_date"]
)
logger.info(f"Estratte {len(df)} righe da {file_path}")
return df
except FileNotFoundError:
logger.error(f"File non trovato: {file_path}")
raise
except pd.errors.ParserError as e:
logger.error(f"Errore di parsing: {e}")
raise
Nota il parametro dtype_backend="pyarrow": è una delle piccole gemme di pandas 3.0 che fa una differenza concreta sulle performance, specialmente con file grandi.
Estrazione da API REST
import requests
def extract_api(url: str, params: dict = None) -> pd.DataFrame:
"""Estrae dati da un endpoint API REST."""
logger.info(f"Chiamata API: {url}")
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
data = response.json()
df = pd.DataFrame(data)
logger.info(f"Estratte {len(df)} righe dall'API")
return df
Estrazione da Database con SQLAlchemy
from sqlalchemy import create_engine, text
def extract_database(connection_string: str, query: str) -> pd.DataFrame:
"""Estrae dati da un database SQL."""
engine = create_engine(connection_string)
with engine.connect() as conn:
df = pd.read_sql(text(query), conn)
logger.info(f"Estratte {len(df)} righe dal database")
engine.dispose()
return df
Gestione di Sorgenti Multiple
Nella pratica, le sorgenti sono quasi sempre più di una. Ecco come orchestrare l'estrazione:
def extract_all() -> dict[str, pd.DataFrame]:
"""Estrae dati da tutte le sorgenti configurate."""
datasets = {}
# Ordini da CSV
datasets["orders"] = extract_csv("data/raw/orders.csv")
# Prodotti da API
datasets["products"] = extract_api(
"https://api.example.com/products",
params={"status": "active"}
)
# Clienti da database
datasets["customers"] = extract_database(
"postgresql://user:pass@localhost:5432/crm",
"SELECT id, name, email, region FROM customers WHERE active = true"
)
return datasets
Fase 2: Transform — Trasformazione e Pulizia
Qui si fa sul serio. La trasformazione è il cuore della pipeline: pulizia, normalizzazione, arricchimento, validazione. Con pandas 3.0, ricorda che il Copy-on-Write è attivo di default — tutte le modifiche vanno fatte con .loc[] o creando nuove colonne tramite .assign().
Pulizia e Normalizzazione
def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
"""Trasforma e pulisce il DataFrame degli ordini."""
logger.info("Inizio trasformazione ordini")
# Rimuovi duplicati
df = df.drop_duplicates(subset=["order_id"])
# Gestione valori mancanti
df = df.dropna(subset=["order_id", "customer_id"]) # Campi obbligatori
df = df.fillna({"discount": 0.0, "notes": ""}) # Default per opzionali
# Normalizzazione stringhe (pandas 3.0 usa dtype str nativo)
df = df.assign(
product_name=df["product_name"].str.strip().str.lower(),
customer_email=df["customer_email"].str.strip().str.lower()
)
# Colonne calcolate
df = df.assign(
total_with_tax=df["amount"] * 1.22, # IVA italiana 22%
net_amount=df["amount"] - df["discount"],
order_year=df["order_date"].dt.year,
order_month=df["order_date"].dt.month
)
# Filtra ordini con importo valido
df = df.loc[df["amount"] > 0]
logger.info(f"Trasformazione completata: {len(df)} righe valide")
return df
Un dettaglio che mi piace sottolineare: l'uso di .assign() per creare nuove colonne è perfettamente compatibile con il Copy-on-Write. Niente warning, niente comportamenti strani.
Arricchimento con Join tra DataFrame
def enrich_orders(
orders: pd.DataFrame,
customers: pd.DataFrame,
products: pd.DataFrame
) -> pd.DataFrame:
"""Arricchisce gli ordini con dati clienti e prodotti."""
enriched = (
orders
.merge(customers, on="customer_id", how="left", suffixes=("", "_cust"))
.merge(products, on="product_id", how="left", suffixes=("", "_prod"))
)
# Segnala join non riusciti
missing_customers = enriched["name"].isna().sum()
missing_products = enriched["product_name_prod"].isna().sum()
if missing_customers > 0:
logger.warning(f"{missing_customers} ordini senza cliente corrispondente")
if missing_products > 0:
logger.warning(f"{missing_products} ordini senza prodotto corrispondente")
return enriched
I left join con controllo dei match mancanti sono una di quelle cose che fanno la differenza tra una pipeline "che funziona" e una pipeline affidabile. Quei warning nel log ti salvano ore di debug.
Aggregazioni e Pivot con pandas 3.0
def create_monthly_summary(df: pd.DataFrame) -> pd.DataFrame:
"""Crea un riepilogo mensile delle vendite."""
summary = (
df
.groupby(["order_year", "order_month", "region"], as_index=False)
.agg(
total_revenue=("net_amount", "sum"),
order_count=("order_id", "count"),
avg_order_value=("net_amount", "mean"),
unique_customers=("customer_id", "nunique")
)
)
summary = summary.sort_values(
["order_year", "order_month", "total_revenue"],
ascending=[True, True, False]
)
return summary
Validazione dei Dati con Pandera
Ecco una cosa che ho imparato a mie spese: senza validazione, dati corrotti si propagano silenziosamente lungo tutta la pipeline. Li scopri solo quando il report finale mostra numeri assurdi e il tuo manager ti chiede spiegazioni. Pandera risolve esattamente questo problema, permettendoti di definire schemi di validazione dichiarativi per i DataFrame.
Definire uno Schema di Validazione
import pandera as pa
order_schema = pa.DataFrameSchema(
columns={
"order_id": pa.Column(
int,
checks=pa.Check.gt(0),
unique=True,
description="ID univoco dell'ordine"
),
"customer_id": pa.Column(
int,
checks=pa.Check.gt(0),
nullable=False
),
"amount": pa.Column(
float,
checks=[
pa.Check.gt(0, error="L'importo deve essere positivo"),
pa.Check.lt(1_000_000, error="Importo sospetto: superiore a 1M")
]
),
"discount": pa.Column(
float,
checks=pa.Check.in_range(0, 1),
nullable=True
),
"order_date": pa.Column(
"datetime64[us]", # Microsecondi: default pandas 3.0
nullable=False
),
"customer_email": pa.Column(
str,
checks=pa.Check.str_matches(
r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$",
error="Formato email non valido"
),
nullable=True
),
},
checks=[
pa.Check(
lambda df: df["discount"].fillna(0) <= df["amount"],
error="Lo sconto non può superare l'importo"
)
],
coerce=True
)
Quel check cross-colonna alla fine (sconto ≤ importo) è il tipo di regola business che spesso sfugge. Pandera rende queste validazioni eleganti da scrivere.
Applicare la Validazione nella Pipeline
def validate_data(df: pd.DataFrame, schema: pa.DataFrameSchema) -> pd.DataFrame:
"""Valida un DataFrame e restituisce solo le righe valide."""
try:
validated_df = schema.validate(df, lazy=True)
logger.info("Validazione completata: tutti i controlli superati")
return validated_df
except pa.errors.SchemaErrors as err:
logger.warning(
f"Trovate {len(err.failure_cases)} violazioni di validazione"
)
# Log delle violazioni per analisi
for _, row in err.failure_cases.iterrows():
logger.warning(
f" Colonna: {row['column']}, "
f"Check: {row['check']}, "
f"Indice: {row['index']}"
)
# Restituisci solo le righe valide
invalid_indices = err.failure_cases["index"].unique()
valid_df = df.drop(index=invalid_indices, errors="ignore")
logger.info(
f"Righe valide: {len(valid_df)}/{len(df)} "
f"({len(valid_df)/len(df)*100:.1f}%)"
)
return valid_df
L'opzione lazy=True è fondamentale: raccoglie tutti gli errori invece di fermarsi al primo. Così puoi avere un quadro completo di cosa non va nei dati.
Fase 3: Load — Caricamento nel Database
L'ultima tappa: caricare i dati trasformati e validati nella destinazione. Usiamo SQLAlchemy 2.x con le best practice attuali.
Caricamento in SQLite (Sviluppo)
from sqlalchemy import create_engine
def load_to_sqlite(df: pd.DataFrame, table_name: str, db_path: str) -> int:
"""Carica un DataFrame in un database SQLite."""
engine = create_engine(f"sqlite:///{db_path}")
with engine.begin() as conn:
rows = df.to_sql(
name=table_name,
con=conn,
if_exists="replace",
index=False,
method="multi",
chunksize=500
)
engine.dispose()
logger.info(f"Caricate {len(df)} righe nella tabella '{table_name}'")
return len(df)
Caricamento in PostgreSQL (Produzione)
from sqlalchemy import create_engine, inspect
def load_to_postgres(
df: pd.DataFrame,
table_name: str,
connection_string: str,
if_exists: str = "append"
) -> int:
"""Carica un DataFrame in PostgreSQL con gestione transazionale."""
engine = create_engine(connection_string)
with engine.begin() as conn:
# Verifica se la tabella esiste
inspector = inspect(engine)
table_exists = table_name in inspector.get_table_names()
if not table_exists:
logger.info(f"Creazione tabella '{table_name}'")
df.to_sql(
name=table_name,
con=conn,
if_exists=if_exists,
index=False,
method="multi",
chunksize=1000
)
engine.dispose()
logger.info(f"Caricate {len(df)} righe in '{table_name}'")
return len(df)
Caricamento Incrementale con Upsert
In produzione, quasi mai vuoi rimpiazzare l'intera tabella. Spesso servono solo gli aggiornamenti ai record nuovi o modificati. L'upsert con PostgreSQL è la soluzione:
from sqlalchemy.dialects.postgresql import insert as pg_insert
def upsert_to_postgres(
df: pd.DataFrame,
table_name: str,
engine,
conflict_columns: list[str]
):
"""Esegue un upsert (INSERT ... ON CONFLICT UPDATE)."""
from sqlalchemy import Table, MetaData
metadata = MetaData()
metadata.reflect(bind=engine)
table = metadata.tables[table_name]
records = df.to_dict(orient="records")
with engine.begin() as conn:
for chunk_start in range(0, len(records), 1000):
chunk = records[chunk_start:chunk_start + 1000]
stmt = pg_insert(table).values(chunk)
update_cols = {
c.name: stmt.excluded[c.name]
for c in table.columns
if c.name not in conflict_columns
}
stmt = stmt.on_conflict_do_update(
index_elements=conflict_columns,
set_=update_cols
)
conn.execute(stmt)
logger.info(f"Upsert completato: {len(records)} record processati")
Orchestrare la Pipeline Completa
Bene, abbiamo tutti i pezzi. Assembliamoli in un orchestratore che gestisce logging e errori in modo serio:
import logging
from datetime import datetime
def setup_logging():
"""Configura il logging per la pipeline."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.FileHandler("logs/etl.log"),
logging.StreamHandler()
]
)
def run_pipeline():
"""Esegue la pipeline ETL completa."""
setup_logging()
logger = logging.getLogger("etl_pipeline")
start_time = datetime.now()
logger.info("=" * 60)
logger.info("INIZIO PIPELINE ETL")
logger.info("=" * 60)
try:
# --- EXTRACT ---
logger.info("FASE 1: Estrazione")
datasets = extract_all()
# --- TRANSFORM ---
logger.info("FASE 2: Trasformazione")
orders = transform_orders(datasets["orders"])
enriched = enrich_orders(
orders, datasets["customers"], datasets["products"]
)
# --- VALIDATE ---
logger.info("FASE 2b: Validazione")
validated = validate_data(enriched, order_schema)
# --- AGGREGATE ---
logger.info("FASE 2c: Aggregazione")
monthly = create_monthly_summary(validated)
# --- LOAD ---
logger.info("FASE 3: Caricamento")
load_to_sqlite(validated, "orders_clean", "data/warehouse.db")
load_to_sqlite(monthly, "monthly_summary", "data/warehouse.db")
elapsed = (datetime.now() - start_time).total_seconds()
logger.info(f"Pipeline completata in {elapsed:.2f} secondi")
logger.info(f"Righe caricate: {len(validated)} ordini, {len(monthly)} righe riepilogo")
except Exception as e:
logger.error(f"Pipeline fallita: {e}", exc_info=True)
raise
if __name__ == "__main__":
run_pipeline()
Onestamente, il pattern try/except globale è il minimo indispensabile. In produzione, potresti voler wrappare ogni singola fase per avere un controllo più granulare — ma per iniziare questo approccio è più che sufficiente.
Ottimizzazione delle Performance
Quando i dataset crescono, le performance contano eccome. Ecco le strategie che uso più spesso.
Elaborazione a Chunk per File Grandi
def extract_large_csv(file_path: str, chunksize: int = 50_000) -> pd.DataFrame:
"""Elabora un CSV di grandi dimensioni a chunk."""
chunks = []
for i, chunk in enumerate(pd.read_csv(file_path, chunksize=chunksize)):
logger.info(f"Elaborazione chunk {i + 1}: {len(chunk)} righe")
# Trasforma ogni chunk singolarmente per risparmiare memoria
transformed = transform_orders(chunk)
chunks.append(transformed)
result = pd.concat(chunks, ignore_index=True)
logger.info(f"Totale righe elaborate: {len(result)}")
return result
Usare dtype Categorici per Risparmiare Memoria
def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
"""Ottimizza i tipi di dato per ridurre il consumo di memoria."""
mem_before = df.memory_usage(deep=True).sum() / 1024**2
# Converti colonne con pochi valori unici in categoriche
for col in df.select_dtypes(include=["str", "object"]).columns:
if df[col].nunique() / len(df) < 0.5: # Meno del 50% di valori unici
df[col] = df[col].astype("category")
# Downcast numeri interi e float
for col in df.select_dtypes(include=["int64"]).columns:
df[col] = pd.to_numeric(df[col], downcast="integer")
for col in df.select_dtypes(include=["float64"]).columns:
df[col] = pd.to_numeric(df[col], downcast="float")
mem_after = df.memory_usage(deep=True).sum() / 1024**2
logger.info(f"Memoria: {mem_before:.1f} MB -> {mem_after:.1f} MB "
f"(-{(1 - mem_after/mem_before)*100:.0f}%)")
return df
Ho visto riduzioni del 60-70% di memoria con questa tecnica. Sembra un trucchetto, ma su dataset da milioni di righe può fare la differenza tra "funziona" e "out of memory".
Quando Considerare Polars
Se i dataset superano i 5-10 GB, pandas potrebbe non bastare. Polars, scritto in Rust, offre performance da 10 a 50 volte superiori grazie alla lazy evaluation e all'esecuzione parallela su tutti i core. La sintassi è abbastanza simile a pandas:
import polars as pl
# Equivalente Polars di una pipeline di trasformazione
df = (
pl.scan_csv("data/raw/orders.csv") # Lazy: non carica subito in memoria
.filter(pl.col("amount") > 0)
.with_columns(
total_with_tax=pl.col("amount") * 1.22,
order_month=pl.col("order_date").str.to_date().dt.month()
)
.group_by(["order_month", "region"])
.agg(
pl.col("amount").sum().alias("total_revenue"),
pl.col("order_id").count().alias("order_count")
)
.collect() # Esegue tutto il piano ottimizzato
)
Detto questo, per la maggior parte dei progetti, pandas 3.0 con il backend PyArrow è più che sufficiente. Polars ha senso quando arrivi a volumi davvero importanti.
Testing della Pipeline
Una pipeline ETL senza test è una bomba a orologeria. Non è questione di se qualcosa si romperà, ma di quando. Ecco come strutturare i test con pytest:
import pytest
import pandas as pd
from transform import transform_orders
from validate import order_schema
@pytest.fixture
def sample_orders():
"""Crea un DataFrame di esempio per i test."""
return pd.DataFrame({
"order_id": [1, 2, 3, 4],
"customer_id": [101, 102, 103, 104],
"product_name": [" Widget A ", "widget b", " WIDGET C", "widget d"],
"customer_email": ["[email protected]", "[email protected]", "[email protected]", "[email protected]"],
"amount": [100.0, 250.0, -50.0, 75.0],
"discount": [0.0, 10.0, 0.0, 5.0],
"order_date": pd.to_datetime(["2026-01-15", "2026-01-16", "2026-01-17", "2026-01-18"]),
"notes": [None, "Urgente", None, ""]
})
def test_transform_removes_negative_amounts(sample_orders):
result = transform_orders(sample_orders)
assert (result["amount"] > 0).all()
def test_transform_normalizes_product_names(sample_orders):
result = transform_orders(sample_orders)
for name in result["product_name"]:
assert name == name.strip().lower()
def test_transform_calculates_tax(sample_orders):
result = transform_orders(sample_orders)
expected = result["amount"] * 1.22
pd.testing.assert_series_equal(
result["total_with_tax"], expected, check_names=False
)
Best Practice per Pipeline ETL in Produzione
Passare da uno script locale a una pipeline production-ready richiede attenzione su diversi fronti. Ecco le cose che contano di più.
1. Idempotenza
Ogni esecuzione della pipeline deve produrre lo stesso risultato, indipendentemente da quante volte viene lanciata. Usa if_exists='replace' per le tabelle aggregate oppure implementa un upsert per i dati transazionali.
2. Logging Strutturato
Basta con i print() sparsi ovunque. Il modulo logging con livelli appropriati (INFO per il flusso normale, WARNING per anomalie, ERROR per fallimenti) è il minimo. Includi sempre metriche chiave: righe estratte, righe scartate, tempo di esecuzione per fase.
3. Gestione degli Errori Granulare
Ogni fase dovrebbe avere il proprio blocco try/except. Se l'estrazione da una sorgente fallisce, la pipeline deve poter continuare con le altre sorgenti e segnalare l'errore.
4. Configurazione Esternalizzata
Mai, mai hardcodare percorsi file, connection string o parametri nel codice. Variabili d'ambiente o file di configurazione:
import os
DB_CONNECTION = os.environ.get(
"ETL_DB_URL",
"sqlite:///data/warehouse.db"
)
RAW_DATA_PATH = os.environ.get("ETL_RAW_DATA", "data/raw/")
LOG_LEVEL = os.environ.get("ETL_LOG_LEVEL", "INFO")
5. Orchestrazione con Strumenti Professionali
Per pipeline in produzione, strumenti come Apache Airflow, Prefect o Dagster gestiscono schedulazione, monitoraggio e dipendenze tra task. Retry automatici, alerting, UI per il monitoraggio — tutto quello che serve per non impazzire quando qualcosa va storto alle 2 di notte.
Domande Frequenti (FAQ)
Quando usare pandas e quando passare a Polars o PySpark?
pandas va benissimo per dataset che entrano in RAM (indicativamente fino a 2-5 GB). Oltre quella soglia, Polars è eccellente su una singola macchina grazie a lazy evaluation e parallelismo automatico. PySpark serve quando i dati sono distribuiti su un cluster e superano la capacità di un singolo nodo.
Qual è la differenza tra ETL e ELT?
Nell'ETL tradizionale, i dati vengono trasformati prima del caricamento. Nell'ELT (Extract, Load, Transform), i dati grezzi vengono caricati direttamente nel data warehouse e le trasformazioni avvengono dentro il database con SQL. L'ELT è molto comune con warehouse cloud come BigQuery, Snowflake e Redshift, dove la potenza di calcolo per trasformare non manca.
Come si automatizza l'esecuzione di una pipeline ETL?
Per pipeline semplici, un cron job su Linux o il Task Scheduler di Windows bastano. Per pipeline complesse con dipendenze tra task, Apache Airflow permette di definire DAG (Directed Acyclic Graph) che gestiscono ordine di esecuzione, retry e monitoraggio. Prefect e Dagster sono alternative più moderne con una curva di apprendimento meno ripida.
Pandera o Great Expectations: quale scegliere?
Pandera è perfetto per validazioni leggere integrate nel codice, ideale per progetti piccoli e medi. Great Expectations è più adatto a contesti enterprise dove servono Data Docs condivise e integrazione CI/CD. Se usi solo pandas, Pandera è la scelta naturale — meno overhead, più praticità.
Come gestire i valori mancanti durante la trasformazione?
Dipende dal contesto. Per campi obbligatori (come gli ID), rimuovi le righe con dropna(). Per numeri opzionali, riempi con zero o con la mediana usando fillna(). Per le stringhe, un valore vuoto "" è spesso meglio di NaN. In pandas 3.0, le colonne stringa usano il nuovo dtype str con NaN come valore mancante predefinito.