Data Engineer

Airflow XCom 태스크 간 데이터 전달

Balang 2025. 3. 28. 16:29
728x90

Apache Airflow는 데이터 파이프라인을 구성하고 관리하는 데 널리 사용되는 워크플로우 오케스트레이션 툴입니다.

그중에서도 XCom(eXchange of information)은 DAG 내 태스크 간 데이터를 전달할 수 있게 해주는 핵심 기능입니다.

이 글에서는 Airflow의 XCom기능을 정리해보도록 하겠습니다.

1. XCom이란?

XCom은 "eXchange of information"의 줄임말로, 하나의 태스트에서 생성된 데이터를 다른 태스크에서 사용할 수 있게 해주는 메커니즘입니다.

 

일반적으로 Airflow의 태스트는 독립적으로 실행되지만, XCom을 활용하면 태스크 간 데이터 의존성이 필요한 경우 유용하게 사용할 수 있습니다.

 

예를 들어 전 단계 태스크에서 API 응답 데이터를 받아 다음 단계에서 처리할 때나, 파일 경로나 테이블 이름처럼 다음 태스크에서 참조해야할 정보를 전달할 때 사용할 수 있습니다.

 

XCom 데이터 흐름 다이어그램

┌────────────┐     Push      ┌────────────┐      Pull       ┌────────────┐
│ Task A     │ ───────────▶ │   XCom     │ ─────────────▶  │ Task B     │
│ (생성자)   │               │ (Airflow DB)│                 │ (소비자)   │
└────────────┘               └────────────┘                 └────────────┘

2. XCom 기본 사용법

2.1 xcom_push와 xcom_pull

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def push_function(**kwargs):
    kwargs['ti'].xcom_push(key='my_key', value='hello world')

def pull_function(**kwargs):
    value = kwargs['ti'].xcom_pull(key='my_key', task_ids='push_task')
    print(f'Pulled value: {value}')

default_args = {'start_date': datetime(2024, 1, 1)}

dag = DAG('xcom_example', schedule_interval='@once', default_args=default_args, catchup=False)

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    provide_context=True,
    dag=dag,
)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag,
)

push_task >> pull_task

 

2.2 return을 이용한 XCom Push

PythonOperator의 경우 함수에서 반환한 값은 자동으로 XCom에 저장됩니다.

def push_return():
    return 'this is returned'

 

이 값은 task_instance.xcom_pull()key='return_value'를 지정해 가져올 수 있습니다.


3. XCom 사용 예시

S3 파일 경로 전달

S3에 파일을 업로드한 후, 해당 경로를 다음 태스크에서 처리해야할 경우

def upload_file(**kwargs):
    s3_path = 's3://my-bucket/data/file.csv'
    kwargs['ti'].xcom_push(key='s3_path', value=s3_path)

# 다음 태스크에서는 pull
s3_path = kwargs['ti'].xcom_pull(task_ids='upload_task', key='s3_path')

# 시각 예시
[upload_task] ---> XCom (s3_path) ---> [process_file_task]

4. Airflow UI에서 XCom 확인하기

1. Airflow 웹 UI 접속

2. 실행된 DAG > 특정 DAG Run 클릭

3. 상단 탭에서 "XCom" 클릭

4. 저장된 key-value 목록 확인가능


5. XCom 사용 시 주의사항

  • 데이터 크기 제한 : XCom 값은 기본적으로 Airflow의 메타데이터 DB에 저장됩니다.
    너무 큰 데이터를 저장하면 DB 성능에 영향을 줄 수 있습니다.
  • JSON 직렬화 가능 객체만 사용 : dict, list, string, number 등만 XCom으로 안전하게 전달 가능
  • 같은 key로 여러 번 push하는 경우 : 가장 마지막 push 값만 pull시 기본적으로 조회됩니다.

6.1 TaskFlow API와 XCom

TaskFlow API를 사용하면 XCom을 보다 명시적이고 직관적으로 사용할 수 있습니다.

from airflow.decorators import task, dag
from datetime import datetime

@dag(schedule_interval='@once', start_date=datetime(2024,1,1), catchup=False)
def my_dag():

    @task()
    def extract():
        return {'name': 'ChatGPT'}

    @task()
    def transform(data):
        data['name'] = data['name'].upper()
        return data

    @task()
    def load(data):
        print(f"Loaded data: {data}")

    load(transform(extract()))

my_dag_instance = my_dag()

6.2 Custom Operator에서 XCom 활용

Operator 내부에서 self.xcom_push() 사용

class MyCustomOperator(BaseOperator):
    def execute(self, context):
        result = some_processing()
        self.xcom_push(context, key='result', value=result)

7. 환경 설정 예시

Airflow가 XCom을 DB에 저장하기 때문에, DB 설정은 중요합니다. 기본 SQLite 환경에서는 성능 한계가 있으므로,

PostgreSQL 또는 MySQL 사용하는게 좋습니다.

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"

  airflow-webserver:
    image: apache/airflow:2.7.2
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    depends_on:
      - postgres

docker-compose 환경에서는 PostgreSQL 설정 예시 입니다.


참고자료

- Apache Airflow 공식 문서 - XCom

728x90
반응형