ETL pipeline — Extract, Transform, Load — je naprosto základní kámen moderního datového inženýrství. Pokud jste se někdy ptali, jak data z různých zdrojů skončí čistá a připravená v analytickém nástroji nebo modelu strojového učení, odpovědí je právě ETL. V tomto průvodci se krok za krokem naučíte, jak takovou pipeline postavit v Pythonu — od extrakce dat z CSV a databáze, přes transformaci pomocí Pandas, až po načtení do SQL databáze přes SQLAlchemy.
Upřímně, po létech práce s daty jsem přesvědčen, že dobře postavená ETL pipeline ušetří víc času než jakýkoli jiný nástroj. Takže pojďme na to.
Co je ETL Pipeline?
ETL je zkratka pro tři klíčové fáze zpracování dat:
- Extract (Extrakce) — načtení dat z jednoho nebo více zdrojů (CSV soubory, API, databáze, webové stránky)
- Transform (Transformace) — čištění, filtrování, agregace a obohacování dat tak, aby odpovídala požadovanému formátu
- Load (Načtení) — uložení transformovaných dat do cílového úložiště (databáze, datový sklad, soubor)
ETL pipeline automatizuje celý tento proces a zajišťuje, že data jsou přesně, spolehlivě a opakovatelně přesouvána z jejich zdrojů do míst, kde se z nich stávají analytické pohledy, dashboardy nebo vstupy pro modely strojového učení. Bez ní by každý analytik dělal tu samou manuální práci znovu a znovu — a to si nikdo přát nechce.
Proč Python pro ETL v roce 2026?
Python dominuje světu datového inženýrství, a to z dobrých důvodů. Podle průzkumu JetBrains z roku 2026 používá 77 % datových vědců a inženýrů Pandas jako primární nástroj pro zpracování dat. Ekosystém knihoven pokrývá každou část ETL procesu:
- pandas — transformace a čištění dat v paměti
- SQLAlchemy — připojení k relačním databázím (PostgreSQL, MySQL, SQLite)
- requests / httpx — extrakce dat z REST API
- Apache Airflow / Prefect — orchestrace a plánování pipeline
- DuckDB / Polars — vysokovýkonné zpracování velkých datasetů
Trh nástrojů pro datové pipeline dosáhl v roce 2026 hodnoty 14,76 miliardy dolarů s ročním růstem 26,8 %. To není jen statistika — je to signal, jak zásadní se ETL dovednosti staly pro moderní firmy.
Potřebné knihovny
Nejdřív si připravte prostředí. Nainstalujte všechny závislosti jedním příkazem:
pip install pandas sqlalchemy psycopg2-binary python-dotenv numpy
Důležitá rada: uložte přihlašovací údaje k databázi do souboru .env a načítejte je pomocí python-dotenv — nikdy je nepište přímo do kódu. Vím, že to zní jako banality, ale tahle chyba se opakuje i u zkušených vývojářů.
Fáze 1: Extrakce dat (Extract)
V první fázi načteme data ze dvou zdrojů: CSV souboru a SQL databáze. Nic komplikovaného — ale záleží na detailech.
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv
import os
load_dotenv() # načte proměnné z .env souboru
# Extrakce z CSV souboru
sales_df = pd.read_csv("data/daily_sales.csv")
# Extrakce z SQL databáze
DB_URL = os.getenv("DATABASE_URL") # např. postgresql://user:pass@localhost:5432/mydb
engine = create_engine(DB_URL)
query = "SELECT customer_id, region, signup_date FROM customers WHERE active = TRUE"
customers_df = pd.read_sql(query, engine)
print(f"Načteno {len(sales_df)} prodejů a {len(customers_df)} zákazníků.")
Pro velmi velké CSV soubory použijte chunking — data se načítají postupně a nezahltíte paměť. Tohle je jeden z těch triků, které oceníte až ve chvíli, kdy vám skript selže na 3 GB souboru v pátek odpoledne:
# Načítání po částech (chunksize = 50 000 řádků)
chunks = []
for chunk in pd.read_csv("data/large_sales.csv", chunksize=50_000):
# předběžné filtrování ještě před načtením do paměti
chunks.append(chunk[chunk["amount"] > 0])
sales_df = pd.concat(chunks, ignore_index=True)
Pokud pracujete s daty z více zdrojů, čištění dat v Pythonu s Pandas je nezbytným krokem před transformací — pomůže vám eliminovat duplicity, chybějící hodnoty a nekonzistentní formáty hned na začátku.
Fáze 2: Transformace dat (Transform)
Transformace je srdcem každé ETL pipeline. A upřímně řečeno, tady trávíte většinu času. Zahrnuje čištění, normalizaci, slučování datasetů a vytváření odvozených sloupců.
import numpy as np
# Sloučení prodejů se zákazníky
merged_df = sales_df.merge(customers_df, on="customer_id", how="left")
# Čištění: odstranění řádků s chybějícími hodnotami v klíčových sloupcích
merged_df.dropna(subset=["customer_id", "amount", "region"], inplace=True)
# Normalizace datumů
merged_df["sale_date"] = pd.to_datetime(merged_df["sale_date"])
merged_df["signup_date"] = pd.to_datetime(merged_df["signup_date"])
# Odvozené sloupce
merged_df["days_since_signup"] = (merged_df["sale_date"] - merged_df["signup_date"]).dt.days
merged_df["amount_eur"] = merged_df["amount"] * 1.08 # přepočet měny
# Kategorizace zákazníků podle hodnoty nákupů
merged_df["customer_tier"] = pd.cut(
merged_df["amount"],
bins=[0, 500, 2000, np.inf],
labels=["standard", "premium", "vip"]
)
# Agregace: celkové tržby podle regionu a měsíce
monthly_sales = (
merged_df
.groupby([merged_df["sale_date"].dt.to_period("M"), "region"])
["amount_eur"]
.sum()
.reset_index()
.rename(columns={"sale_date": "month", "amount_eur": "total_revenue_eur"})
)
print(monthly_sales.head(10))
Při složitějších transformacích rozložte kód do pojmenovaných funkcí — pipeline tak zůstane čitelná a snadno testovatelná (a váš kolega vám poděkuje za tři měsíce):
def clean_sales(df: pd.DataFrame) -> pd.DataFrame:
df = df.dropna(subset=["customer_id", "amount"])
df["sale_date"] = pd.to_datetime(df["sale_date"])
df = df[df["amount"] > 0]
return df.reset_index(drop=True)
def enrich_sales(df: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
df = df.merge(customers, on="customer_id", how="left")
df["days_since_signup"] = (df["sale_date"] - pd.to_datetime(df["signup_date"])).dt.days
return df
cleaned = clean_sales(sales_df)
enriched = enrich_sales(cleaned, customers_df)
Fáze 3: Načtení dat (Load)
V poslední fázi uložíme transformovaná data do cílové databáze. SQLAlchemy metoda to_sql() toto zvládá elegantně a bez zbytečné ceremonie:
from sqlalchemy import create_engine, text
TARGET_DB = os.getenv("TARGET_DATABASE_URL")
target_engine = create_engine(TARGET_DB)
# Načtení agregovaných dat do nové tabulky
monthly_sales.to_sql(
name="monthly_sales_summary",
con=target_engine,
if_exists="append", # "replace" přepíše, "append" přidá
index=False,
chunksize=5_000 # dávkové vkládání pro velké objemy
)
print("Data úspěšně načtena do databáze.")
Pro produkční prostředí přidejte transakční zpracování — při chybě se data neukládají neúplně. Tohle vám jednou zachrání noc:
from sqlalchemy.exc import SQLAlchemyError
with target_engine.begin() as conn: # automatická transakce
try:
monthly_sales.to_sql("monthly_sales_summary", con=conn,
if_exists="append", index=False)
print("Transakce úspěšně dokončena.")
except SQLAlchemyError as e:
print(f"Chyba při načítání dat: {e}")
raise # transakce se automaticky vrátí (rollback)
Kompletní ETL Pipeline v jednom skriptu
Tak, a teď to celé spojíme dohromady. Všechny tři fáze v jedné strukturované pipeline s logováním — přesně tak, jak byste to nasadili do produkce:
import logging
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from dotenv import load_dotenv
import os
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)
load_dotenv()
def extract(source_engine) -> tuple[pd.DataFrame, pd.DataFrame]:
logger.info("Zahajuji extrakci dat...")
sales = pd.read_csv("data/daily_sales.csv")
customers = pd.read_sql("SELECT customer_id, region, signup_date FROM customers", source_engine)
logger.info(f"Extrahováno: {len(sales)} prodejů, {len(customers)} zákazníků")
return sales, customers
def transform(sales: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
logger.info("Zahajuji transformaci dat...")
df = sales.merge(customers, on="customer_id", how="left")
df.dropna(subset=["customer_id", "amount", "region"], inplace=True)
df["sale_date"] = pd.to_datetime(df["sale_date"])
df["signup_date"] = pd.to_datetime(df["signup_date"])
df["days_since_signup"] = (df["sale_date"] - df["signup_date"]).dt.days
df["amount_eur"] = df["amount"] * 1.08
logger.info(f"Po transformaci: {len(df)} záznamů")
return df
def load(df: pd.DataFrame, target_engine) -> None:
logger.info("Zahajuji načítání do cílové databáze...")
with target_engine.begin() as conn:
df.to_sql("enriched_sales", con=conn, if_exists="append",
index=False, chunksize=5_000)
logger.info("Načítání dokončeno.")
def run_pipeline():
source_engine = create_engine(os.getenv("SOURCE_DB_URL"))
target_engine = create_engine(os.getenv("TARGET_DB_URL"))
try:
sales, customers = extract(source_engine)
enriched = transform(sales, customers)
load(enriched, target_engine)
logger.info("ETL pipeline úspěšně dokončena.")
except Exception as e:
logger.error(f"Pipeline selhala: {e}")
raise
if __name__ == "__main__":
run_pipeline()
Nejlepší praktiky pro ETL pipeline v roce 2026
1. Inkrementální načítání
Místo každodenního přepisu celé tabulky načítejte pouze nová nebo změněná data pomocí vodoznaku (watermark). Zní to jako detail, ale na větších tabulkách vám to ušetří hodiny i peníze za cloud:
from sqlalchemy import text
# Zjisti poslední datum v cílové tabulce
with target_engine.connect() as conn:
result = conn.execute(text("SELECT MAX(sale_date) FROM enriched_sales"))
last_date = result.scalar()
# Načti jen nová data ze zdroje pomocí parametrizovaného dotazu
with source_engine.connect() as conn:
new_sales = pd.read_sql(
text("SELECT * FROM sales WHERE sale_date > :last_date"),
conn,
params={"last_date": last_date}
)
2. Validace dat
Před načtením vždy ověřte integritu dat. Tiché chyby (záporné tržby, chybějící ID) se jinak projevují až v analytických reportech — nejlépe v pátek odpoledne:
def validate(df: pd.DataFrame) -> bool:
assert df["amount_eur"].gt(0).all(), "Záporné tržby!"
assert df["customer_id"].notna().all(), "Chybí customer_id!"
assert len(df) > 0, "Prázdný dataset!"
return True
3. Výkon: DuckDB a Polars pro velké datasety
Pro datasety nad 1 GB zvažte výkonnější alternativy k Pandas. DuckDB v Pythonu 2026 dokáže zpracovat SQL dotazy přímo nad DataFrames nebo Parquet soubory s 10–100× vyšším výkonem. Detailní srovnání pak najdete v článku Polars vs Pandas v roce 2026 — spoiler: pro ETL nad stovkami milionů řádků Polars vyhrává na celé čáře.
4. Orchestrace a plánování
Pro produkční nasazení automatizujte spouštění pipeline. Máte tři solidní možnosti:
- Apache Airflow — nejrozšířenější orchestrátor s grafickým rozhraním a bohatým ekosystémem
- Prefect — modernější alternativa s jednoduchou Python API
- cron — pro jednoduché naplánované skripty bez závislostí
5. Zabezpečení a prevence SQL injection
Nikdy nevkládejte uživatelský vstup přímo do SQL dotazů. Parametrizované dotazy SQLAlchemy jsou jedinou správnou cestou:
from sqlalchemy import text
# Správně — parametrizovaný dotaz
with source_engine.connect() as conn:
result = conn.execute(
text("SELECT * FROM orders WHERE region = :region"),
{"region": user_input}
)
Často kladené otázky
Co je ETL pipeline a proč je důležitá?
ETL pipeline je automatizovaný proces, který extrahuje data ze zdrojů, transformuje je do požadovaného formátu a načítá do cílového úložiště. Je klíčová pro spolehlivé dodávání čistých, konzistentních dat pro analytiku, reportování a modely strojového učení.
Jaký je rozdíl mezi ETL a ELT?
V ETL se data transformují na dedikovaném serveru před načtením do datového skladu. V ELT se data nejprve načtou surová do skladu a transformují se až tam. ELT je modernější přístup využívající výkon cloudových datových skladů jako BigQuery nebo Snowflake.
Jaké knihovny jsou nejlepší pro ETL pipeline v Pythonu?
Pro většinu projektů postačí kombinace pandas (transformace), SQLAlchemy (připojení k databázi) a python-dotenv (správa přihlašovacích údajů). Pro velké datasety zvažte DuckDB nebo Polars, pro orchestraci pak Apache Airflow nebo Prefect.
Jak zpracovat velké CSV soubory v ETL pipeline bez přetečení paměti?
Použijte parametr chunksize v pd.read_csv() k načítání dat po částech. Každý chunk zpracujete zvlášť a výsledky sloučíte pomocí pd.concat(). Alternativně načítejte data přímo do DuckDB nebo Polars, které jsou optimalizovány pro práci mimo paměť.
Jak zabránit duplicitám při opakovaném spouštění ETL pipeline?
Implementujte inkrementální načítání pomocí vodoznaku (watermark) — ukládejte poslední zpracované datum nebo ID a při každém běhu načítejte pouze nová data. Alternativně použijte INSERT OR IGNORE nebo ON CONFLICT DO NOTHING v SQL pro idempotentní vkládání.