Apache Airflow란?

2025. 7. 29. 15:03·Data Engineer
728x90

"매일 오전 9시에 데이터를 추출해서 변환하고, 문제가 있으면 알림을 보내고, 성공하면 리포트를 생성해서"

 

이런 반복적인 작업을 수동으로 처리하고 계신가요? 개발자라면 누구나 한 번쯤은 복잡한 배치 작업들을 자동화하고 싶어했을 것입니다. Apache Airflow가 바로 이런 고민을 해결해주는 강력한 솔루션입니다.

 

Apache Airflow란?

Apache Airflow는 워크플로우를 프로그래밍 방식으로 작성, 스케줄링, 모니터링할 수 있게 해주는 오픈소스 플랫폼입니다.

 

2014년 Airbnb에서 복잡한 데이터 파이프라인을 관리하게 위해 개발되었고, 2016년 Apache Software Foundation에 기부되어 현재는 데이터 엔지니어링 분야의 사실상 표준으로 자리잡았습니다.

 

왜 Airflow가 필요할까?

기존의 cron 작업이나 단순한 스크립트 실행과 비교해보겠습니다.

 

전통적인 방식의 문제점:

  • 작업 간 의존성 관리가 어려움
  • 실패 시 재시도 로직 구현 복잡
  • 모니터링과 로깅이 분산되어 관리 어려움
  • 복잡한 조건부 실행 구현 어려움

Airflow의 해결책:

  • 직관적인 의존성 그래프로 작업 흐름 정의
  • 내장된 재시도, 알림, 로깅 시스템
  • 통합된 웹 UI로 모든 작업 모니터링
  • 파이썬 코드로 복잡한 로직 구현 가능

Airflow의 핵심 개념들

1. DAG (Directed Acyclic Graph)

DAG는 Airflow의 심장과도 같은 개념입니다. 방향성이 있고 순환하지 않는 그래프로, 작업들 간의 실행 순서와 의존성을 정의합니다.

from datetime import datetime, timedelta
from airflow import DAG

# 기본 설정
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# DAG 정의
dag = DAG(
    'my_first_dag',  # DAG 이름
    default_args=default_args,
    description='첫 번째 Airflow DAG',
    schedule_interval='@daily',  # 매일 실행
    catchup=False  # 과거 날짜 소급 실행 안함
)

 

2. Task와 Operator

Task는 DAG내에서 실행되는 개별 작업 단위이고, Operator는 특정 유형의 작업을 수행하는 클래스입니다.

 

주요 Operator들:

  • BashOperator: 셸 명령어 실행
from airflow.operators.bash import BashOperator

backup_task = BashOperator(
    task_id='backup_database',
    bash_command='pg_dump mydb > /backup/mydb_$(date +%Y%m%d).sql',
    dag=dag
)
  • PythonOperator: 파이썬 함수 실행
from airflow.operators.python import PythonOperator

def send_report():
    print("리포트 생성 중...")
    # 실제 리포트 생성 로직
    return "리포트 전송 완료"

report_task = PythonOperator(
    task_id='send_daily_report',
    python_callable=send_report,
    dag=dag
)
  • EmailOperator: 이메일 발송
from airflow.operators.email import EmailOperator

email_task = EmailOperator(
    task_id='send_notification',
    to=['team@company.com'],
    subject='작업 완료 알림',
    html_content='<p>일일 배치 작업이 완료되었습니다.</p>',
    dag=dag
)

 

3. Task 의존성 설정

Airflow의 가장 강력한 기능 중 하나는 직관적인 의존성 설정입니다.

# 순차 실행: A → B → C
task_a >> task_b >> task_c

# 병렬 실행 후 합치기: A,B,C → D
[task_a, task_b, task_c] >> task_d

# 복잡한 분기
start_task >> [extract_users, extract_orders]
extract_users >> process_users >> generate_user_report
extract_orders >> process_orders >> generate_order_report
[generate_user_report, generate_order_report] >> send_final_report

 

