PySpark: 대용량 분산 처리 DataFrame 기초
0) PySpark의 핵심: 왜 사용하는가?
3가지 핵심 구조
- Driver Node (운전석): 사용자가 작성한 파이썬 코드를 해석하고, 작업 계획을 세운 뒤 여러 대의 컴퓨터(Worker)에게 명령을 내리는 두뇌 역할을 합니다.
- Worker Node / Executor (작업자): Driver의 명령을 받아 실제 데이터를 나누어 연산하고 결과를 반환하는 역할을 수행합니다.
- Cluster Manager (작업 반장): Driver와 Worker 사이에서 자원(메모리, CPU)을 할당하고 관리합니다 (예: YARN, Kubernetes 등).
핵심 데이터 구조
- RDD (Resilient Distributed Dataset): 스파크의 가장 근간이 되는 로우 레벨 데이터 구조입니다. 데이터가 여러 노드에 분산되어 있으며, 연산 도중 에러가 나거나 노드가 다운되어도 스스로 복구할 수 있는 Fault-tolerance을 가집니다. (현재는 아주 세밀한 제어가 필요할 때만 드물게 사용합니다.)
- DataFrame: RDD 위에 구축된 고수준 API로, 관계형 데이터베이스의 테이블이나 판다스의 데이터프레임과 매우 유사하게 생겼습니다. 스파크의 내부 최적화 엔진(Catalyst Optimizer)이 알아서 연산을 가장 빠른 방식으로 최적화해주기 때문에, 실무 데이터 엔지니어링 및 분석의 90% 이상은 이 DataFrame API를 사용합니다.
1) DataFrame 기초 조작법 익히기
Lazy Evaluation (지연 실행)
- Transformation (변환):
select,filter같은 작업은 "이렇게 할 거야"라는 계획만 세웁니다.
- Action (실행):
show,count,collect같은 명령이 떨어지는 순간, 쌓아둔 계획을 한꺼번에 최적화해서 실행합니다.
SparkSession
from pyspark.sql import SparkSession # 스파크 세션 생성 (보통 spark라는 이름으로 사용함) spark = SparkSession.builder \ .appName("MasteringPySpark") \ .getOrCreate()
데이터 불러오기 (Read)
# CSV 읽기 예시 df = spark.read.csv("data.csv", header=True, inferSchema=True) # JSON이나 Parquet(스파크 최적화 포맷)도 가능 # df = spark.read.json("data.json") # df = spark.read.parquet("data.parquet")
데이터 변환 (Transformation)
from pyspark.sql.functions import col # 필요한 컬럼만 뽑기 df_selected = df.select("name", "age", "city") # 조건에 맞는 데이터만 거르기 (SQL의 WHERE) df_filtered = df.filter((col("age") >= 20) & (col("city") == "Seoul"))
# 나이에 1을 더한 'new_age' 컬럼 추가 df_new = df.withColumn("new_age", col("age") + 1) # 컬럼 이름 바꾸기 df_renamed = df.withColumnRenamed("city", "location")
데이터 집계 (GroupBy & Agg)
from pyspark.sql.functions import avg, sum, count # 도시별 평균 나이와 인원수 구하기 df_stats = df.groupBy("city").agg( avg("age").alias("avg_age"), count("name").alias("user_count")
데이터 결합 (Join)
# df_users와 df_orders를 user_id 기준으로 결합 df_joined = df_users.join(df_orders, on="user_id", how="inner")
결과 확인 (Action)
df_joined.show(5) # 상위 5개 데이터 출력 print(df_joined.count()) # 전체 행 개수 확인 df_joined.collect() # 데이터를 드라이버 메모리로 가져오기 (주의: 너무 크면 터짐)
실전 꿀팁
- Pandas vs PySpark: PySpark의 DataFrame은 Pandas와 달리 인덱스(Index) 개념이 없습니다. 특정 행을
df[0]처럼 가져올 수 없으니 반드시filter나 정렬을 사용하세요.
- Case Sensitivity: 기본적으로 PySpark는 대소문자를 구분하지 않지만, 설정에 따라 달라질 수 있으니 컬럼명은 명확하게 관리하는 게 좋습니다.
- Shuffle (셔플) 조심:
groupBy나join은 데이터를 네트워크를 통해 주고받는 '셔플'을 발생시킵니다. 데이터가 클 때 가장 느려지는 지점이니, 꼭 필요한 데이터만 남기고 먼저filter를 한 뒤에 Join 하세요.
2) Spark 최적화와 고급 기능
윈도우 함수 (Window Function)
groupBy)를 하면 결과가 그룹당 한 줄로 요약되어 줄어들지만, 윈도우 함수를 사용하면 기존 행(Row)의 형태를 그대로 유지하면서 그룹 내의 순위나 누적합 등을 계산할 수 있습니다.from pyspark.sql.window import Window from pyspark.sql.functions import rank, col # 부서(department)별로 월급(salary)이 높은 순서대로 순위 매기기 windowSpec = Window.partitionBy("department").orderBy(col("salary").desc()) # 기존 데이터프레임에 'rank' 컬럼 추가 df_ranked = df.withColumn("rank", rank().over(windowSpec))
사용자 정의 함수 (UDF: User Defined Function)
from pyspark.sql.functions import udf from pyspark.sql.types import StringType # 파이썬 일반 함수 정의 def grade_converter(score): if score >= 90: return "A" elif score >= 80: return "B" else: return "C" # 1. UDF로 변환 (반환될 데이터의 타입을 반드시 명시해야 함) grade_udf = udf(grade_converter, StringType()) # 2. DataFrame에 적용 df_graded = df.withColumn("grade", grade_udf(col("score")))
성능 최적화 1: 캐싱 (Caching)
# df_filtered 연산 결과를 메모리에 저장하겠다고 선언 df_filtered.cache() print(df_filtered.count()) # 첫 번째 Action 발생! 이때 연산이 수행되고 메모리에 저장됨 df_filtered.show() # 두 번째 Action부터는 연산 없이 메모리에서 바로 가져옴 (초고속) # 사용이 끝났다면 메모리에서 내리기 df_filtered.unpersist()
성능 최적화 2: 파티셔닝 (Partitioning) 조절
# 파티션 개수 강제 늘리기/균등 분배 (Shuffle 발생 🚨: 네트워크 비용 큼) df_repartitioned = df.repartition(100) # 파티션 개수 줄이기 (Shuffle 미발생: 주로 최종 결과물을 하나의 파일로 저장하기 직전에 사용) df_coalesced = df.coalesce(10)
성능 최적화 3: 셔플링 방지와 브로드캐스트 조인 (Broadcast Join)
from pyspark.sql.functions import broadcast # df_large(거대 데이터)와 df_small(작은 데이터) 결합 # df_small을 각 노드로 복사해 뿌려서 셔플링을 아예 없애버림 df_joined = df_large.join(broadcast(df_small), on="country_code", how="left")
실전 최적화 꿀팁
- 파이썬 UDF의 함정: 파이썬으로 짠 UDF는 실행 시 스파크(JVM) 환경과 파이썬 환경 사이를 오가며 직렬화/역직렬화 과정을 거치므로 속도가 매우 느립니다. 가급적이면 스파크 내장 함수(
pyspark.sql.functions)들을 조합해서 사용하는 것이 수십 배 더 빠릅니다.
- Explain (실행 계획 확인):
df.explain()을 실행하면 스파크 엔진(Catalyst)이 이 코드를 내부적으로 어떻게 최적화해서 실행할지 물리적 계획(Physical Plan)을 출력합니다. 고수들은 이를 보고 불필요한 셔플이 발생하고 있지는 않은지 점검합니다.
- OOM(Out of Memory) 방지:
collect()Action은 수십 대의 컴퓨터에 흩어진 거대한 결과를 내 컴퓨터 한 대(Driver)로 전부 끌어오기 때문에 메모리가 터지기(OOM) 쉽습니다. 결과 확인은show(5)나take(5)를 사용하고, 전체 저장이 필요하면write.parquet()등을 통해 다시 분산 저장하세요.
3) 확장 모듈 다뤄보기 (Spark SQL, Streaming, MLlib)
Spark SQL: 파이썬 코드 대신 SQL로 데이터 다루기
# 1. DataFrame을 임시 뷰(Temporary View)로 등록하기 df.createOrReplaceTempView("people") # 2. SQL 쿼리 실행 (결과는 다시 DataFrame으로 반환됨) sql_results = spark.sql(""" SELECT city, AVG(age) as avg_age FROM people WHERE age >= 20 GROUP BY city """) sql_results.show()
Structured Streaming: 끊이지 않는 데이터 처리 (실시간)
# 실시간 데이터 소스 연결 (예: Kafka, 특정 디렉토리의 파일 등) lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load() # 실시간 단어 개수 세기 (기존 DataFrame 문법과 동일!) word_counts = lines.selectExpr("explode(split(value, ' ')) as word").groupBy("word").count() # 실시간 결과를 콘솔에 출력하기 query = word_counts.writeStream.outputMode("complete").format("console").start() query.awaitTermination()
MLlib: 대규모 분산 머신러닝
from pyspark.ml.feature import VectorAssembler from pyspark.ml.classification import LogisticRegression # 1. 특성(Feature)들을 하나의 벡터 컬럼 'features'로 합치기 assembler = VectorAssembler(inputCols=["age", "income", "education"], outputCol="features") output = assembler.transform(df) # 2. 모델 생성 및 학습 lr = LogisticRegression(featuresCol="features", labelCol="is_customer") model = lr.fit(output) # 3. 예측 수행 predictions = model.transform(output)
4) PySpark 완벽 마스터를 위한 마지막 체크리스트
- Lazy Evaluation의 장점은 무엇인가?
- 왜
collect()를 함부로 쓰면 안 되는가?
- 조인 성능이 떨어질 때 가장 먼저 고려할 최적화는?
broadcast 조인 사용하기)- UDF 사용을 지양해야 하는 이유는?
데이터를 움직이는 힘: 데이터 거버넌스 & 엔지니어링 실전 강의 정리
모두의 연구소 [데이터를 움직이는 힘: 데이터 거버넌스 & 엔지니어링 실전] 강의 수강
AWS DataZone에서 OpenLineage 기반의 Airflow 데이터 계보 그리기
AWS DataZone과 OpenLineage를 연동하여 Airflow 기반의 데이터 계보(Lineage)를 시각화하는 아키텍처와 구축 방법을 다룹니다. 이를 통해 복잡한 데이터 파이프라인의 흐름을 투명하게 관리하고 추적성을 확보하는 기술적 노하우를 확인해 보세요.
데이터 엔지니어링 2025년 전망: 실무자의 시선으로 읽기
AI 컴퓨팅과 SLM의 부상, Data IDE의 필요성 등 2025년 데이터 엔지니어링 트렌드를 실무 경험에 비추어 분석했습니다. 데이터 품질과 엔지니어의 역할 변화에 대한 고찰을 담았습니다.
Designing Data-Intensive Applications - (2) Defining NonFunctional Requirements
이 챕터는 데이터 중심 애플리케이션의 핵심인 세 가지 비기능적 요구사항(신뢰성, 확장성, 유지보수성)을 정의합니다. 트위터의 타임라인 구축 사례를 통해 읽기/쓰기 시점의 부하 분산 전략(Fan-out)과 트레이드오프를 살펴보고, p99.9와 같은 꼬리 지연 시간(Tail Latency) 관리의 중요성을 강조합니다. 최종적으로는 복잡성을 제어하는 추상화와 변화에 유연한 설계가 장기적인 시스템 운영에 어떤 영향을 미치는지 다룹니다. This chapter defines the three pillars of data-intensive applications: Reliability, Scalability, and Maintainability. Through the case study of X (Twitter) home timelines, it explores the trade-offs of fan-out strategies between write and read paths. It also emphasizes the importance of managing tail latencies (p99.9) and explains how abstraction and evolvability are crucial for long-term system health and managing accidental complexity.
