🌿 jadelog
⚙️ Data Engineering

PySpark: 대용량 분산 처리 DataFrame 기초

status
Public
date
Mar 30, 2026
slug
pyspark-performance-optimization-guide
summary
PySpark는 Apache Spark를 Python 환경에서 사용할 수 있게 해주는 API로, 대량의 데이터를 분산 처리할 수 있다. 핵심 구조로는 Driver Node, Worker Node, Cluster Manager가 있으며, RDD와 DataFrame이 주요 데이터 구조이다. 학습 로드맵은 DataFrame 기초 조작, 스파크 최적화 및 고급 기능, 확장 모듈 다루기로 구성된다. Lazy Evaluation, SparkSession 생성, 데이터 불러오기 및 변환, 집계, 조인 등의 기법을 통해 성능을 최적화할 수 있다. 또한, Spark SQL, Structured Streaming, MLlib 등의 확장 모듈을 활용하여 데이터 엔지니어링을 강화할 수 있다.
type
Post
category
⚙️ Data Engineering
tags
PySpark
Data Engineering
thumbnail
pyspark-performance-optimization-guide.png
series

0) PySpark의 핵심: 왜 사용하는가?

PySpark는 분산 컴퓨팅 프레임워크인 Apache Spark를 Python 환경에서 다룰 수 있게 해주는 API입니다. Pandas가 한 대의 컴퓨터(메모리)에서 다룰 수 있는 데이터만 처리할 수 있다면, PySpark는 수십, 수백 대의 컴퓨터 리소스를 하나로 묶어 TB급 이상의 데이터를 순식간에 병렬로 처리할 수 있게 해줍니다.

3가지 핵심 구조

PySpark를 제대로 다루려면 내부에서 데이터와 작업이 물리적으로 어떻게 쪼개져 처리되는지 이해해야 합니다.
  • Driver Node (운전석): 사용자가 작성한 파이썬 코드를 해석하고, 작업 계획을 세운 뒤 여러 대의 컴퓨터(Worker)에게 명령을 내리는 두뇌 역할을 합니다.
  • Worker Node / Executor (작업자): Driver의 명령을 받아 실제 데이터를 나누어 연산하고 결과를 반환하는 역할을 수행합니다.
  • Cluster Manager (작업 반장): Driver와 Worker 사이에서 자원(메모리, CPU)을 할당하고 관리합니다 (예: YARN, Kubernetes 등).

핵심 데이터 구조

  • RDD (Resilient Distributed Dataset): 스파크의 가장 근간이 되는 로우 레벨 데이터 구조입니다. 데이터가 여러 노드에 분산되어 있으며, 연산 도중 에러가 나거나 노드가 다운되어도 스스로 복구할 수 있는 Fault-tolerance을 가집니다. (현재는 아주 세밀한 제어가 필요할 때만 드물게 사용합니다.)
  • DataFrame: RDD 위에 구축된 고수준 API로, 관계형 데이터베이스의 테이블이나 판다스의 데이터프레임과 매우 유사하게 생겼습니다. 스파크의 내부 최적화 엔진(Catalyst Optimizer)이 알아서 연산을 가장 빠른 방식으로 최적화해주기 때문에, 실무 데이터 엔지니어링 및 분석의 90% 이상은 이 DataFrame API를 사용합니다.

1) DataFrame 기초 조작법 익히기

Lazy Evaluation (지연 실행)

PySpark의 가장 큰 특징은 “말만 하고 실행은 나중에 한다”는 것입니다.
  • Transformation (변환): select, filter 같은 작업은 "이렇게 할 거야"라는 계획만 세웁니다.
  • Action (실행): show, count, collect 같은 명령이 떨어지는 순간, 쌓아둔 계획을 한꺼번에 최적화해서 실행합니다.

SparkSession

from pyspark.sql import SparkSession # 스파크 세션 생성 (보통 spark라는 이름으로 사용함) spark = SparkSession.builder \ .appName("MasteringPySpark") \ .getOrCreate()

데이터 불러오기 (Read)

데이터를 가져올 때는 Schema를 직접 정의하면 스파크가 데이터를 일일이 읽어서 타입을 추측하는 시간을 아낄 수 있습니다.
# CSV 읽기 예시 df = spark.read.csv("data.csv", header=True, inferSchema=True) # JSON이나 Parquet(스파크 최적화 포맷)도 가능 # df = spark.read.json("data.json") # df = spark.read.parquet("data.parquet")

 

데이터 변환 (Transformation)