실제 사용 사례들

1. 데이터 파이프라인

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import pandas as pd

def extract_sales_data(**context):
    """매출 데이터 추출"""
    execution_date = context['execution_date'].strftime('%Y-%m-%d')
    
    # 데이터베이스에서 데이터 추출
    query = f"""
    SELECT product_id, sales_amount, sales_date 
    FROM sales 
    WHERE sales_date = '{execution_date}'
    """
    
    # 실제 구현에서는 DB 연결 후 쿼리 실행
    print(f"{execution_date} 매출 데이터 추출 중...")
    return f"추출 완료: {execution_date}"

def transform_sales_data(**context):
    """데이터 변환 및 집계"""
    execution_date = context['execution_date'].strftime('%Y-%m-%d')
    
    # 데이터 변환 로직
    print(f"{execution_date} 데이터 변환 중...")
    # 카테고리별 집계, 환율 적용 등
    
    return "변환 완료"

# DAG 정의
sales_pipeline = DAG(
    'daily_sales_pipeline',
    default_args={
        'owner': 'data-team',
        'start_date': datetime(2024, 1, 1),
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    schedule_interval='0 2 * * *',  # 매일 오전 2시
    catchup=False
)

# Task 정의
extract = PythonOperator(
    task_id='extract_sales',
    python_callable=extract_sales_data,
    dag=sales_pipeline
)

transform = PythonOperator(
    task_id='transform_sales',
    python_callable=transform_sales_data,
    dag=sales_pipeline
)

load = BashOperator(
    task_id='load_to_warehouse',
    bash_command='echo "데이터웨어하우스에 적재 중..." && sleep 3',
    dag=sales_pipeline
)

# 의존성 설정
extract >> transform >> load

 

2. MLOps 파이프라인

def train_model(**context):
    """모델 학습"""
    print("모델 학습 시작...")
    # 실제로는 scikit-learn, tensorflow 등 사용
    accuracy = 0.95  # 예시 정확도
    
    if accuracy < 0.9:
        raise ValueError("모델 정확도가 너무 낮습니다.")
    
    return f"모델 학습 완료. 정확도: {accuracy}"

def deploy_model(**context):
    """모델 배포"""
    print("모델 배포 시작...")
    # 실제로는 MLflow, Kubeflow 등 사용
    return "모델 배포 완료"

ml_pipeline = DAG(
    'weekly_model_training',
    default_args={
        'owner': 'ml-team',
        'start_date': datetime(2024, 1, 1)
    },
    schedule_interval='0 3 * * 0',  # 매주 일요일 오전 3시
    catchup=False
)

# ML 파이프라인 Tasks
data_validation = PythonOperator(
    task_id='validate_training_data',
    python_callable=lambda: print("데이터 검증 완료"),
    dag=ml_pipeline
)

feature_engineering = PythonOperator(
    task_id='feature_engineering',
    python_callable=lambda: print("피처 엔지니어링 완료"),
    dag=ml_pipeline
)

model_training = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=ml_pipeline
)

model_deployment = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=ml_pipeline
)

# 파이프라인 구성
data_validation >> feature_engineering >> model_training >> model_deployment

 

Airflow의 주요 장점

1. 가시성과 모니터링

Airflow 웹 UI를 통해 다음을 확인할 수 있습니다.

  • Graph View: DAG의 시각적 의존성 그래프
  • Tree View: 시간에 따른 Task 실행 상태
  • Gantt Chart: Task별 실행 시간 분석
  • Task Logs: 각 Task의 상세 실행 로그

2. 강력한 스케줄링

# 다양한 스케줄링 옵션
schedule_interval='@daily'        # 매일
schedule_interval='@hourly'       # 매시간
schedule_interval='0 */2 * * *'   # 2시간마다
schedule_interval='0 9 * * 1-5'   # 평일 오전 9시
schedule_interval=timedelta(hours=6)  # 6시간마다

 

 

