정규화는 데이터베이스 설계의 기본 원칙입니다.
중복을 줄이고 무결성을 보장하는 데에 탁월한 방식이죠. 하지만 Spark처럼 분산 기반의 데이터 처리 엔진에서 정규화 구조를 그대로 사용하면 심각한 성능 문제가 발생할 수 있습니다.
문제는 Spark가 전통적인 RDB처럼 조인을 가볍게 처리하지 못한다는 점에 있습니다.
Spark는 분산 환경에서 데이터를 조인하기 위해 다음과 같은 구조적 과정들을 거칩니다:
- 데이터를 조인 키 기준으로 재분산(Shuffle) → 네트워크 비용 급증
- 각 Executor에서 정렬(Sort) 및 조인 수행 → CPU, 디스크 부하 증가
- Stage 간 I/O 증가, task skew 발생 가능성 증가
즉, 단순히 조인을 많이 했기 때문에 느린 것이 아니라, Spark의 내부 구조와 정규화가 맞지 않기 때문입니다.
이로 인해 실무에서는 다음과 같은 문제들이 반복적으로 발생합니다:
- 조인 단계에서 Stage 수가 급격히 증가하며, Shuffle read/write 양이 폭증
- 데이터 쏠림(skew) 으로 인해 일부 task가 병목을 유발 (특히 user_id, product_id 같이 집중되는 키)
- 조인 이후 메모리 초과 또는 spill 발생, 디스크에 임시 저장 → 전체 성능 저하
따라서, Spark에서 조인을 어떻게 최적화하고, 왜 정규화를 재설계해야 하는지를 아는 것은 실무 데이터 엔지니어에게 필수적인 역량입니다.
데이터 모델링을 배울 때 자주 강조되는 정규화(Normalization)는 데이터 중복을 줄이고 무결성을 보장하는 데 탁월한 설계 방식입니다.
하지만 분산 처리 시스템인 Apache Spark에서는 이 정규화가 오히려 성능 병목의 주요 원인이 되기도 합니다.
대용량 데이터를 처리하는 Spark는 조인의 수, 데이터 이동량, 파티셔닝 상태에 따라 성능이 크게 좌우되며, 정규화된 데이터 구조는 Spark에게 불리한 구조가 되는 경우가 많습니다.
Spark에서의 JOIN 처리 방식
기본적으로 Spark의 JOIN 방식은 다음 두 가지입니다:
JOIN 방식 | 설명 | 사용 조건 |
Broadcast Hash Join | 작은 테이블을 Executor에 전체 복제 후 메모리 조인 | spark.sql.autoBroadcastJoinThreshold 이하의 크기 |
Shuffle Sort Merge Join | 두 테이블을 조인 키 기준으로 셔플 및 정렬 후 병합 | Broadcast 불가시 기본 수행 방식 |
내부 동작 요약
- repartition: 조인 키 기준으로 데이터 분산
- shuffle: 네트워크를 통해 Executor 간 데이터 이동
- sort: 정렬 후 병합 조인 수행
이때 발생하는 디스크 I/O + 네트워크 I/O + 정렬 비용은 대용량 처리 시 성능 저하의 주범입니다.
과도한 정규화 구조의 실수
시나리오
한 이커머스 기업에서 사용자 이벤트 로그를 분석해야 하는 상황.
event_log 테이블에는 사용자 ID, 상품 ID 등 외래 키만 존재하며,
세부 정보는 각각의 디멘션 테이블(user_dim, product_dim 등)에 나뉘어 있습니다.
Spark 코드 예시 (JOIN 중심)
event_df = spark.read.parquet("s3://data/event_log")
user_df = spark.read.parquet("s3://data/user_dim")
product_df = spark.read.parquet("s3://data/product_dim")
joined_df = event_df \
.join(user_df, on="user_id", how="left") \
.join(product_df, on="product_id", how="left")
joined_df.write.parquet("s3://result/joined")
Spark UI에서 확인된 문제
- Stage 3: Shuffle Read = 47.2 GB
- Task Skew: 특정 Task만 12분, 나머지는 수 초
- 원인: 일부 user_id에 데이터 쏠림(skew), 다중 정규화 조인으로 인한 셔플 과도
성능 비교 실험: 정규화 vs Flatten
A. 정규화된 다중 조인
joined_df = event_df \
.join(user_df, "user_id") \
.join(product_df, "product_id") \
.select("event_id", "user_gender", "product_category")
joined_df.write.parquet("normalized_output")
B. 미리 조인된 Flatten 테이블
flat_df = spark.read.parquet("flattened_log")
flat_df.select("event_id", "user_gender", "product_category").write.parquet("flat_output")
구분 | 실행 시간 | Shuffle Read | Stage 수 |
정규화 | 24분 | 45GB 이상 | 8 |
Flatten | 3분 | 없음 | 2 |
예외 상황 및 트러블슈팅
문제 | 원인 | 해결 전략 |
Broadcast Join 자동 미적용 | 테이블 크기가 임계값 초과 | 수동 broadcast(df) 사용 |
Task Skew | 특정 키에 트래픽 집중 | Salting, Skew Join Hint 사용 |
조인 후 메모리 초과 | 조인 후 row 수 폭발 | Persist 레벨 조절, select로 필요한 컬럼만 유지 |
예외 처리 코드 예시
try:
result_df = df1.join(df2, "id")
except AnalysisException as e:
print("JOIN 실패:", e)
전략 정리
1. Broadcast 활용
Broadcast Join은 Spark에서 가장 효율적인 JOIN 방식 중 하나입니다. 이유는 다음과 같습니다:
- 일반적인 Shuffle Sort Merge Join은 조인 키 기준으로 전체 데이터를 재분배(shuffle)해야 하기 때문에 디스크 I/O + 네트워크 전송 + 정렬 비용이 큽니다.
- 반면, Broadcast Join은 작은 테이블(디멘션 테이블 등)을 Executor의 메모리에 통째로 복사하여, 로컬에서 Hash Join으로 처리합니다. 이 방식은 네트워크 통신 없이 조인이 가능하므로 속도가 비약적으로 향상됩니다.
- 실무에서 user_dim, product_dim처럼 수백만 건 이하의 테이블은 대부분 Broadcast가 유리합니다.
단, broadcast가 무조건 빠르지는 않으며, 메모리 초과 시 OOM이 발생할 수 있으므로 Broadcast할 테이블의 크기를 반드시 고려해야 합니다.
from pyspark.sql.functions import broadcast
joined = df1.join(broadcast(df2), "user_id")
2. 분석 전용 Flatten 테이블 생성
정규화된 테이블을 조인해서 분석하는 방식은 테이블 수가 늘어날수록 성능이 급격히 저하됩니다. 특히 분석은 반복적으로 수행되며 실시간성이 중요한 경우가 많기 때문에, 반복되는 조인을 사전에 수행해 미리 저장해두는 전략이 효과적입니다.
Flatten 테이블은 다음과 같은 이유로 유리합니다:
- 분석 쿼리는 대부분 특정 조건의 집계 또는 필터를 수행하는데, 조인으로 인한 병목 없이 바로 필요한 데이터를 조회할 수 있습니다.
- Spark는 쿼리를 실행할 때마다 DAG를 재작성하고 셔플을 수행하는데, Flatten 테이블은 이를 피할 수 있는 캐시된 형태입니다.
- 이벤트 기반의 대용량 로그 분석, 추천 알고리즘 피처 준비, 집계 리포트 생성 등에서 효과적입니다.
flattened_df = event_df \
.join(user_df, "user_id") \
.join(product_df, "product_id") \
.select("event_id", "user_gender", "product_name")
flattened_df.write.parquet("/flattened/events")
3. Repartition & Cache
조인 대상 테이블의 파티셔닝 상태가 조인 키 기준으로 맞춰져 있지 않으면, Spark는 강제로 셔플을 수행하게 됩니다. 이 과정에서 성능이 저하되기 때문에, 조인 전에 repartition을 통해 데이터 분산 구조를 최적화하는 것이 중요합니다.
또한 조인 대상이 여러 번 반복 사용되는 경우, cache()를 통해 RDD 또는 DataFrame을 메모리에 저장하면 반복적인 I/O 및 재계산 비용을 방지할 수 있습니다.
이 전략이 필요한 이유는 다음과 같습니다:
- 조인 키로 파티셔닝하면 셔플 발생을 최소화하여 전체 쿼리 처리 시간을 줄일 수 있습니다.
- cache를 활용하면 같은 연산이 재사용될 때 Spark가 DAG를 단축하고, Stage 재생성을 줄여 성능이 향상됩니다.
- 실시간 분석, 모델 학습 전 피처 병합 단계 등 반복 연산이 필요한 곳에서 유용합니다.
event_df = event_df.repartition("user_id").cache()
요약
- Spark는 JOIN이 많을수록 느려진다기보다, 셔플이 많을수록 느려진다가 핵심
- 정규화된 테이블 구조는 분산 처리 환경에서 병목 유발 가능성이 높음
- Flatten, Broadcast, Repartition 등 다양한 전략을 상황에 따라 병행 적용해야 함
참고 자료
'Data Engineer' 카테고리의 다른 글
PySpark에서 NULL이 많을 때 filter가 안 먹히는 이유 (0) | 2025.04.24 |
---|---|
PySpark vs. Scala Spark 어떤 언어를 선택해야할까? (0) | 2025.04.02 |
Airflow XCom 태스크 간 데이터 전달 (0) | 2025.03.28 |
Airflow로 데이터 품질 테스트 (0) | 2025.03.28 |
Apache Spark vs Apace Flink (0) | 2025.03.26 |