Apache Airflow는 데이터 파이프라인을 구성하고 관리하는 데 널리 사용되는 워크플로우 오케스트레이션 툴입니다.
그중에서도 XCom(eXchange of information)은 DAG 내 태스크 간 데이터를 전달할 수 있게 해주는 핵심 기능입니다.
이 글에서는 Airflow의 XCom기능을 정리해보도록 하겠습니다.
1. XCom이란?
XCom은 "eXchange of information"의 줄임말로, 하나의 태스트에서 생성된 데이터를 다른 태스크에서 사용할 수 있게 해주는 메커니즘입니다.
일반적으로 Airflow의 태스트는 독립적으로 실행되지만, XCom을 활용하면 태스크 간 데이터 의존성이 필요한 경우 유용하게 사용할 수 있습니다.
예를 들어 전 단계 태스크에서 API 응답 데이터를 받아 다음 단계에서 처리할 때나, 파일 경로나 테이블 이름처럼 다음 태스크에서 참조해야할 정보를 전달할 때 사용할 수 있습니다.
XCom 데이터 흐름 다이어그램
┌────────────┐ Push ┌────────────┐ Pull ┌────────────┐
│ Task A │ ───────────▶ │ XCom │ ─────────────▶ │ Task B │
│ (생성자) │ │ (Airflow DB)│ │ (소비자) │
└────────────┘ └────────────┘ └────────────┘
2. XCom 기본 사용법
2.1 xcom_push와 xcom_pull
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def push_function(**kwargs):
kwargs['ti'].xcom_push(key='my_key', value='hello world')
def pull_function(**kwargs):
value = kwargs['ti'].xcom_pull(key='my_key', task_ids='push_task')
print(f'Pulled value: {value}')
default_args = {'start_date': datetime(2024, 1, 1)}
dag = DAG('xcom_example', schedule_interval='@once', default_args=default_args, catchup=False)
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=dag,
)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=dag,
)
push_task >> pull_task
2.2 return을 이용한 XCom Push
PythonOperator의 경우 함수에서 반환한 값은 자동으로 XCom에 저장됩니다.
def push_return():
return 'this is returned'
이 값은 task_instance.xcom_pull()
로 key='return_value'
를 지정해 가져올 수 있습니다.
3. XCom 사용 예시
S3 파일 경로 전달
S3에 파일을 업로드한 후, 해당 경로를 다음 태스크에서 처리해야할 경우
def upload_file(**kwargs):
s3_path = 's3://my-bucket/data/file.csv'
kwargs['ti'].xcom_push(key='s3_path', value=s3_path)
# 다음 태스크에서는 pull
s3_path = kwargs['ti'].xcom_pull(task_ids='upload_task', key='s3_path')
# 시각 예시
[upload_task] ---> XCom (s3_path) ---> [process_file_task]
4. Airflow UI에서 XCom 확인하기
1. Airflow 웹 UI 접속
2. 실행된 DAG > 특정 DAG Run 클릭
3. 상단 탭에서 "XCom" 클릭
4. 저장된 key-value 목록 확인가능
5. XCom 사용 시 주의사항
- 데이터 크기 제한 : XCom 값은 기본적으로 Airflow의 메타데이터 DB에 저장됩니다.
너무 큰 데이터를 저장하면 DB 성능에 영향을 줄 수 있습니다. - JSON 직렬화 가능 객체만 사용 : dict, list, string, number 등만 XCom으로 안전하게 전달 가능
- 같은 key로 여러 번 push하는 경우 : 가장 마지막 push 값만 pull시 기본적으로 조회됩니다.
6.1 TaskFlow API와 XCom
TaskFlow API를 사용하면 XCom을 보다 명시적이고 직관적으로 사용할 수 있습니다.
from airflow.decorators import task, dag
from datetime import datetime
@dag(schedule_interval='@once', start_date=datetime(2024,1,1), catchup=False)
def my_dag():
@task()
def extract():
return {'name': 'ChatGPT'}
@task()
def transform(data):
data['name'] = data['name'].upper()
return data
@task()
def load(data):
print(f"Loaded data: {data}")
load(transform(extract()))
my_dag_instance = my_dag()
6.2 Custom Operator에서 XCom 활용
Operator 내부에서 self.xcom_push()
사용
class MyCustomOperator(BaseOperator):
def execute(self, context):
result = some_processing()
self.xcom_push(context, key='result', value=result)
7. 환경 설정 예시
Airflow가 XCom을 DB에 저장하기 때문에, DB 설정은 중요합니다. 기본 SQLite 환경에서는 성능 한계가 있으므로,
PostgreSQL 또는 MySQL 사용하는게 좋습니다.
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
ports:
- "5432:5432"
airflow-webserver:
image: apache/airflow:2.7.2
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
depends_on:
- postgres
docker-compose 환경에서는 PostgreSQL 설정 예시 입니다.
참고자료
'Data Engineer' 카테고리의 다른 글
'정규화(Normalization)'를 과도하게 적용하면 Spark 성능이 저하될까? (0) | 2025.04.24 |
---|---|
PySpark vs. Scala Spark 어떤 언어를 선택해야할까? (0) | 2025.04.02 |
Airflow로 데이터 품질 테스트 (0) | 2025.03.28 |
Apache Spark vs Apace Flink (0) | 2025.03.26 |
Redshift vs BigQuery vs Snowflake (0) | 2025.03.25 |