Airflow XCom 태스크 간 데이터 전달

2025. 3. 28. 16:29·Data Engineer
728x90

Apache Airflow는 데이터 파이프라인을 구성하고 관리하는 데 널리 사용되는 워크플로우 오케스트레이션 툴입니다.

그중에서도 XCom(eXchange of information)은 DAG 내 태스크 간 데이터를 전달할 수 있게 해주는 핵심 기능입니다.

이 글에서는 Airflow의 XCom기능을 정리해보도록 하겠습니다.

1. XCom이란?

XCom은 "eXchange of information"의 줄임말로, 하나의 태스트에서 생성된 데이터를 다른 태스크에서 사용할 수 있게 해주는 메커니즘입니다.

 

일반적으로 Airflow의 태스트는 독립적으로 실행되지만, XCom을 활용하면 태스크 간 데이터 의존성이 필요한 경우 유용하게 사용할 수 있습니다.

 

예를 들어 전 단계 태스크에서 API 응답 데이터를 받아 다음 단계에서 처리할 때나, 파일 경로나 테이블 이름처럼 다음 태스크에서 참조해야할 정보를 전달할 때 사용할 수 있습니다.

 

XCom 데이터 흐름 다이어그램

┌────────────┐     Push      ┌────────────┐      Pull       ┌────────────┐
│ Task A     │ ───────────▶ │   XCom     │ ─────────────▶  │ Task B     │
│ (생성자)   │               │ (Airflow DB)│                 │ (소비자)   │
└────────────┘               └────────────┘                 └────────────┘

2. XCom 기본 사용법

2.1 xcom_push와 xcom_pull

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def push_function(**kwargs):
    kwargs['ti'].xcom_push(key='my_key', value='hello world')

def pull_function(**kwargs):
    value = kwargs['ti'].xcom_pull(key='my_key', task_ids='push_task')
    print(f'Pulled value: {value}')

default_args = {'start_date': datetime(2024, 1, 1)}

dag = DAG('xcom_example', schedule_interval='@once', default_args=default_args, catchup=False)

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    provide_context=True,
    dag=dag,
)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag,
)

push_task >> pull_task

 

2.2 return을 이용한 XCom Push

PythonOperator의 경우 함수에서 반환한 값은 자동으로 XCom에 저장됩니다.

def push_return():
    return 'this is returned'

 

이 값은 task_instance.xcom_pull() 로 key='return_value'를 지정해 가져올 수 있습니다.


3. XCom 사용 예시

S3 파일 경로 전달

S3에 파일을 업로드한 후, 해당 경로를 다음 태스크에서 처리해야할 경우

def upload_file(**kwargs):
    s3_path = 's3://my-bucket/data/file.csv'
    kwargs['ti'].xcom_push(key='s3_path', value=s3_path)

# 다음 태스크에서는 pull
s3_path = kwargs['ti'].xcom_pull(task_ids='upload_task', key='s3_path')

# 시각 예시
[upload_task] ---> XCom (s3_path) ---> [process_file_task]

4. Airflow UI에서 XCom 확인하기

1. Airflow 웹 UI 접속

2. 실행된 DAG > 특정 DAG Run 클릭

3. 상단 탭에서 "XCom" 클릭

4. 저장된 key-value 목록 확인가능


5. XCom 사용 시 주의사항

  • 데이터 크기 제한 : XCom 값은 기본적으로 Airflow의 메타데이터 DB에 저장됩니다.
    너무 큰 데이터를 저장하면 DB 성능에 영향을 줄 수 있습니다.
  • JSON 직렬화 가능 객체만 사용 : dict, list, string, number 등만 XCom으로 안전하게 전달 가능
  • 같은 key로 여러 번 push하는 경우 : 가장 마지막 push 값만 pull시 기본적으로 조회됩니다.

6.1 TaskFlow API와 XCom

TaskFlow API를 사용하면 XCom을 보다 명시적이고 직관적으로 사용할 수 있습니다.

from airflow.decorators import task, dag
from datetime import datetime

@dag(schedule_interval='@once', start_date=datetime(2024,1,1), catchup=False)
def my_dag():

    @task()
    def extract():
        return {'name': 'ChatGPT'}

    @task()
    def transform(data):
        data['name'] = data['name'].upper()
        return data

    @task()
    def load(data):
        print(f"Loaded data: {data}")

    load(transform(extract()))

my_dag_instance = my_dag()

6.2 Custom Operator에서 XCom 활용

Operator 내부에서 self.xcom_push() 사용

class MyCustomOperator(BaseOperator):
    def execute(self, context):
        result = some_processing()
        self.xcom_push(context, key='result', value=result)

7. 환경 설정 예시

Airflow가 XCom을 DB에 저장하기 때문에, DB 설정은 중요합니다. 기본 SQLite 환경에서는 성능 한계가 있으므로,

PostgreSQL 또는 MySQL 사용하는게 좋습니다.

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"

  airflow-webserver:
    image: apache/airflow:2.7.2
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    depends_on:
      - postgres

docker-compose 환경에서는 PostgreSQL 설정 예시 입니다.


참고자료

- Apache Airflow 공식 문서 - XCom

728x90
반응형
저작자표시 비영리 변경금지 (새창열림)

'Data Engineer' 카테고리의 다른 글

'정규화(Normalization)'를 과도하게 적용하면 Spark 성능이 저하될까?  (0) 2025.04.24
PySpark vs. Scala Spark 어떤 언어를 선택해야할까?  (0) 2025.04.02
Airflow로 데이터 품질 테스트  (0) 2025.03.28
Apache Spark vs Apace Flink  (0) 2025.03.26
Redshift vs BigQuery vs Snowflake  (0) 2025.03.25
'Data Engineer' 카테고리의 다른 글
  • '정규화(Normalization)'를 과도하게 적용하면 Spark 성능이 저하될까?
  • PySpark vs. Scala Spark 어떤 언어를 선택해야할까?
  • Airflow로 데이터 품질 테스트
  • Apache Spark vs Apace Flink
Balang
Balang
음악 전공생의 개발일지
  • Balang
    Balang
    Balang
  • 전체
    오늘
    어제
  • 반응형
    • All Post (146)
      • python (45)
        • selenium (4)
        • algorithm (9)
        • Django (6)
        • Pandas | Numpy (22)
      • SQL (9)
      • Data Engineer (29)
      • Data Scientist (3)
      • Data Analysis (9)
      • Computer Science (35)
      • Why? (15)
      • 마음가짐 (1)
  • 인기 글

  • 최근 댓글

  • 최근 글

  • 250x250
  • hELLO· Designed By정상우.v4.10.3
Balang
Airflow XCom 태스크 간 데이터 전달
상단으로

티스토리툴바