3. 확장성

 

  • 다양한 Executor: Sequential, Local, Celery, Kubernetes
  • 200개 이상의 Operator: AWS, GCP, Azure, Snowflake, Slack 등
  • 커스텀 Operator: 특별한 요구사항에 맞는 자체 Operator 개발 가능

4. 내결함성

default_args = {
    'retries': 3,                              # 3번 재시도
    'retry_delay': timedelta(minutes=5),       # 5분 후 재시도
    'email_on_failure': True,                  # 실패 시 이메일
    'email_on_retry': False,                   # 재시도 시 이메일 안함
    'sla': timedelta(hours=2),                 # SLA 2시간
}

 

 

Airflow가 적합한 경우

Airflow 사용을 추천하는 경우

  • 복잡한 데이터 파이프라인: ETL/ELT 작업이 여러 단계로 구성
  • 의존성이 있는 작업들: A 작업 완료 후 B, C 작업 실행
  • 정기적인 배치 작업: 일/주/월 단위 반복 작업
  • 조건부 실행: 특정 조건에 따른 분기 처리 필요
  • 모니터링 필요: 작업 상태와 결과를 지속적으로 관찰

다른 도구를 고려해야하는 상황

 

  • 실시간 스트리밍: Apache Kafka, Apache Storm 등이 더 적합
  • 단순한 스케줄링: cron이나 시스템 스케줄러로 충분
  • 이벤트 기반 처리: AWS Lambda, Google Cloud Functions 등
  • 매우 짧은 주기: 초 단위 실행은 Airflow에 부적합

 

정리

Apache Airflow는 복잡한 워크플로우를 체계적으로 관리할 수 있게 해주는 강력한 도구입니다.

단순한 스케줄러를 넘어서 데이터 파이프라인, MLOps, 비즈니스 프로세스 자동화까지 다양한 영역에서 활용할 수 있습니다.

 

 

728x90
반응형
저작자표시 비영리 변경금지 (새창열림)

'Data Engineer' 카테고리의 다른 글

XAMPP MySQL이 “shutdown unexpectedly” — 포트 문제인 줄 알았는데, 진짜 원인은 mysql.global_priv 손상이 원인 (해결)  (0) 2025.10.16
CDC 파이프라인이란? 실시간 데이터 동기화의 핵심 기술  (2) 2025.07.30
Data Driven 이란?  (3) 2025.07.28
데이터 인프라 IaC 설계 철학: Monolithic vs Modular, 어떤 전략이 옳을까?  (2) 2025.07.21
메타데이터 관리 전략: SSOT vs Federated, 무엇이 정답일까?  (1) 2025.07.18
'Data Engineer' 카테고리의 다른 글
  • XAMPP MySQL이 “shutdown unexpectedly” — 포트 문제인 줄 알았는데, 진짜 원인은 mysql.global_priv 손상이 원인 (해결)
  • CDC 파이프라인이란? 실시간 데이터 동기화의 핵심 기술
  • Data Driven 이란?
  • 데이터 인프라 IaC 설계 철학: Monolithic vs Modular, 어떤 전략이 옳을까?
Balang
Balang
음악 전공생의 개발일지
  • Balang
    Balang
    Balang
  • 전체
    오늘
    어제
  • 반응형
    • All Post (160)
      • python (47)
        • selenium (4)
        • algorithm (10)
        • Django (6)
        • Pandas | Numpy (22)
      • SQL (9)
      • Data Engineer (36)
      • Data Scientist (3)
      • Data Analysis (11)
      • Computer Science (36)
      • Why? (16)
      • 마음가짐 (2)
  • 인기 글

  • 최근 댓글

  • 최근 글

  • 250x250
  • hELLO· Designed By정상우.v4.10.3
Balang
Apache Airflow란?
상단으로

티스토리툴바