Introduzione: Perché la Pulizia dei Dati È il Passo Più Importante
Se hai mai lavorato con dati reali, lo sai già: i dati sporchi sono la norma, non l'eccezione. Colonne con formati incoerenti, valori mancanti sparsi ovunque, duplicati che spuntano nei momenti meno opportuni, outlier che distorcono le analisi. Secondo uno studio di Anaconda del 2024, i data scientist dedicano circa il 45% del loro tempo alla preparazione e alla pulizia dei dati. È tantissimo, ma ha perfettamente senso — un modello di machine learning, per quanto sofisticato, non può produrre risultati affidabili se viene alimentato con dati di scarsa qualità. Il principio è brutalmente semplice: garbage in, garbage out.
In questa guida esploreremo l'intero ecosistema della pulizia dei dati in Python. Si parte dalle operazioni fondamentali con pandas, si passa per le tecniche avanzate di gestione dei valori mancanti e degli outlier, e si arriva alla costruzione di pipeline automatizzate con pipe() e PyJanitor. Chiuderemo con la validazione usando Pandera e Great Expectations, perché — diciamocelo — pulire i dati senza verificare che il risultato sia corretto è come lavare i piatti al buio.
La guida è pensata per chi ha già familiarità con le basi di Python e pandas. Se hai letto la nostra guida sulle serie temporali, troverai qui concetti complementari che ti aiuteranno a gestire meglio i dati prima di qualsiasi analisi avanzata.
Esplorare i Dati Prima di Pulirli
Prima regola della pulizia dei dati: non pulire nulla finché non hai capito cosa hai davanti.
L'esplorazione iniziale è fondamentale per identificare i problemi e definire una strategia di intervento. Saltare questo passaggio? Ricetta per il disastro, fidati.
Strumenti di ispezione rapida con pandas
pandas offre diversi metodi per ottenere una panoramica rapida del dataset. Ecco quelli che dovresti usare sempre, senza eccezioni:
import pandas as pd
import numpy as np
# Caricare un dataset di esempio
df = pd.read_csv('vendite_ecommerce.csv')
# Panoramica generale: dimensioni, tipi, valori non nulli
print(df.shape)
print(df.info())
# Statistiche descrittive per colonne numeriche
print(df.describe())
# Statistiche per colonne categoriche
print(df.describe(include=['object']))
# Prime e ultime righe
print(df.head(10))
print(df.tail(10))
# Conteggio dei valori mancanti per colonna
missing = df.isnull().sum()
missing_pct = (missing / len(df)) * 100
missing_report = pd.DataFrame({
'mancanti': missing,
'percentuale': missing_pct.round(2)
}).sort_values('percentuale', ascending=False)
print(missing_report[missing_report['mancanti'] > 0])
Il metodo .info() è particolarmente prezioso perché ti mostra i tipi di dato di ogni colonna. Se vedi una colonna che dovrebbe essere numerica ma risulta come object, hai già trovato un problema. Stesso discorso per le date memorizzate come stringhe — sono una fonte costante di grattacapi nelle analisi temporali.
Profilazione automatica con ydata-profiling
Per dataset di dimensioni moderate, la libreria ydata-profiling (il successore di pandas-profiling) genera un report HTML completo con un singolo comando. Onestamente, è uno di quegli strumenti che una volta provato non riesci più a farne a meno.
# pip install ydata-profiling
from ydata_profiling import ProfileReport
profile = ProfileReport(df, title="Report Esplorativo Vendite",
explorative=True)
profile.to_file("report_vendite.html")
Il report include distribuzioni di ogni variabile, correlazioni, avvisi sui valori mancanti, duplicati e molto altro. È un ottimo punto di partenza, specialmente quando lavori con dataset che non conosci bene.
Gestione dei Valori Mancanti
I valori mancanti sono probabilmente il problema più comune nella pulizia dei dati. Circa il 20% dei dataset del mondo reale contiene informazioni incomplete, e la strategia che scegli per gestirli può influenzare parecchio i risultati delle tue analisi.
Identificare i pattern dei valori mancanti
Prima di decidere come trattare i valori mancanti, è fondamentale capire perché mancano. I valori possono mancare in modo completamente casuale (MCAR), casuale condizionato (MAR), o non casuale (MNAR). E no, questa distinzione non è puramente accademica — guida direttamente la scelta della strategia di imputazione.
import matplotlib.pyplot as plt
import seaborn as sns
# Visualizzare i pattern dei valori mancanti
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# Heatmap dei valori mancanti
sns.heatmap(df.isnull(), cbar=True, yticklabels=False,
cmap='viridis', ax=axes[0])
axes[0].set_title('Pattern dei Valori Mancanti')
# Percentuale di valori mancanti per colonna
missing_pct = (df.isnull().sum() / len(df)) * 100
missing_pct = missing_pct[missing_pct > 0].sort_values(ascending=True)
missing_pct.plot(kind='barh', ax=axes[1], color='coral')
axes[1].set_title('% Valori Mancanti per Colonna')
axes[1].set_xlabel('Percentuale (%)')
plt.tight_layout()
plt.savefig('missing_analysis.png', dpi=150)
plt.show()
La libreria missingno offre visualizzazioni ancora più specializzate. La matrice di correlazione dei valori mancanti, ad esempio, rivela se la mancanza di dati in una colonna è correlata con la mancanza in un'altra — un'informazione che può cambiare completamente il tuo approccio.
# pip install missingno
import missingno as msno
# Matrice dei valori mancanti
msno.matrix(df)
plt.show()
# Dendrogramma: raggruppa le colonne per pattern di mancanza
msno.dendrogram(df)
plt.show()
Strategie di imputazione: dal semplice al sofisticato
Una volta identificati i pattern, è il momento di scegliere la strategia più appropriata. Vediamo un'escalation dalla più semplice alla più avanzata.
Eliminazione: la soluzione più semplice, ma anche la più rischiosa. Funziona solo quando i valori mancanti sono pochi e distribuiti casualmente.
# Eliminare righe con qualsiasi valore mancante
df_clean = df.dropna()
# Eliminare righe dove mancano valori in colonne specifiche
df_clean = df.dropna(subset=['prezzo', 'quantita'])
# Eliminare colonne con più del 50% di valori mancanti
threshold = len(df) * 0.5
df_clean = df.dropna(axis=1, thresh=threshold)
Imputazione con valori statistici: qui si sostituiscono i valori mancanti con media, mediana o moda. La mediana è generalmente preferibile alla media perché è robusta agli outlier (un dettaglio che fa la differenza più spesso di quanto si pensi).
# Imputazione con la mediana per colonne numeriche
numeric_cols = df.select_dtypes(include=[np.number]).columns
df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())
# Imputazione con la moda per colonne categoriche
categorical_cols = df.select_dtypes(include=['object', 'category']).columns
for col in categorical_cols:
df[col] = df[col].fillna(df[col].mode()[0])
# Forward fill e backward fill per dati sequenziali
df['temperatura'] = df['temperatura'].ffill()
df['umidita'] = df['umidita'].bfill()
# Interpolazione lineare per dati con trend
df['sensore'] = df['sensore'].interpolate(method='linear')
Imputazione avanzata con scikit-learn: per dataset complessi, gli algoritmi di imputazione multivariata producono risultati nettamente migliori perché tengono conto delle relazioni tra le variabili. È qui che le cose si fanno interessanti.
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
# KNN Imputer: usa i k vicini più prossimi
knn_imputer = KNNImputer(n_neighbors=5, weights='distance')
df_numeric = df.select_dtypes(include=[np.number])
df_imputed = pd.DataFrame(
knn_imputer.fit_transform(df_numeric),
columns=df_numeric.columns,
index=df_numeric.index
)
# Iterative Imputer (MICE): imputazione multivariata
mice_imputer = IterativeImputer(
max_iter=10,
random_state=42,
sample_posterior=False
)
df_mice = pd.DataFrame(
mice_imputer.fit_transform(df_numeric),
columns=df_numeric.columns,
index=df_numeric.index
)
L'IterativeImputer implementa l'approccio MICE (Multiple Imputation by Chained Equations), che modella ogni variabile con valori mancanti come funzione delle altre variabili. È considerato lo stato dell'arte per l'imputazione dei dati tabulari e nel 2025-2026 è diventato una pratica standard in molti workflow professionali.
Rimozione dei Duplicati
I duplicati possono sembrare un problema banale. Non lo sono.
Non tutti i duplicati sono errori: a volte rappresentano transazioni legittime che si ripetono. La chiave sta nel distinguere tra duplicati veri (errori di inserimento) e duplicati apparenti (dati perfettamente validi). Ho visto colleghi eliminare intere giornate di ordini legittimi perché non avevano fatto questa distinzione — un errore costoso.
Identificare e rimuovere duplicati
# Contare i duplicati esatti (tutte le colonne identiche)
print(f"Duplicati esatti: {df.duplicated().sum()}")
# Duplicati basati su colonne specifiche
print(f"Duplicati per cliente-data: "
f"{df.duplicated(subset=['cliente_id', 'data_ordine']).sum()}")
# Visualizzare i duplicati prima di rimuoverli
duplicates = df[df.duplicated(subset=['cliente_id', 'data_ordine'],
keep=False)]
print(duplicates.sort_values(['cliente_id', 'data_ordine']))
# Rimuovere i duplicati, tenendo la prima occorrenza
df_dedup = df.drop_duplicates(
subset=['cliente_id', 'data_ordine', 'prodotto_id'],
keep='first'
)
print(f"Righe prima: {len(df)}, dopo: {len(df_dedup)}")
Fuzzy matching per duplicati quasi identici
Nel mondo reale, i duplicati spesso non sono esatti. "Mario Rossi", "mario rossi" e "M. Rossi" potrebbero benissimo essere la stessa persona. Per gestire questi casi si ricorre al fuzzy matching.
# pip install thefuzz python-Levenshtein
from thefuzz import fuzz, process
# Trovare nomi simili
nomi = df['nome_cliente'].unique()
soglia = 85 # percentuale di somiglianza
duplicati_fuzzy = []
for i, nome1 in enumerate(nomi):
for nome2 in nomi[i+1:]:
ratio = fuzz.ratio(nome1.lower(), nome2.lower())
if ratio >= soglia:
duplicati_fuzzy.append((nome1, nome2, ratio))
df_fuzzy = pd.DataFrame(duplicati_fuzzy,
columns=['nome_1', 'nome_2', 'somiglianza'])
print(df_fuzzy.sort_values('somiglianza', ascending=False))
Per dataset molto grandi, il confronto a coppie diventa computazionalmente proibitivo. In questi casi, librerie come recordlinkage o dedupe offrono algoritmi di blocking che riducono drasticamente il numero di confronti necessari. Vale la pena esplorarle se lavori regolarmente con anagrafiche di grandi dimensioni.
Standardizzazione e Conversione dei Tipi di Dato
Secondo diverse ricerche, oltre il 30% degli errori nei dati deriva da incoerenze nei formati. Non sorprende, a pensarci bene. Vediamo le tecniche essenziali per riportare ordine nel caos.
Pulizia delle colonne di testo
# Standardizzare i nomi delle colonne
df.columns = (df.columns
.str.strip()
.str.lower()
.str.replace(' ', '_')
.str.replace(r'[^\w]', '_', regex=True)
)
# Pulizia del testo nelle colonne
df['citta'] = (df['citta']
.str.strip()
.str.title()
.str.replace(r'\s+', ' ', regex=True)
)
# Normalizzare le categorie
mapping_regioni = {
'lombardia': 'Lombardia',
'lomb.': 'Lombardia',
'LOMBARDIA': 'Lombardia',
'Lomb': 'Lombardia'
}
df['regione'] = df['regione'].str.strip().str.lower().map(
lambda x: mapping_regioni.get(x, x.title() if isinstance(x, str) else x)
)
# Rimuovere caratteri speciali dai numeri di telefono
df['telefono'] = df['telefono'].str.replace(r'[^\d+]', '', regex=True)
Conversione dei tipi di dato
I tipi di dato errati sono una delle fonti più subdole di bug. Una colonna di prezzi memorizzata come stringa non ti darà errore immediatamente, ma produrrà risultati completamente sbagliati quando proverai a calcolare una somma o una media. E il bello è che potresti non accorgertene per giorni.
# Convertire prezzi da stringa a numerico
# Prima rimuovere simboli di valuta e separatori
df['prezzo'] = (df['prezzo']
.str.replace('€', '')
.str.replace('.', '') # separatore migliaia italiano
.str.replace(',', '.') # separatore decimale italiano
.str.strip()
)
df['prezzo'] = pd.to_numeric(df['prezzo'], errors='coerce')
# Convertire date in formato datetime
df['data_ordine'] = pd.to_datetime(
df['data_ordine'],
format='%d/%m/%Y',
dayfirst=True,
errors='coerce'
)
# Verificare le conversioni fallite
failed_conversions = df[df['prezzo'].isna()]
if len(failed_conversions) > 0:
print(f"Attenzione: {len(failed_conversions)} prezzi non convertiti")
# Usare i tipi nullable di pandas per mantenere i NaN
# nei dati interi (novità molto utile di pandas 2.x)
df['quantita'] = df['quantita'].astype('Int64') # nota la I maiuscola
df['codice_cap'] = df['codice_cap'].astype('string')
Un consiglio che vale oro: usa i tipi Int64 (con la I maiuscola) e string introdotti in pandas 2.x. A differenza dei tipi NumPy tradizionali, supportano nativamente i valori mancanti senza convertire gli interi in float. Addio al classico problema di vedere 1.0 dove dovrebbe esserci 1.
Rilevamento e Gestione degli Outlier
Gli outlier sono valori che si discostano significativamente dal resto dei dati. Possono essere errori di misurazione, errori di inserimento, oppure valori genuinamente estremi. La vera sfida è capire quale dei tre casi hai davanti.
Metodi statistici per identificare gli outlier
# Metodo IQR (Interquartile Range)
def detect_outliers_iqr(series, factor=1.5):
"""Identifica gli outlier usando il metodo IQR."""
Q1 = series.quantile(0.25)
Q3 = series.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - factor * IQR
upper_bound = Q3 + factor * IQR
return (series < lower_bound) | (series > upper_bound)
# Metodo Z-Score
def detect_outliers_zscore(series, threshold=3):
"""Identifica gli outlier usando lo Z-score."""
z_scores = np.abs((series - series.mean()) / series.std())
return z_scores > threshold
# Applicare i metodi
outliers_iqr = detect_outliers_iqr(df['prezzo'])
outliers_zscore = detect_outliers_zscore(df['prezzo'])
print(f"Outlier IQR: {outliers_iqr.sum()}")
print(f"Outlier Z-Score: {outliers_zscore.sum()}")
# Visualizzare gli outlier
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
df['prezzo'].plot(kind='box', ax=axes[0])
axes[0].set_title('Boxplot - Prezzi')
df['prezzo'].plot(kind='hist', bins=50, ax=axes[1], edgecolor='black')
axes[1].axvline(df['prezzo'].quantile(0.75) + 1.5 *
(df['prezzo'].quantile(0.75) - df['prezzo'].quantile(0.25)),
color='red', linestyle='--', label='Soglia IQR')
axes[1].set_title('Distribuzione Prezzi')
axes[1].legend()
plt.tight_layout()
plt.show()
Strategie di gestione degli outlier
Una volta identificati, gli outlier possono essere gestiti in diversi modi. La scelta dipende dal contesto e dall'obiettivo dell'analisi — non esiste una risposta universale.
# Opzione 1: Rimozione (solo se sono chiaramente errori)
df_no_outliers = df[~detect_outliers_iqr(df['prezzo'])]
# Opzione 2: Capping (Winsorizing) - limitare ai percentili
lower = df['prezzo'].quantile(0.01)
upper = df['prezzo'].quantile(0.99)
df['prezzo_capped'] = df['prezzo'].clip(lower=lower, upper=upper)
# Opzione 3: Trasformazione logaritmica
# Utile quando la distribuzione è fortemente asimmetrica
df['prezzo_log'] = np.log1p(df['prezzo'])
# Opzione 4: Sostituire con NaN e imputare successivamente
mask = detect_outliers_iqr(df['prezzo'], factor=3)
df.loc[mask, 'prezzo'] = np.nan
df['prezzo'] = df['prezzo'].fillna(df['prezzo'].median())
Un punto importante: non rimuovere mai gli outlier automaticamente senza prima investigare. Un prezzo di vendita di 50.000€ potrebbe sembrare assurdo per un negozio di abbigliamento, ma essere perfettamente legittimo per una gioielleria. Il contesto è tutto.
Costruire Pipeline di Pulizia Automatizzate
Fin qui abbiamo visto singole operazioni di pulizia. Nel lavoro quotidiano, però, hai bisogno di combinare queste operazioni in pipeline riutilizzabili e manutenibili. Ed è qui che il metodo pipe() di pandas e la libreria PyJanitor entrano in gioco.
Il metodo pipe() di pandas
Il metodo pipe() consente di concatenare funzioni in modo fluido e leggibile. Invece di scrivere codice annidato come f(g(h(df))), puoi scrivere df.pipe(h).pipe(g).pipe(f). Il vantaggio non è solo estetico — una pipeline lineare è molto più facile da debuggare, testare e modificare.
def standardize_columns(df):
"""Standardizza i nomi delle colonne."""
df = df.copy()
df.columns = (df.columns
.str.strip()
.str.lower()
.str.replace(r'[^\w]', '_', regex=True)
)
return df
def remove_empty_rows(df, threshold=0.5):
"""Rimuove le righe con più del threshold% di valori mancanti."""
min_non_null = int(len(df.columns) * (1 - threshold))
return df.dropna(thresh=min_non_null)
def fix_data_types(df, date_cols=None, numeric_cols=None):
"""Converte le colonne nei tipi corretti."""
df = df.copy()
if date_cols:
for col in date_cols:
df[col] = pd.to_datetime(df[col], errors='coerce',
dayfirst=True)
if numeric_cols:
for col in numeric_cols:
df[col] = pd.to_numeric(df[col], errors='coerce')
return df
def remove_duplicates(df, subset=None):
"""Rimuove i duplicati, tenendo la prima occorrenza."""
return df.drop_duplicates(subset=subset, keep='first')
def cap_outliers(df, columns, factor=1.5):
"""Applica il capping agli outlier usando il metodo IQR."""
df = df.copy()
for col in columns:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - factor * IQR
upper = Q3 + factor * IQR
df[col] = df[col].clip(lower=lower, upper=upper)
return df
# La pipeline completa — elegante e leggibile
df_clean = (
pd.read_csv('vendite.csv')
.pipe(standardize_columns)
.pipe(remove_empty_rows, threshold=0.5)
.pipe(fix_data_types,
date_cols=['data_ordine'],
numeric_cols=['prezzo', 'quantita'])
.pipe(remove_duplicates,
subset=['cliente_id', 'data_ordine', 'prodotto_id'])
.pipe(cap_outliers, columns=['prezzo'], factor=1.5)
)
print(f"Dataset pulito: {df_clean.shape}")
La bellezza di questo approccio è che ogni funzione è indipendente e testabile. Puoi aggiungere, rimuovere o riordinare i passaggi senza toccare il resto della pipeline. E se qualcosa va storto, basta eseguire i passaggi uno alla volta per trovare il problema — semplicissimo.
PyJanitor: pulizia dei dati con method chaining avanzato
PyJanitor estende pandas con un'API di method chaining ispirata al pacchetto dplyr di R. Aggiunge decine di metodi al DataFrame di pandas che rendono le operazioni di pulizia più espressive e concise.
# pip install pyjanitor
import janitor
# Pipeline con PyJanitor — notare quanto è leggibile
df_clean = (
pd.read_csv('vendite.csv')
.clean_names() # nomi colonne snake_case
.remove_empty() # rimuove righe/colonne vuote
.rename_column('old_name', 'new_name') # rinomina colonne
.filter_string('categoria',
search_string='Elettronica',
complement=False) # filtra per stringa
.transform_column('prezzo',
lambda x: x.clip(lower=0)) # niente prezzi negativi
.encode_categorical(['regione']) # encoding categorico
.sort_on(['data_ordine']) # ordinamento
)
PyJanitor brilla particolarmente quando hai bisogno di operazioni comuni che in pandas richiederebbero diverse righe di codice. Il metodo clean_names(), ad esempio, standardizza automaticamente tutti i nomi delle colonne in formato snake_case, rimuovendo spazi, caratteri speciali e accenti. Un'operazione che altrimenti ti costerebbe una catena di .str operations.
Creare una classe riutilizzabile per la pulizia
Per progetti di produzione, incapsulare la logica di pulizia in una classe dedicata può fare una grande differenza. Test, versionamento, riutilizzo — tutto diventa più semplice.
class DataCleaner:
"""Pipeline di pulizia dati configurabile e riutilizzabile."""
def __init__(self, config=None):
self.config = config or {}
self.log = []
def _log_step(self, step_name, df_before, df_after):
"""Registra le modifiche di ogni passaggio."""
rows_removed = len(df_before) - len(df_after)
self.log.append({
'step': step_name,
'rows_before': len(df_before),
'rows_after': len(df_after),
'rows_removed': rows_removed
})
def standardize_columns(self, df):
df_out = df.copy()
df_out.columns = (df_out.columns
.str.strip()
.str.lower()
.str.replace(r'[^\w]', '_', regex=True))
self._log_step('standardize_columns', df, df_out)
return df_out
def handle_missing(self, df, strategy='median'):
df_out = df.copy()
numeric_cols = df_out.select_dtypes(include=[np.number]).columns
cat_cols = df_out.select_dtypes(include=['object']).columns
if strategy == 'median':
df_out[numeric_cols] = df_out[numeric_cols].fillna(
df_out[numeric_cols].median())
elif strategy == 'mean':
df_out[numeric_cols] = df_out[numeric_cols].fillna(
df_out[numeric_cols].mean())
elif strategy == 'drop':
df_out = df_out.dropna(subset=numeric_cols)
df_out[cat_cols] = df_out[cat_cols].fillna('Sconosciuto')
self._log_step('handle_missing', df, df_out)
return df_out
def remove_duplicates(self, df, subset=None):
df_out = df.drop_duplicates(subset=subset, keep='first')
self._log_step('remove_duplicates', df, df_out)
return df_out
def run(self, df):
"""Esegue l'intera pipeline di pulizia."""
return (df
.pipe(self.standardize_columns)
.pipe(self.handle_missing,
strategy=self.config.get('missing_strategy', 'median'))
.pipe(self.remove_duplicates,
subset=self.config.get('dedup_columns')))
def get_report(self):
"""Restituisce un report di tutte le operazioni eseguite."""
return pd.DataFrame(self.log)
# Utilizzo
cleaner = DataCleaner(config={
'missing_strategy': 'median',
'dedup_columns': ['cliente_id', 'data_ordine']
})
df_clean = cleaner.run(df)
print(cleaner.get_report())
Validazione dei Dati: Garantire la Qualità
Pulire i dati senza validarli è come fare una ristrutturazione senza il collaudo finale. La validazione garantisce che i dati rispettino le regole di business e i vincoli attesi — e nel 2025-2026 questo aspetto è diventato sempre più centrale con l'adozione crescente di framework dedicati.
Validazione con Pandera
Pandera è una libreria di validazione statistica che porta il concetto di schema e type hints ai DataFrame di pandas. Ti permette di definire aspettative precise su ogni colonna e di verificarle automaticamente. Se non la conosci ancora, preparati a chiederti come facevi senza.
# pip install pandera
import pandera as pa
from pandera import Column, Check, DataFrameSchema
# Definire lo schema del DataFrame
schema = DataFrameSchema({
"cliente_id": Column(
int, Check.gt(0),
nullable=False,
description="ID univoco del cliente"
),
"data_ordine": Column(
"datetime64[ns]",
Check.in_range(
pd.Timestamp("2020-01-01"),
pd.Timestamp("2026-12-31")
),
nullable=False
),
"prezzo": Column(
float,
[
Check.gt(0, error="Il prezzo deve essere positivo"),
Check.lt(100000, error="Prezzo sospettamente alto")
],
nullable=False
),
"quantita": Column(
int,
Check.in_range(1, 10000),
nullable=False
),
"email": Column(
str,
Check.str_matches(
r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
),
nullable=True
),
"regione": Column(
str,
Check.isin([
"Lombardia", "Lazio", "Campania", "Sicilia",
"Veneto", "Piemonte", "Emilia-Romagna", "Toscana",
"Puglia", "Calabria"
]),
nullable=False
)
})
# Validare il DataFrame
try:
validated_df = schema.validate(df_clean, lazy=True)
print("Validazione superata!")
except pa.errors.SchemaErrors as err:
print(f"Errori di validazione trovati: {len(err.failure_cases)}")
print(err.failure_cases.head(20))
L'opzione lazy=True è particolarmente utile: invece di fermarsi al primo errore, raccoglie tutti i problemi e li presenta insieme. Così puoi correggere tutto in un solo passaggio.
Great Expectations: validazione su scala enterprise
Per pipeline di dati in produzione, Great Expectations (GX) offre un framework più completo che include validazione, documentazione automatica e integrazione con i principali orchestratori come Airflow e Prefect.
# pip install great_expectations
import great_expectations as gx
# Creare un contesto e una sorgente dati
context = gx.get_context()
# Aggiungere il DataFrame come asset
data_source = context.data_sources.add_pandas("pandas_source")
data_asset = data_source.add_dataframe_asset(name="vendite")
batch_definition = data_asset.add_batch_definition_whole_dataframe(
"batch_vendite"
)
# Creare una suite di aspettative
suite = context.suites.add(
gx.ExpectationSuite(name="vendite_validation")
)
# Definire le aspettative
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(
column="cliente_id"
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="prezzo", min_value=0, max_value=100000
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(
column="email"
)
)
# Creare e eseguire la validazione
validation_definition = context.validation_definitions.add(
gx.ValidationDefinition(
name="vendite_check",
data=batch_definition,
suite=suite,
)
)
# Eseguire la validazione
batch_parameters = {"dataframe": df_clean}
result = validation_definition.run(
batch_parameters=batch_parameters
)
print(f"Validazione superata: {result.success}")
Great Expectations genera anche una documentazione interattiva (chiamata "Data Docs") che può essere condivisa con il team e gli stakeholder. In contesti aziendali dove la trasparenza sulla qualità dei dati è fondamentale, questo è un vantaggio enorme.
Caso Pratico: Pipeline End-to-End per Dati E-commerce
Bene, mettiamo insieme tutto quello che abbiamo visto finora in un progetto concreto. Supponiamo di ricevere un export CSV giornaliero dal sistema di e-commerce della nostra azienda e di dover preparare i dati per il team di analisi.
import pandas as pd
import numpy as np
import janitor
import pandera as pa
from pandera import Column, Check, DataFrameSchema
from datetime import datetime
import logging
# Configurazione logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EcommercePipeline:
"""Pipeline di pulizia per dati e-commerce."""
SCHEMA = DataFrameSchema({
"cliente_id": Column(int, Check.gt(0), nullable=False),
"data_ordine": Column("datetime64[ns]", nullable=False),
"prodotto": Column(str, Check.str_length(min_value=1),
nullable=False),
"categoria": Column(str, nullable=False),
"prezzo": Column(float, Check.in_range(0.01, 50000),
nullable=False),
"quantita": Column(int, Check.in_range(1, 1000),
nullable=False),
"citta": Column(str, nullable=False),
})
def __init__(self, input_path):
self.input_path = input_path
self.stats = {}
def extract(self):
"""Fase di estrazione: carica i dati grezzi."""
logger.info(f"Caricamento dati da {self.input_path}")
df = pd.read_csv(self.input_path, encoding='utf-8')
self.stats['righe_originali'] = len(df)
logger.info(f"Caricate {len(df)} righe")
return df
def transform(self, df):
"""Fase di trasformazione: pulizia completa."""
df_clean = (
df
.pipe(self._standardize_columns)
.pipe(self._fix_types)
.pipe(self._clean_text)
.pipe(self._handle_missing)
.pipe(self._remove_duplicates)
.pipe(self._handle_outliers)
.pipe(self._add_features)
)
self.stats['righe_pulite'] = len(df_clean)
return df_clean
def validate(self, df):
"""Fase di validazione: verifica lo schema."""
try:
validated = self.SCHEMA.validate(df, lazy=True)
logger.info("Validazione superata con successo")
return validated
except pa.errors.SchemaErrors as e:
logger.error(f"Validazione fallita: "
f"{len(e.failure_cases)} errori")
logger.error(e.failure_cases.head())
raise
def _standardize_columns(self, df):
return df.clean_names()
def _fix_types(self, df):
df = df.copy()
df['data_ordine'] = pd.to_datetime(
df['data_ordine'], dayfirst=True, errors='coerce')
df['prezzo'] = pd.to_numeric(
df['prezzo'].astype(str)
.str.replace('€', '')
.str.replace(',', '.'),
errors='coerce'
)
df['quantita'] = pd.to_numeric(
df['quantita'], errors='coerce').astype('Int64')
return df
def _clean_text(self, df):
df = df.copy()
text_cols = df.select_dtypes(include=['object']).columns
for col in text_cols:
df[col] = (df[col]
.str.strip()
.str.replace(r'\s+', ' ', regex=True))
if 'citta' in df.columns:
df['citta'] = df['citta'].str.title()
if 'categoria' in df.columns:
df['categoria'] = df['categoria'].str.title()
return df
def _handle_missing(self, df):
df = df.copy()
df = df.dropna(subset=['cliente_id', 'data_ordine'])
df['prezzo'] = df['prezzo'].fillna(df['prezzo'].median())
df['quantita'] = df['quantita'].fillna(1)
text_cols = df.select_dtypes(include=['object']).columns
df[text_cols] = df[text_cols].fillna('Non specificato')
return df
def _remove_duplicates(self, df):
before = len(df)
df = df.drop_duplicates(
subset=['cliente_id', 'data_ordine', 'prodotto'],
keep='first'
)
removed = before - len(df)
if removed > 0:
logger.info(f"Rimossi {removed} duplicati")
return df
def _handle_outliers(self, df):
df = df.copy()
Q1 = df['prezzo'].quantile(0.25)
Q3 = df['prezzo'].quantile(0.75)
IQR = Q3 - Q1
df['prezzo'] = df['prezzo'].clip(
lower=max(0.01, Q1 - 3 * IQR),
upper=Q3 + 3 * IQR
)
return df
def _add_features(self, df):
df = df.copy()
df['totale'] = df['prezzo'] * df['quantita']
df['giorno_settimana'] = df['data_ordine'].dt.day_name()
df['mese'] = df['data_ordine'].dt.month
df['anno'] = df['data_ordine'].dt.year
return df
def run(self):
"""Esegue l'intera pipeline ETL."""
logger.info("=== Inizio Pipeline ===")
start = datetime.now()
df = self.extract()
df = self.transform(df)
df = self.validate(df)
elapsed = (datetime.now() - start).total_seconds()
logger.info(f"=== Pipeline completata in {elapsed:.2f}s ===")
logger.info(f"Righe: {self.stats['righe_originali']} → "
f"{self.stats['righe_pulite']}")
return df
# Esecuzione
pipeline = EcommercePipeline('vendite_export_2026.csv')
df_final = pipeline.run()
# Salvare il risultato
df_final.to_parquet('vendite_pulite.parquet', index=False)
df_final.to_csv('vendite_pulite.csv', index=False)
Questa pipeline è un esempio realistico che puoi adattare ai tuoi dati. Nota come ogni metodo ha una responsabilità ben definita, il logging traccia ogni operazione e la validazione finale garantisce che l'output rispetti lo schema atteso. È il tipo di struttura che ti ringrazierai di aver costruito quando il CSV giornaliero arriverà con un formato diverso dal solito (e succederà, credimi).
Best Practice e Consigli Pratici
Dopo tanti progetti di data cleaning, ecco le lezioni più importanti che ho raccolto — alcune imparate nel modo più doloroso possibile.
1. Salva sempre una copia dei dati grezzi
Prima di qualsiasi modifica, fai una copia del dataset originale. Non importa quanto sei sicuro delle tue trasformazioni: potresti aver bisogno di tornare ai dati grezzi. Una buona pratica è aggiungere il suffisso -RAW al file originale e mantenere dati grezzi e puliti in directory separate.
2. Documenta ogni decisione
Ogni scelta nella pulizia dei dati è una decisione analitica. Perché hai scelto di imputare con la mediana invece della media? Perché hai rimosso le righe con più del 50% di valori mancanti e non il 30%? Documenta tutto. Tra tre mesi non ricorderai il motivo, garantito.
3. Testa le funzioni di pulizia
Le funzioni di pulizia sono codice come qualsiasi altro, e meritano test unitari. Usa pytest per verificare che ogni funzione si comporti correttamente con input noti.
import pytest
def test_standardize_columns():
df = pd.DataFrame({'Nome Cognome': [1], 'Data Ordine': [2]})
result = standardize_columns(df)
assert list(result.columns) == ['nome_cognome', 'data___ordine']
def test_cap_outliers():
df = pd.DataFrame({'valore': [1, 2, 3, 4, 100]})
result = cap_outliers(df, columns=['valore'], factor=1.5)
assert result['valore'].max() < 100
def test_remove_duplicates():
df = pd.DataFrame({
'id': [1, 1, 2],
'valore': ['a', 'a', 'b']
})
result = remove_duplicates(df, subset=['id', 'valore'])
assert len(result) == 2
4. Monitora le prestazioni con dataset grandi
Con dataset di milioni di righe, le prestazioni diventano critiche. Ecco alcune strategie per tenere sotto controllo memoria e tempi di esecuzione.
# Specificare i tipi di dato al caricamento per ridurre la memoria
dtypes = {
'cliente_id': 'int32',
'prezzo': 'float32',
'quantita': 'int16',
'categoria': 'category'
}
df = pd.read_csv('grande_dataset.csv', dtype=dtypes)
# Usare il tipo 'category' per colonne con pochi valori unici
for col in ['regione', 'categoria', 'stato_ordine']:
df[col] = df[col].astype('category')
# Confronto memoria
print(f"Memoria prima: {df.memory_usage(deep=True).sum() / 1e6:.1f} MB")
Se i tuoi dataset superano regolarmente i 10 milioni di righe, considera seriamente l'uso di Polars al posto di pandas. I benchmark recenti mostrano che Polars può superare pandas di 3-10x nei carichi di lavoro ETL, utilizzando circa l'87% in meno di memoria grazie al formato colonnare Apache Arrow e all'esecuzione parallela multi-core.
5. Adotta un approccio incrementale
Non cercare di costruire la pipeline perfetta al primo tentativo. Inizia con le operazioni essenziali — rimozione duplicati, gestione mancanti, conversione tipi — poi aggiungi complessità man mano che scopri nuovi problemi nei dati. La pulizia dei dati è un processo iterativo, non un evento singolo.
Conclusioni
La pulizia dei dati non è il passaggio più affascinante della data science, ma è senza dubbio il più importante. Come abbiamo visto, Python offre un ecosistema maturo e completo per affrontare ogni aspetto della preparazione dei dati: dalle operazioni basilari con pandas alle pipeline eleganti con pipe() e PyJanitor, dalla validazione rigorosa con Pandera e Great Expectations fino all'ottimizzazione delle prestazioni per dataset su larga scala.
I punti chiave da portare a casa:
- Esplora prima di pulire — comprendi i tuoi dati prima di modificarli
- Scegli la strategia giusta per i valori mancanti — dalla semplice eliminazione all'imputazione MICE, ogni situazione richiede un approccio diverso
- Automatizza con le pipeline — il metodo
pipe()e PyJanitor rendono il codice più leggibile e manutenibile - Valida sempre l'output — Pandera e Great Expectations sono i tuoi alleati per garantire la qualità
- Documenta e testa — le decisioni di pulizia sono decisioni analitiche che meritano trasparenza
Se hai seguito la nostra precedente guida sulle serie temporali, ora disponi di un toolkit completo: sai come pulire i dati grezzi e come analizzare i dati temporali una volta che sono pronti. Il prossimo passo? Mettere in pratica queste tecniche sui tuoi dati reali. Buona pulizia!