데이터 파이프라인을 손으로 일일이 돌리던 시절은 진짜 끝났습니다. Apache Airflow 3.0은 2026년 현재 데이터 엔지니어링 분야에서 사실상 표준이 된 워크플로우 오케스트레이션 도구로, Netflix, Airbnb, Spotify 같은 빅테크 기업은 물론 국내 스타트업까지 ETL 파이프라인 백본으로 쓰고 있죠.
저도 처음 Airflow를 도입할 때는 "그냥 cron으로 돌리면 안 되나?" 싶었는데요, 태스크가 50개를 넘어가는 순간 그 생각은 깨끗하게 사라졌습니다. 솔직히 말하면, 한 번이라도 새벽 3시에 의존성 꼬인 cron 잡 디버깅을 해본 사람이라면 Airflow가 왜 필요한지 바로 이해할 겁니다.
이 가이드는 Airflow를 처음 만지는 분부터 이미 운영 중인 분까지 모두를 위한 실전 튜토리얼입니다. 설치부터 DAG 설계, TaskFlow API, 데이터 웨어하우스 연동, 운영 자동화까지 — 실제 코드와 함께 단계별로 풀어봅니다.
Apache Airflow란 무엇인가?
Apache Airflow는 워크플로우를 코드로 정의하고 스케줄링·모니터링하는 오픈소스 플랫폼입니다. 2014년 Airbnb 내부 도구로 시작됐다가, 지금은 Apache Software Foundation의 톱레벨 프로젝트로 운영되고 있어요.
핵심 철학은 "Workflows as Code". 파이프라인을 Python 코드로 작성하니 버전 관리, 코드 리뷰, 단위 테스트가 모두 가능하고, 조건 분기나 동적 태스크 생성 같은 복잡한 로직도 자연스럽게 표현됩니다. (이게 GUI 기반 ETL 도구들과 결정적으로 갈리는 지점이죠.)
Airflow를 사용해야 하는 이유
- 스케줄링과 의존성 관리: cron 표현식 또는 Data-aware 스케줄링으로 수백 개의 태스크 의존성을 시각적으로 관리
- 풍부한 통합: BigQuery, Snowflake, Postgres, Spark, dbt, Kubernetes 등 1,500개 이상의 Provider 지원
- 웹 UI: DAG 실행 상태, 로그, 메트릭을 실시간으로 모니터링 — 솔직히 이거 한 번 익숙해지면 못 빠져나옵니다
- 확장성: Celery, Kubernetes Executor로 수천 개의 태스크를 병렬 처리
- 커뮤니티: GitHub 35,000+ 스타, 활발한 한국어 커뮤니티 보유
Apache Airflow 3.0의 주요 변경 사항
2025년에 나온 Airflow 3.0은 7년 만의 메이저 업데이트입니다. 단순한 버전업이 아니라 아키텍처가 꽤 많이 바뀌었어요.
1. 서비스 지향 아키텍처 (Edge Executor)
3.0의 가장 큰 변화는 Worker가 Metadata DB에 직접 접근하지 않는 새로운 아키텍처입니다. 모든 통신이 새로운 Task Execution API를 통해 이뤄지죠. 그 결과 멀티 클라우드, 멀티 테넌시 환경에서 보안과 격리가 크게 개선됐습니다.
2. DAG 버전 관리
예전에는 DAG 파일을 수정하면 과거 실행 기록 그래프가 망가지는 골치 아픈 문제가 있었어요. (이거 때문에 git revert 하기 무서웠죠.) 3.0에서는 DAG 버저닝이 기본 제공되어, 각 실행이 사용한 DAG 정의를 그대로 보관합니다.
3. Asset 기반 스케줄링
이전 버전의 Dataset이 Asset으로 개편되며 한층 강력해졌습니다. 데이터 산출물을 중심으로 파이프라인을 정의하면, Airflow가 의존성을 자동으로 추적하고 다운스트림 DAG를 트리거합니다.
4. 새로운 React UI
FastAPI 기반의 새로운 백엔드와 React UI가 도입돼 응답 속도가 눈에 띄게 빨라졌습니다. Grid View와 Graph View도 통합돼서 사용성이 좋아졌어요.
5. Python 3.13 지원
Airflow 3.0은 Python 3.9부터 3.13까지 지원하며, Free-threading(GIL 비활성화) 모드와도 호환됩니다.
Apache Airflow 3.0 설치하기
Airflow는 의존성이 꽤 복잡합니다. 그래서 공식적으로 권장하는 제약 파일(constraints file)과 함께 설치하는 게 안전해요. 그냥 pip install apache-airflow만 했다가 의존성 지옥에 빠진 분들 많이 봤습니다.
로컬 설치 (pip)
# 1. 가상 환경 생성
python3.12 -m venv airflow-venv
source airflow-venv/bin/activate
# 2. AIRFLOW_HOME 환경 변수 설정
export AIRFLOW_HOME=~/airflow
# 3. 제약 파일 URL 정의
AIRFLOW_VERSION=3.0.0
PYTHON_VERSION="$(python --version | cut -d ' ' -f 2 | cut -d '.' -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# 4. Airflow 설치
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# 5. 데이터베이스 초기화
airflow db migrate
# 6. 관리자 계정 생성
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email [email protected]
# 7. 스케줄러와 웹서버 실행
airflow standalone
설치가 끝나면 브라우저에서 http://localhost:8080에 접속해 로그인할 수 있습니다.
Docker Compose로 빠르게 시작하기
실무 환경과 비슷한 구성으로 시작하고 싶다면 공식 Docker Compose 파일이 가장 편합니다. 제 경우엔 로컬 개발도 거의 다 Docker Compose로 띄우는데, 이게 운영 환경 디버깅하기에도 훨씬 수월하더라고요.
# 1. Docker Compose YAML 다운로드
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/3.0.0/docker-compose.yaml'
# 2. 필수 디렉토리 생성
mkdir -p ./dags ./logs ./plugins ./config
# 3. UID 환경 변수 설정 (Linux)
echo -e "AIRFLOW_UID=$(id -u)" > .env
# 4. 데이터베이스 초기화
docker compose up airflow-init
# 5. 모든 서비스 시작
docker compose up -d
# 6. 상태 확인
docker compose ps
핵심 개념: DAG, Task, Operator
Airflow를 제대로 쓰려면 세 가지 핵심 개념을 알아야 합니다. 처음에는 좀 헷갈릴 수 있는데, 한 번 잡히면 그다음부터는 모든 게 자연스러워져요.
DAG (Directed Acyclic Graph)
DAG는 순환하지 않는 방향 그래프입니다. 태스크들이 어떤 순서와 의존성으로 실행돼야 하는지 정의하죠. DAG 자체는 작업이 무엇을 하는지가 아니라 어떻게 연결되는지를 표현합니다.
Task
Task는 DAG의 노드이며 실제로 수행되는 작업 단위예요. 데이터 추출, 변환, 적재, 알림 전송 등이 모두 Task가 될 수 있습니다.
Operator
Operator는 Task를 만드는 템플릿입니다. 종류는 대략 이렇게 나뉘어요.
- Action Operators:
PythonOperator,BashOperator,SQLExecuteQueryOperator— 실제 작업 수행 - Transfer Operators:
S3ToRedshiftOperator,GCSToBigQueryOperator— 시스템 간 데이터 이동 - Sensor Operators:
FileSensor,ExternalTaskSensor— 조건이 충족될 때까지 대기
첫 번째 DAG 작성하기
자, 그럼 가장 단순한 DAG부터 만들어 보죠. dags/ 디렉토리에 my_first_dag.py 파일을 생성합니다.
from datetime import datetime, timedelta
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
def print_hello():
print("안녕하세요, Airflow 3.0!")
return "Hello task completed"
default_args = {
"owner": "data-team",
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="my_first_dag",
default_args=default_args,
description="첫 번째 Airflow 3.0 DAG",
schedule="@daily",
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["tutorial", "beginner"],
) as dag:
task_1 = BashOperator(
task_id="print_date",
bash_command="date",
)
task_2 = PythonOperator(
task_id="say_hello",
python_callable=print_hello,
)
task_3 = BashOperator(
task_id="finish",
bash_command="echo 'DAG 실행 완료!'",
)
task_1 >> task_2 >> task_3
파일을 저장하면 몇 초 후에 Airflow UI에 my_first_dag가 나타납니다. 토글을 켜고 Trigger DAG 버튼을 누르면 바로 실행돼요.
코드 설명
schedule="@daily": 매일 자정에 실행 (cron 표현식"0 0 * * *"도 가능)catchup=False: 과거 누락된 실행을 자동으로 채우지 않음 (이거 True로 두면 진짜 큰일 납니다)>>연산자: 태스크 간 의존성을 정의 (task_1이 끝나야 task_2 시작)retries=2: 실패 시 최대 2번 재시도
TaskFlow API: 더 파이썬다운 방식
Airflow 2.0부터 도입된 TaskFlow API는 데코레이터 기반의 더 직관적인 작성 방식입니다. 3.0에서는 새로운 airflow.sdk 모듈을 통해 더 강력해졌어요. 솔직히 새 프로젝트라면 그냥 처음부터 TaskFlow로 가는 걸 추천합니다.
from datetime import datetime
from airflow.sdk import dag, task
@dag(
dag_id="taskflow_etl_example",
schedule="@hourly",
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["etl", "taskflow"],
)
def etl_pipeline():
@task
def extract() -> dict:
# API에서 데이터 추출
import requests
response = requests.get("https://api.example.com/sales")
return response.json()
@task
def transform(raw_data: dict) -> list:
# 데이터 변환
records = []
for item in raw_data["data"]:
records.append({
"id": item["order_id"],
"amount": float(item["price"]) * int(item["quantity"]),
"date": item["created_at"][:10],
})
return records
@task
def load(records: list) -> None:
# PostgreSQL에 적재
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id="warehouse_db")
hook.insert_rows(
table="fact_sales",
rows=[(r["id"], r["amount"], r["date"]) for r in records],
target_fields=["order_id", "amount", "order_date"],
)
raw = extract()
cleaned = transform(raw)
load(cleaned)
etl_pipeline()
TaskFlow API의 장점은 명확합니다.
- 함수 반환값이 자동으로 다음 태스크에 전달 (XCom 자동 처리)
- 타입 힌트로 IDE 자동완성과 정적 분석 지원
- 의존성을 함수 호출 순서로 자연스럽게 표현
- 일반 Python 함수처럼 단위 테스트 가능
실전 ETL 파이프라인: 매출 데이터 분석
이제 실무에서 자주 만나는 시나리오를 직접 구현해 보겠습니다. 매일 자정에 외부 API에서 매출 데이터를 가져와 정제한 후 BigQuery에 적재하고 Slack으로 결과를 알리는 파이프라인이에요. 이거 한 번 만들어 두면 비슷한 패턴은 거의 다 응용해서 쓸 수 있습니다.
from datetime import datetime, timedelta
from airflow.sdk import dag, task, Asset
from airflow.providers.google.cloud.transfers.local_to_gcs import (
LocalFilesystemToGCSOperator,
)
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryInsertJobOperator,
)
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
import pandas as pd
SALES_ASSET = Asset("gs://my-bucket/sales/{{ ds }}/sales.parquet")
@dag(
dag_id="sales_etl_v3",
schedule="0 0 * * *",
start_date=datetime(2026, 1, 1),
catchup=False,
default_args={
"owner": "analytics-team",
"retries": 3,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(hours=1),
},
tags=["sales", "etl", "production"],
)
def sales_etl_pipeline():
@task
def extract_sales(execution_date: str) -> str:
# API에서 일일 매출 데이터 추출
import requests
url = f"https://api.company.com/sales?date={execution_date}"
response = requests.get(url, timeout=30)
response.raise_for_status()
df = pd.DataFrame(response.json()["records"])
local_path = f"/tmp/sales_{execution_date}.parquet"
df.to_parquet(local_path, compression="snappy")
return local_path
@task
def validate_data(file_path: str) -> str:
# 데이터 품질 검증
df = pd.read_parquet(file_path)
assert len(df) > 0, "빈 데이터셋"
assert df["amount"].notna().all(), "amount 컬럼에 결측값 존재"
assert (df["amount"] >= 0).all(), "음수 매출 감지"
print(f"검증 통과: {len(df):,}건, 총 매출 {df['amount'].sum():,.0f}원")
return file_path
@task
def transform_sales(file_path: str) -> str:
# 집계 및 변환
df = pd.read_parquet(file_path)
df["amount_krw"] = df["amount"].round(0)
df["category"] = df["product_id"].str[:3]
df["is_premium"] = df["amount_krw"] >= 100_000
out_path = file_path.replace(".parquet", "_transformed.parquet")
df.to_parquet(out_path, compression="snappy")
return out_path
upload_to_gcs = LocalFilesystemToGCSOperator(
task_id="upload_to_gcs",
src="{{ ti.xcom_pull(task_ids='transform_sales') }}",
dst="sales/{{ ds }}/sales.parquet",
bucket="my-data-bucket",
)
load_to_bq = BigQueryInsertJobOperator(
task_id="load_to_bigquery",
configuration={
"load": {
"sourceUris": ["gs://my-data-bucket/sales/{{ ds }}/sales.parquet"],
"destinationTable": {
"projectId": "my-project",
"datasetId": "analytics",
"tableId": "fact_sales",
},
"sourceFormat": "PARQUET",
"writeDisposition": "WRITE_APPEND",
}
},
outlets=[SALES_ASSET],
)
notify = SlackWebhookOperator(
task_id="notify_success",
slack_webhook_conn_id="slack_default",
message=":white_check_mark: 매출 ETL 완료 - {{ ds }}",
)
raw_file = extract_sales("{{ ds }}")
validated = validate_data(raw_file)
transformed = transform_sales(validated)
transformed >> upload_to_gcs >> load_to_bq >> notify
sales_etl_pipeline()
이 파이프라인은 실무에서 흔히 마주치는 다음 패턴들을 거의 다 담고 있습니다.
- Jinja 템플릿:
{{ ds }}로 실행 날짜를 동적으로 주입 - 데이터 검증: 적재 전에 품질 체크로 잘못된 데이터 차단 (이 단계 빼먹으면 나중에 정말 후회합니다)
- Asset 통합:
outlets로 산출물을 등록해 다운스트림 DAG 자동 트리거 - 알림: 성공·실패 시 Slack으로 즉시 통보
고급 기능: Sensors와 Branching
Sensor: 외부 조건 감지
Sensor는 특정 조건이 충족될 때까지 대기하는 특수한 Operator입니다. 예를 들면 S3에 파일이 도착할 때까지 기다리거나, 다른 DAG가 끝나기를 기다리는 식이죠.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
wait_for_file = S3KeySensor(
task_id="wait_for_input_file",
bucket_name="raw-data-bucket",
bucket_key="incoming/{{ ds }}/data.csv",
aws_conn_id="aws_default",
poke_interval=60, # 60초마다 확인
timeout=60 * 60 * 2, # 2시간 후 타임아웃
mode="reschedule", # worker 슬롯 점유 안 함
)
mode="reschedule"는 Airflow 3.0에서 권장되는 모드입니다. 대기하는 동안 Worker 슬롯을 비워두기 때문에 리소스를 훨씬 효율적으로 쓸 수 있어요. (poke 모드 그대로 두면 Sensor 몇 개만으로도 워커가 마비되더라고요. 경험담입니다.)
Branching: 조건부 실행
from airflow.sdk import task
@task.branch
def choose_path(**context) -> str:
file_size = context["ti"].xcom_pull(task_ids="extract_sales")
if file_size > 1_000_000_000: # 1GB 초과
return "process_large_file"
return "process_small_file"
운영 모범 사례 (Best Practices)
1. DAG는 가볍게 유지하라
DAG 파일은 스케줄러가 주기적으로 파싱합니다. 그래서 무거운 import나 API 호출이 모듈 레벨에 들어가 있으면 전체 시스템이 느려져요. 무거운 작업은 반드시 Task 안에서 수행하세요. 이거 어기면 가장 먼저 티가 납니다.
2. Idempotency를 보장하라
같은 날짜로 여러 번 실행해도 결과가 동일하도록 설계하세요. INSERT 대신 UPSERT, append 대신 partition overwrite를 쓰는 게 좋습니다. 멱등성 없는 파이프라인은 한 번이라도 재실행하는 순간 중복 데이터로 골치 아파져요.
3. Connection과 Variable을 활용하라
API 키나 DB 비밀번호 같은 민감 정보는 코드에 하드코딩하지 마세요. 반드시 Airflow의 Connection 또는 Variable로 관리하고, Secrets Backend(AWS Secrets Manager, GCP Secret Manager)와 통합하면 더 안전합니다.
4. Pool로 동시성 제어
외부 API 호출이나 DB 부하를 제한하려면 Pool을 쓰세요. 예를 들어 BigQuery 쿼리를 동시에 5개로 제한할 수 있습니다.
load_to_bq = BigQueryInsertJobOperator(
task_id="load_to_bigquery",
pool="bigquery_pool",
pool_slots=1,
...
)
5. 단위 테스트를 작성하라
TaskFlow API의 함수는 그냥 평범한 Python 함수예요. 그래서 pytest로 직접 테스트할 수 있습니다.
def test_transform_sales(tmp_path):
df = pd.DataFrame({"amount": [50000, 150000], "product_id": ["ABC123", "XYZ789"]})
input_path = tmp_path / "input.parquet"
df.to_parquet(input_path)
output_path = transform_sales.function(str(input_path))
result = pd.read_parquet(output_path)
assert "is_premium" in result.columns
assert result["is_premium"].tolist() == [False, True]
Airflow vs Prefect vs Dagster 비교
2026년 현재 워크플로우 오케스트레이션 시장에는 여러 강력한 도구가 경쟁 중입니다. 어떤 게 좋을지 한 번쯤은 고민해 볼 만한 주제죠.
Apache Airflow 3.0
- 장점: 가장 큰 커뮤니티, 1,500+ 통합, 검증된 안정성, 풍부한 채용 시장
- 단점: 학습 곡선이 가파름, DAG 정의가 다소 보일러플레이트 같음
- 적합: 대규모 조직, 복잡한 의존성, 다양한 시스템 통합 필요
Prefect 3.0
- 장점: 더 파이썬다운 API, 동적 워크플로우 우수, 빠른 시작
- 단점: 작은 생태계, 일부 기능은 유료(Cloud)
- 적합: 중소 규모 팀, 빠른 프로토타이핑
Dagster
- 장점: Asset/Software-defined 자산 중심 모델, 강력한 타입 시스템
- 단점: 패러다임 전환이 필요, 통합이 Airflow보다 적음
- 적합: 데이터 자산 중심 사고가 자연스러운 분석 조직
국내 기업 환경에서는 채용과 운영 노하우 축적 관점에서 여전히 Airflow가 가장 안전한 선택입니다. 솔직히 신규 멤버 합류시 학습 자료 찾기 쉬운 게 운영 측면에서 굉장히 큰 장점이거든요.
자주 묻는 질문 (FAQ)
Q1. Airflow 2.x에서 3.0으로 마이그레이션하려면 어떻게 해야 하나요?
Airflow 3.0은 메이저 버전 업그레이드라 호환성을 깨는 변경이 좀 있습니다. 일단 공식 가이드의 airflow db check-migrations를 먼저 실행하고, deprecated된 airflow.operators.* 임포트를 airflow.providers.*로 모두 교체해야 해요. 운영 환경 마이그레이션 전에는 반드시 스테이징에서 전체 DAG를 실행해 검증하세요. 이 단계 건너뛰면 후회합니다.
Q2. DAG가 UI에 보이지 않을 때 어떻게 디버깅하나요?
먼저 airflow dags list 명령으로 파싱 여부를 확인하세요. 안 보인다면 python /path/to/dag.py로 직접 실행해 import 오류부터 확인합니다. airflow dags list-import-errors로 모든 파싱 에러를 한 번에 볼 수도 있어요. 그리고 dag_dir_list_interval 설정값을 너무 크게 두면 새 DAG 인식이 늦어질 수 있으니 주의하세요.
Q3. 태스크 간에 큰 데이터를 전달하려면 어떻게 해야 하나요?
XCom은 기본적으로 메타데이터 DB에 저장되니까 작은 값(권장 1MB 이하)만 전달해야 합니다. 큰 데이터는 GCS, S3, 데이터 웨어하우스에 저장하고 경로 또는 ID만 XCom으로 전달하세요. 또는 Airflow 3.0의 Custom XCom Backend를 구성해 자동으로 외부 스토리지에 저장하도록 설정할 수도 있습니다.
Q4. Airflow를 production에서 운영할 때 가장 중요한 설정은 무엇인가요?
다음 다섯 가지를 우선순위로 점검하세요. (1) Executor 선택 — 단일 노드면 LocalExecutor, 분산이 필요하면 Celery/Kubernetes Executor. (2) Metadata DB는 PostgreSQL 권장 (SQLite는 어디까지나 개발용입니다). (3) 로그 백엔드를 S3나 GCS로 설정해 영구 보관. (4) parallelism, dag_concurrency 같은 동시성 파라미터를 워커 사양에 맞게 튜닝. (5) Sentry, Prometheus, StatsD 등으로 모니터링 통합.
Q5. Airflow와 dbt는 어떻게 함께 사용하나요?
dbt는 SQL 변환에 특화돼 있고 Airflow는 전체 워크플로우 오케스트레이션을 담당하므로 정말 잘 어울립니다. 보통은 BashOperator로 dbt run을 실행하거나, 더 정교한 통합이 필요하면 Cosmos(Astronomer의 오픈소스)를 사용해 dbt 모델 각각을 Airflow Task로 자동 변환할 수 있어요. 이렇게 하면 dbt 모델 단위로 재시도와 모니터링이 가능해지죠.
다음 단계
이 가이드로 Apache Airflow 3.0의 핵심을 익혔다면, 이제 다음 주제들로 넘어가 보세요.
- Kubernetes Executor: 태스크별 격리된 환경에서 실행
- Custom Operator 작성: 회사 내부 시스템에 맞춘 재사용 가능한 컴포넌트
- Asset 기반 데이터 메시 구축: 팀 간 데이터 의존성을 명시적으로 관리
- OpenLineage 통합: 데이터 리니지(혈통)를 자동으로 추적
- Airflow on AWS MWAA / GCP Composer: 매니지드 서비스로 운영 부담 경감
Airflow는 단순한 도구가 아닙니다. 데이터 엔지니어링의 공통어에 가까워요. 이 가이드의 패턴들을 실전 프로젝트에 적용하면서 자신만의 운영 노하우를 쌓아가시길 바랍니다. 첫 DAG가 새벽에 무사히 돌아가는 걸 보는 그 순간, 진짜 짜릿하거든요.