① 선택(Select)과 필터링(Filter)
from pyspark.sql.functions import col # 필요한 컬럼만 뽑기 df_selected = df.select("name", "age", "city") # 조건에 맞는 데이터만 거르기 (SQL의 WHERE) df_filtered = df.filter((col("age") >= 20) & (col("city") == "Seoul"))
② 컬럼 추가 및 변경 (withColumn)
# 나이에 1을 더한 'new_age' 컬럼 추가 df_new = df.withColumn("new_age", col("age") + 1) # 컬럼 이름 바꾸기 df_renamed = df.withColumnRenamed("city", "location")
 

데이터 집계 (GroupBy & Agg)

from pyspark.sql.functions import avg, sum, count # 도시별 평균 나이와 인원수 구하기 df_stats = df.groupBy("city").agg( avg("age").alias("avg_age"), count("name").alias("user_count")
 

데이터 결합 (Join)

# df_users와 df_orders를 user_id 기준으로 결합 df_joined = df_users.join(df_orders, on="user_id", how="inner")
 

결과 확인 (Action)

df_joined.show(5) # 상위 5개 데이터 출력 print(df_joined.count()) # 전체 행 개수 확인 df_joined.collect() # 데이터를 드라이버 메모리로 가져오기 (주의: 너무 크면 터짐)
 

실전 꿀팁

  1. Pandas vs PySpark: PySpark의 DataFrame은 Pandas와 달리 인덱스(Index) 개념이 없습니다. 특정 행을 df[0]처럼 가져올 수 없으니 반드시 filter나 정렬을 사용하세요.
  1. Case Sensitivity: 기본적으로 PySpark는 대소문자를 구분하지 않지만, 설정에 따라 달라질 수 있으니 컬럼명은 명확하게 관리하는 게 좋습니다.
  1. Shuffle (셔플) 조심: groupByjoin은 데이터를 네트워크를 통해 주고받는 '셔플'을 발생시킵니다. 데이터가 클 때 가장 느려지는 지점이니, 꼭 필요한 데이터만 남기고 먼저 filter를 한 뒤에 Join 하세요.

2) Spark 최적화와 고급 기능

데이터 용량이 커지고 비즈니스 로직이 복잡해질수록, 분산 환경의 이점을 100% 뽑아내기 위한 고급 스킬과 최적화 기법이 필수적입니다.

윈도우 함수 (Window Function)

그룹화(groupBy)를 하면 결과가 그룹당 한 줄로 요약되어 줄어들지만, 윈도우 함수를 사용하면 기존 행(Row)의 형태를 그대로 유지하면서 그룹 내의 순위나 누적합 등을 계산할 수 있습니다.
from pyspark.sql.window import Window from pyspark.sql.functions import rank, col # 부서(department)별로 월급(salary)이 높은 순서대로 순위 매기기 windowSpec = Window.partitionBy("department").orderBy(col("salary").desc()) # 기존 데이터프레임에 'rank' 컬럼 추가 df_ranked = df.withColumn("rank", rank().over(windowSpec))

사용자 정의 함수 (UDF: User Defined Function)

스파크에 내장된 기본 함수들만으로는 해결하기 힘든 아주 복잡한 문자열 전처리나 커스텀 수학 계산이 필요할 때, 파이썬 함수를 직접 만들어 DataFrame에 적용할 수 있습니다.
from pyspark.sql.functions import udf from pyspark.sql.types import StringType # 파이썬 일반 함수 정의 def grade_converter(score): if score >= 90: return "A" elif score >= 80: return "B" else: return "C" # 1. UDF로 변환 (반환될 데이터의 타입을 반드시 명시해야 함) grade_udf = udf(grade_converter, StringType()) # 2. DataFrame에 적용 df_graded = df.withColumn("grade", grade_udf(col("score")))

성능 최적화 1: 캐싱 (Caching)

지연 실행(Lazy Evaluation) 구조 때문에, 스파크는 동일한 DataFrame을 여러 번 호출하면 매번 처음부터 다시 연산합니다. 뒤에서 계속 재사용될 중간 데이터는 한 번 계산되었을 때 메모리에 올려두어(Cache) 처리 시간을 극적으로 단축할 수 있습니다.
# df_filtered 연산 결과를 메모리에 저장하겠다고 선언 df_filtered.cache() print(df_filtered.count()) # 첫 번째 Action 발생! 이때 연산이 수행되고 메모리에 저장됨 df_filtered.show() # 두 번째 Action부터는 연산 없이 메모리에서 바로 가져옴 (초고속) # 사용이 끝났다면 메모리에서 내리기 df_filtered.unpersist()

성능 최적화 2: 파티셔닝 (Partitioning) 조절

데이터가 너무 적게 쪼개져 있으면 놀고 있는 Worker 노드가 생기고, 너무 많이 쪼개져 있으면 파티션을 관리하는 비용이 더 듭니다. 상황에 맞게 데이터의 조각(파티션) 개수를 조절해야 합니다.
# 파티션 개수 강제 늘리기/균등 분배 (Shuffle 발생 🚨: 네트워크 비용 큼) df_repartitioned = df.repartition(100) # 파티션 개수 줄이기 (Shuffle 미발생: 주로 최종 결과물을 하나의 파일로 저장하기 직전에 사용) df_coalesced = df.coalesce(10)

성능 최적화 3: 셔플링 방지와 브로드캐스트 조인 (Broadcast Join)

수백 GB의 거대한 테이블과 수십 MB의 아주 작은 테이블(예: 국가 코드 매핑 테이블, 사용자 등급 테이블 등)을 Join 할 때, 셔플링(네트워크 데이터 이동)을 막기 위해 작은 테이블을 모든 Worker 노드에 통째로 복사해서 뿌려주는 강력한 최적화 기법입니다.
from pyspark.sql.functions import broadcast # df_large(거대 데이터)와 df_small(작은 데이터) 결합 # df_small을 각 노드로 복사해 뿌려서 셔플링을 아예 없애버림 df_joined = df_large.join(broadcast(df_small), on="country_code", how="left")

실전 최적화 꿀팁

  1. 파이썬 UDF의 함정: 파이썬으로 짠 UDF는 실행 시 스파크(JVM) 환경과 파이썬 환경 사이를 오가며 직렬화/역직렬화 과정을 거치므로 속도가 매우 느립니다. 가급적이면 스파크 내장 함수(pyspark.sql.functions)들을 조합해서 사용하는 것이 수십 배 더 빠릅니다.
  1. Explain (실행 계획 확인): df.explain()을 실행하면 스파크 엔진(Catalyst)이 이 코드를 내부적으로 어떻게 최적화해서 실행할지 물리적 계획(Physical Plan)을 출력합니다. 고수들은 이를 보고 불필요한 셔플이 발생하고 있지는 않은지 점검합니다.
  1. OOM(Out of Memory) 방지: collect() Action은 수십 대의 컴퓨터에 흩어진 거대한 결과를 내 컴퓨터 한 대(Driver)로 전부 끌어오기 때문에 메모리가 터지기(OOM) 쉽습니다. 결과 확인은 show(5)take(5)를 사용하고, 전체 저장이 필요하면 write.parquet() 등을 통해 다시 분산 저장하세요.
 
 

3) 확장 모듈 다뤄보기 (Spark SQL, Streaming, MLlib)

PySpark는 DataFrame API 외에도 특정 목적에 특화된 강력한 라이브러리들을 포함하고 있습니다. 이를 통해 데이터 엔지니어링의 범위를 무한히 넓힐 수 있습니다.

Spark SQL: 파이썬 코드 대신 SQL로 데이터 다루기

이미 SQL에 익숙하다면, 복잡한 파이썬 체이닝 코드 대신 친숙한 SQL 문법을 그대로 사용할 수 있습니다. 내부적으로는 DataFrame API와 동일한 최적화 엔진을 사용하므로 성능 차이가 없습니다.
# 1. DataFrame을 임시 뷰(Temporary View)로 등록하기 df.createOrReplaceTempView("people") # 2. SQL 쿼리 실행 (결과는 다시 DataFrame으로 반환됨) sql_results = spark.sql(""" SELECT city, AVG(age) as avg_age FROM people WHERE age >= 20 GROUP BY city """) sql_results.show()

Structured Streaming: 끊이지 않는 데이터 처리 (실시간)

스트리밍 데이터(로그, 센서 데이터 등)를 마치 정적인 테이블처럼 다룰 수 있게 해줍니다. 데이터가 들어올 때마다 테이블 아래에 계속 행이 추가된다는 'Unbounded Table' 개념이 핵심입니다.
# 실시간 데이터 소스 연결 (예: Kafka, 특정 디렉토리의 파일 등) lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load() # 실시간 단어 개수 세기 (기존 DataFrame 문법과 동일!) word_counts = lines.selectExpr("explode(split(value, ' ')) as word").groupBy("word").count() # 실시간 결과를 콘솔에 출력하기 query = word_counts.writeStream.outputMode("complete").format("console").start() query.awaitTermination()

MLlib: 대규모 분산 머신러닝

수백만 건의 데이터를 일반적인 사이킷런(Scikit-learn)으로 학습시키면 메모리 부족으로 터지기 마련입니다. MLlib은 이를 여러 노드에 분산시켜 학습시킵니다. 핵심은 모든 특성(Feature)을 하나의 'Vector' 컬럼으로 합쳐야 한다는 점입니다.
from pyspark.ml.feature import VectorAssembler from pyspark.ml.classification import LogisticRegression # 1. 특성(Feature)들을 하나의 벡터 컬럼 'features'로 합치기 assembler = VectorAssembler(inputCols=["age", "income", "education"], outputCol="features") output = assembler.transform(df) # 2. 모델 생성 및 학습 lr = LogisticRegression(featuresCol="features", labelCol="is_customer") model = lr.fit(output) # 3. 예측 수행 predictions = model.transform(output)
 

4) PySpark 완벽 마스터를 위한 마지막 체크리스트

  1. Lazy Evaluation의 장점은 무엇인가?
    1. (최적화 기법인 Catalyst Optimizer가 실행 전에 전체 계획을 검토할 수 있게 함)
      : 실시간으로 데이터를 처리하는 것이 아니라 작업을 먼저 선언해두고, 최적화(Predicate Pushdown, 내부적으로 실행 순서를 변경) 후에 작업을 실행할 수 있어 성능적으로 유리함
  1. collect()를 함부로 쓰면 안 되는가?
    1. (분산된 모든 데이터를 드라이버 노드의 메모리로 가져오기 때문에 OOM 에러 위험)
      : 분산된 데이터를 드라이버 노드(메인 노드)로 가져오기 때문에 OOM이 날 수 있음. 이 때는 show 나 write 을 통해 확인하는 것이 안전
  1. 조인 성능이 떨어질 때 가장 먼저 고려할 최적화는?
    1. (작은 테이블이라면 broadcast 조인 사용하기)
      : 작은 테이블을 미리 모든 워커 노드의 메모리에 broadcast 하여 shuffle 피하기 (스파크에서 조인 시, 같은 키를 가진 데이터를 같은 워커 노드로 모으기 위해 네트워크를 타고 데이터가 대규모로 이동)
  1. UDF 사용을 지양해야 하는 이유는?
    1. (파이썬과 JVM 간의 오버헤드 발생. 가급적 내장 함수 사용 권장)
      : 직렬화/역직렬화 발생. 최대한 내장 함수를 이용해서 구현하는 것을 권장
 
Related Posts
⚙️ Data Engineering
Series: 26년 학습

데이터를 움직이는 힘: 데이터 거버넌스 & 엔지니어링 실전 강의 정리

Dec 6, 2025

모두의 연구소 [데이터를 움직이는 힘: 데이터 거버넌스 & 엔지니어링 실전] 강의 수강

Data Engineering
데이터를 움직이는 힘
⚙️ Data Engineering
Series: Tech Blog InQuery

AWS DataZone에서 OpenLineage 기반의 Airflow 데이터 계보 그리기

May 2, 2025

AWS DataZone과 OpenLineage를 연동하여 Airflow 기반의 데이터 계보(Lineage)를 시각화하는 아키텍처와 구축 방법을 다룹니다. 이를 통해 복잡한 데이터 파이프라인의 흐름을 투명하게 관리하고 추적성을 확보하는 기술적 노하우를 확인해 보세요.

Airflow
Data Lineage Tracking
⚙️ Data Engineering

데이터 엔지니어링 2025년 전망: 실무자의 시선으로 읽기

Dec 23, 2024

AI 컴퓨팅과 SLM의 부상, Data IDE의 필요성 등 2025년 데이터 엔지니어링 트렌드를 실무 경험에 비추어 분석했습니다. 데이터 품질과 엔지니어의 역할 변화에 대한 고찰을 담았습니다.

Data Engineering
📚 Study
Series: Designing Data-Intensive Applications

Designing Data-Intensive Applications - (2) Defining NonFunctional Requirements

Mar 7, 2026

이 챕터는 데이터 중심 애플리케이션의 핵심인 세 가지 비기능적 요구사항(신뢰성, 확장성, 유지보수성)을 정의합니다. 트위터의 타임라인 구축 사례를 통해 읽기/쓰기 시점의 부하 분산 전략(Fan-out)과 트레이드오프를 살펴보고, p99.9와 같은 꼬리 지연 시간(Tail Latency) 관리의 중요성을 강조합니다. 최종적으로는 복잡성을 제어하는 추상화와 변화에 유연한 설계가 장기적인 시스템 운영에 어떤 영향을 미치는지 다룹니다. This chapter defines the three pillars of data-intensive applications: Reliability, Scalability, and Maintainability. Through the case study of X (Twitter) home timelines, it explores the trade-offs of fan-out strategies between write and read paths. It also emphasizes the importance of managing tail latencies (p99.9) and explains how abstraction and evolvability are crucial for long-term system health and managing accidental complexity.

Designing Data