Notice
Recent Posts
Recent Comments
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Archives
Today
Total
관리 메뉴

SYDev

[Spark The Definitive Guide] Part 1. 빅데이터와 스파크 간단히 살펴보기 본문

KHUDA 5th/Data Engineering

[Spark The Definitive Guide] Part 1. 빅데이터와 스파크 간단히 살펴보기

시데브 2024. 3. 7. 19:42
데이터 엔지니어링 심화트랙 1주차 정리 내용

 

Chapter 1. 아파치 스파크란

  • 아파치 스파크: 통합 컴퓨팅 엔진이며 클러스터 환경에서 데이터를 병렬로 처리하는 라이브러리 집합
  • 파이썬, 자바, 스칼라, R 지원

1.5.2  스파크 대화형 콘솔 실행하기

가이드에 따라 파일 설치하고 나면 다음과 같은 폴더를 받을 수 있다.

 

 

해당 디렉토리로 가서 ./bin/pyspark 실행하면 다음과 같이 SparkSession 객체가 출력된다.

 

>> 파일이 실행 안 돼서 한참동안 헤매다가 JAVA 설치하니까 바로 실행성공..

 

Chapter 2. 스파크 간단히 살펴보기

DataFreame과 SQL을 사용해 클러스터, 스파크 애플리케이션 그리고 구조적 API를 살펴본다. 이 과정에서 스파크의 핵심 용어와 개념을 접하고 사용법을 익혀보자.

 

2.1. 스파크의 기본 아키텍쳐

  • 한 대의 컴퓨터로는 대규모 정보를 연산할 만한 자원이나 성능을 가지지 못한다. 가능하다고 해도, 너무 많은 시간을 소요한다.
  • 컴퓨터 클러스터: 여러 컴퓨터 자원을 모아 하나의 컴퓨터처럼 사용할 수 있게 만든다. 허나, 클러스터를 구성하는 것만으로는 부족하여 클러스터에서 작업을 조율할 수 있는 프레임워크가 필요하다.
  • 스파크가 바로 그런 역할을 하는 프레임워크이다. 스파크는 데이터 처리 작업을 관리하고 조율한다.

2.1.1. 스파크 애플리케이션

  • 스파크 애플리케이션은 드라이버(driver) 프로세스와 다수의 익스큐터(executor) 프로세스로 구성된다.
  • 드라이버 프로세스 - 클러스터 노드 중 하나에서 실행되며 main() 함수를 실행한다. 이는 스파크 애플리케이션 정보의 유지 관리, 사용자 프로그램이나 입력에 대한 응답, 전반적인 익스큐터 프로세스의 작업과 관련된 분석, 배포 그리고 스케줄링 역할을 수행하기에 필수적. 드라이버 프로세스는 스파크 애플리케이션의 심장과 같은 존재로서, 애플리케이션의 수명 주기 동안 관련 정보를 모두 유지
  • 익스큐터 - 드라이버 프로세스가 할당한 작업을 수행. 드라이버가 할당한 코드를 실행하고, 진행 상황을 다시 드라이버 노드에 보고하는 두 가지 역할 수행.

 

  • 스파크는 사용 가능한 자원을 파악하기 위해 클러스터 매니저를 사용한다.
  • 드라이버 프로세스는 주어진 작업을 완료하기 위해 드라이버 프로그램의 명령을 익스큐터에서 실행할 책임이 있다.

 

2.2. 스파크의 다양한 언어 API

 스파크의 언어 API를 이용하면, 다양한 프로그래밍 언어로 스파크 코드를 실행할 수 있다. 

 

  • 대부분 예제들이 스칼라, 자바로 구성되어 있지만, 파이썬은 스칼라가 지원하는 거의 모든 구조를 지원한다. 본인은 파이썬을 이용해서 학습을 진행할 예정..!

 

 

2.3. 스파크 API 

 스파크는 기본적으로 두 가지 API를 제공하는데, 저수준의 비구조적(unstructured) API와 고수준의 구조적(structured) API이다.

 

2.5. SparkSession

  • SparkSession이라 불리는 드라이브 프로세서로 스파크 애플리케이션을 제어, SparkSessioon 인스턴스는 사용자가 정의한 처리 명령을 클러스터에서 실행
  • 하나의 SparkSession은 하나의 스파크 애플리케이션에 대응
>>> spark
<pyspark.sql.session.SparkSession object at 0x10424bd70>
>>> myRange = spark.range(1000).toDF("number")

-> 한 개의 column과 1000개의 row로 구성된 DataFrame 생성

-> 각 로우에는 0~999의 값 할당, 해당 숫자들은 분산 컬렉션을 나타냄

 

 클러스터에서 코드 예제를 실행하면 숫자 범위의 각 부분이 서로 다른 익스큐터에 할당됨

 

2.6. DataFrame

  • 스키마(schema): 컬럼과 컬럼의 타입을 정의한 목록
  • 스프레드 시트 vs DataFrame -> 스프레드시트는 한 대의 컴퓨터에 있지만, 스파크 DataFrame은 수천 대의 컴퓨터에 분산

 

2.6.1. 파티션

  • 스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션이라 불리는 청크 단위로 데이터를 분할한다.
  • 파티션: 클러스터의 물리적 머신에 존재하는 로우의 집합
  • if 파티션이 하나라면, 스파크에 수천 개의 익스큐터가 있더라도 병렬성은 1
  • 수백 개의 파티션이 있더라도, 익스큐터가 하나라면 병렬성은 1

 

2.7. 트랜스포메이션

  • 스파크의 핵심 데이터 구조는 불변성(immutable)을 가진다. -> 한 번 생성하면 변경 X
  • DataFrame을 변경하려면 원하는 변경 방법을 스파크에 알려야 하는데, 이때 사용하는 명령을 트랜스포메이션이라 부른다.
>>> divisBy2 = myRange.where("number % 2 = 0")

-> 트랜스포메이션 예시

-> 추상적 트랜스포메이션만 지정한 상태이기 대문에 액션(후에 학습)을 호출하지 않으면 스파크는 실제 트랜스포메이션 수행X

 

  • 트랜스포메이션은 스파크에서 비즈니스 로직을 표현하는 핵심 개념이다.
  • 트랜스포메이션에는 두 가지 유형이 있는데, 하나는 좁은 의존성(narrow dependency)이고, 다른 하나는 넓은 의존성(wide dependency)

 

  • 좁은 의존성을 가진 트랜스포메이션 -> 각 입력 파티션이 하나의 출력 파티션에만 영향을 미친다.
  • 이전 코드 예제의 where 구문은 좁은 의존성을 가짐
  • 좁은 트랜스포메이션을 사용하면, 스파크에서 pipelining을 자동으로 수행
  • DataFrame에 여러 필터를 지정하는 경우 모든 작업이 메모리에서 발생

 

  • 넓은 의존성을 가진 트랜스포메이션 -> 하나의 입력 파티션이 여러 출력 파티션에 영향을 미친다.
  • 스파크가 클러스터에서 파티션을 교환하는 shuffle
  • 스파크는 셔플의 결과를 디스크에 저장

2.7.1. 지연 연산

  • 지연 연산(lazy evaluation): 스파크가 연산 그래프를 처리하기 직전까지 기다리는 동작 방식
  • 스파크는 특정 연산 명령이 내려진 즉시 데이터를 수정 X -> 원시 데이터에 적용할 트랜스포메이션의 실행 계획을 생성
  • 코드를 실행하는 마지막까지 대기하다가 원형 DataFrame 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일
  • 이 과정을 거치며 전체 데이터  흐름을 최적화
  • DataFrame의 predicate pushdown이 한 예시

 

2.8. 액션

  • 트랜스포메이션을 사용해 논리적 실행 계획을 세운 후, 실제 연산을 수행하려면 액션 명령을 내려야 한다.
  • 액션: 일련의 트랜스포메이션으로부터 결과를 계산하도록 지시하는 명령
>>> divisBy2.count()
500

 

-> DataFrame의 전체 레코드 수를 반환

-> count 외에도 다음 세 가지 유형의 액션이 존재

 

  1. 콘솔에서 데이터를 보는 액션
  2. 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
  3. 출력 데이터소스에 저장하는 액션
  • 액션을 지정하면 스파크 잡이 시작된다. 
  • 스파크 잡필터(narrow transformation)를 수행한 후 파티션별로 레코드 수를 카운트(wide transformation)한다.
  • 이후 각 언어에 적합한 네이티브 객체에 결과를 수집
  • 스파크 UI로 클러스터에서 실행 중인 스파크 잡을 모니터링 가능

 

2.9. 스파크 UI

  • 스파크 UI는 스파크 잡의 진행 상황을 모니터링할 때 사용
  • 드라이버 노드의  4040 포트로 접속 가능
  • 로컬 스파크 UI 주소 -> http://localhost:4040
  • 스파크 잡을 튜닝하고 디버깅할 때 유용

 

 

2.10. 종합 예제

https://bit.ly/2yw2fCx -> 에서 data 받아서 다음 코드 실행

 

>>> flightData2015 = spark\
... .read\
... .option("inferSchema", "true")\
... .option("header", "true")\
... .csv("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data/flight-data/csv/2015-summary.csv")

-> 데이터를 읽는 시점에 스키마를 엄격하게 지정하는 옵션을 사용

-> 데이터 read

-> inferenceSchema -> 스키마 추론을 통해 스키마 정보를 알아냄

-> 파일의 첫 로우를 헤더로 지정

 

>>> flightData2015.take(3)
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

-> head 명령

 

 

>>> flightData2015.sort("count").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=33]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Def..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>

-> DataFrame 객체에 explain 메서드를 호출하여 계보(lineage)나 스파크의 쿼리 실행 계획을 확인할 수 있다.

-> 실행 계획은 위에서 아래 방향으로 읽으며, 최종 결과는 가장 위에, 데이터소스는 가장 아래에 있다.

--> 위 예제에서 각 줄의 첫 번째 키워드 (Sort, Exchange, FileScan)에 주목해보자.

---> Sort 메서드가 넓은 트랜스포메이션으로 동작

 

>>> spark.conf.set("spark.sql.shuffle.partitions", "5")
>>> flightData2015.sort("count").take(2)
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1), Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

-> 스파크는 셔플 수행 시 기본적으로 200개의 셔플 파티션을 생성 -> 이 값을 5로 설정해 셔플의 출력 파티션 수를 줄인다.

 

 

2.10.1. DataFrame과 SQL

  • 사용자가 SQL이나 DataFrame으로 비즈니스 로직을 표현 -> 스파크에서 실제 코드를 실행하기 전에 그 로직을 기본 실행 계획(explain 메서드를 호출해 확인)으로 컴파일 
  • 스파크 SQL을 사용하면 모든 DataFrame을 테이블이나 뷰(임시 테이블)로 등록한 후 SQL 쿼리를 사용할 수 있다.
>>> flightData2015.createOrReplaceTempView("flight_data_2015")

-> createOrReplaceTempView 메서드를 호출하여 모든 DataFrame을 테이블이나 뷰로 만들 수 있다.

-> 이제 SQL로 데이터를 조회할 수 있다.

 

>>> sqlWay = spark.sql("""
... SELECT DEST_COUNTRY_NAME, count(1)
... FROM flight_data_2015
... GROUP BY DEST_COUNTRY_NAME
... """)
>>> dataFrameWay = flightData2015\
... .groupBy("DEST_COUNTRY_NAME")\
... .count()
>>> sqlWay.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 10), ENSURE_REQUIREMENTS, [plan_id=61]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Def..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


>>> dataFrameWay.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 10), ENSURE_REQUIREMENTS, [plan_id=74]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Def..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>

-> 두 가지 실행 계획은 동일한 기본 실행 계획으로 컴파일됨

 

>>> spark.sql("SELECT max(count) from flight_data_2015").take(1)
[Row(max(count)=370002)]
>>> from pyspark.sql.functions import max
>>> flightData2015.select(max("count")).take(1)
[Row(max(count)=370002)]

-> max 함수는 필터링을 수행해 단일 로우를 결과로 반환하는 트랜스포메이션이다.

 

>>> spark.sql("SELECT max(count) from flight_data_2015").take(1)
[Row(max(count)=370002)]
>>> from pyspark.sql.functions import max
>>> flightData2015.select(max("count")).take(1)
[Row(max(count)=370002)]
>>> maxSql = spark.sql("""
... SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
... FROM flight_data_2015
... GROUP BY DEST_COUNTRY_NAME
... ORDER BY sum(count) DESC
... LIMIT 5
... """)
>>> maxSql.show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

-> 상위 5개의 도착 국가 탐색 코드

 

>>> flightData2015\
... .groupBy("DEST_COUNTRY_NAME")\
... .sum("count")\
... .withColumnRenamed("sum(count)", "destination_total")\
... .sort(desc("destination_total"))\
... .limit(5)\
... .show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

-> 의미는 같고 구현과 정렬 방식이 조금 다른 DataFrame 구문

 

  • 실행 계획은 트랜스포메이션의 지향성 비순환 그래프(directed acyclic graph, DAG)이며 액션이 호출되면 결과를 만들어낸다.
  • 지향성 비순환 그래프의 각 단계는 불변성을 가진 신규 DataFrame을 생성

  1. 첫 번째 단계에서는 데이터를 읽는다. 스파크는 해당 DataFrame이나 자신의 원본 DataFrame에 액션이 호출되기 전까지 데이터를 읽지 않는다.
  2. 두 번째 단계에서는 데이터를 그룹화한다. groupBy 메서드가 호출되면 최종적으로 그룹화된 DataFrame을 지칭하는 이름을 가진 RelationalGroupDataset을 반환한다. 기본적으로 키 or 키셋을 기준으로 그룹 생성하고 각 키에 대한 집계 수행
  3. 세 번째 단계에서는 집계 유형을 지정하기 위해 컬럼 표현식이나 컬럼명을 인수로 사용하는 sum 메서드를 사용한다. sum 메서드는 새로운 스키마 정보를 가지는 별도의 DataFrame을 다시 생성
  4. 네 번째 단계에서는 컬럼명을 변경한다. withColumnRenamed 메서드에 원본 컬럼명과 신규 컬럼명을 인수로 지정
  5. 다섯 번째 단계에서는 데이터를 정렬한다. 결과 DataFrame의 첫 번째 로우를 확인해보면 destination_total 컬럼에서 가장 큰 값을 지님. 역순으로 정렬하기 위해 dexc 함수를 임포트
  6. 여섯 번째 단계에서는 limit 메서드로 반환 결과의 수를 제한한다. 결과 DataFrame의 전체 데이터 대신 상위 5개 로우를 반환
  7. 마지막 단계에서는 액션을 수행한다. 이 단계에서야 DataFrame의 결과를 모으는 프로세스를 시작한다. 처리가 끝나면 코드를 작성한 프로그래밍 언어에 맞는 리스트나 배열을 반환
>>> flightData2015\
... .groupBy("DEST_COUNTRY_NAME")\
... .sum("count")\
... .withColumnRenamed("sum(count)", "destination_total")\
... .sort(desc("destination_total"))\
... .limit(5)\
... .explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#150L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#17,destination_total#150L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[sum(count#19)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 10), ENSURE_REQUIREMENTS, [plan_id=281]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_sum(count#19)])
            +- FileScan csv [DEST_COUNTRY_NAME#17,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Def..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>

-> 예제의 결과와 다소 다른데 이유를 밝혀내지 못함..

 

Chapter 3. 스파크 기능 둘러보기  

  • 스파크는 기본 요소인 저수준 API구조적 API 그리고 추가 기능을 제공하는 일련의 표준 라이브러리로 구성되어있다.

 

3.1. 운영용 애플리케이션 실행하기

  • spark-submit대화형 셸에서 개발한 프로그램 -> 운영용 애플리케이션으로 쉽게 전환 가능
  • spark-submit은 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할
  • spark-submit 명령에 애플리케이션 실행에 필요한 자원, 실행 방식, 다양한 옵션 지정 가능
sangyeong-park@bagsang-yeong-ui-MacBookPro spark-3.5.1-bin-hadoop3 % ./bin/spark-submit \
> --master local \
> ./examples/src/main/python/pi.py 10

-> spark-submit 명령 중 master 옵션의 인숫값을 변경하면 스파크가 지원하는 스파크 스탠드얼론, 메소스 그리고 YARN 클러스터 매니저에서 동일한 애플리케이션 실행 가능 

 

3.2. Dataset: 타입 안정성을 제공하는 구조적 API

  • Dataset: 자바와 스칼라의 정적 데이터 타입에 맞는 코드, 즉 정적 타입 코드(statically typed code)를 지원하기 위해 고안된 구조적 API
  • 파이썬 이용할 예정이므로 Skip..!

 

3.3. 구조적 스트리밍

  • 구조적 스트리밍: 스파크 2.2 버전에서 안정화(production-ready)된 스트림 처리용 고수준 API이다.
  • 구조적 스트리밍을 -> 구조적 API로 개발된 배치 모드의 연산을 스트리밍 방식으로 실행, 지연 시간 줄이고 증분 처리 가능
  • 배치 처리용 코드를 일부 수정하여 스트리밍 처리를 수행하고, 값을 빠르게 얻을 수 있다는 장점
  • 프로토타입을 배치 잡으로 개발한 다음 스트리밍 잡으로 변환할 수 있어 개념 잡기가 편해짐

 

예제

  • 소매(retail) 데이터셋 사용 -> 특정 날짜와 시간 정보 포함
  • 예제 데이터셋 중 하루치 데이터를 나타내는 by-day 디렉터리 파일 사용
  • 여러 프로세스에서 데이터가 꾸준히 생성 -> 소매점에서 생성된 데이터가 구조적 스트리밍 잡이 읽을 수 있는 저장소로 전송되고 있다 가정

데이터 형태

 

>>> staticDataFrame = spark.read.format("csv")\
... .option("header", "true")\
... .option("inferSchema", "true")\
... .load("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data/retail-data/by-day/*.csv")
>>> staticDataFrame.createOrReplaceTempView("retail_data")
>>> staticSchema = staticDataFrame.schema

 

>>> staticDataFrame\
... .selectExpr(
... "CustomerId",
... "(UnitPrice * Quantity) as total_cost",
... "invoiceDate")\
... .groupBy(
... col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
... .sum("total_cost")\
... .show(5)
Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
CodeCache: size=131072Kb used=30268Kb max_used=30268Kb free=100803Kb
 bounds [0x0000000104928000, 0x00000001066e8000, 0x000000010c928000]
 total_blobs=12516 nmethods=11575 adapters=853
 compilation: disabled (not enough contiguous free space left)
+----------+--------------------+-----------------+                             
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|{2011-12-05 09:00...|            -37.6|
|   14126.0|{2011-11-29 09:00...|643.6300000000001|
|   13500.0|{2011-11-16 09:00...|497.9700000000001|
|   17160.0|{2011-11-08 09:00...|516.8499999999999|
|   15608.0|{2011-11-11 09:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows

 

>>> streamingDataFrame = spark.readStream\
... .schema(staticSchema)\
... .option("maxFilesPerTrigger", 1)\
... .format("csv")\
... .option("header", "true")\
... .load("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data/retail-data/by-day/*.csv")
>>> streamingDataFrame.isStreaming
True

-> DataFrame이 스트리밍 유형인지 확인 -> true

 

>>> purchaseByCustomerPerHour = streamingDataFrame\
... .selectExpr(
... "CustomerId",
... "(UnitPrice * Quantity) as total_cost",
... "InvoiceDate")\
... .groupBy(
... col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
... .sum("total_cost")

-> 총 판매 금액 계산

-> lazy operation이므로, 데이터 플로를 실행하기 위해 스트리밍 액션을 호출해야 한다.

 

  • 스트리밍 액션은 어딘가에 데이터를 채워 넣어야 하므로, count 메서드와 같은 일반적 정적 액션과는 차이점이 있다.
  • 트리거가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장
  • 이번 예제의 경우 파일마다 트리거를 실행
  • 스파크는 이전 집계값보다 더 큰 값이 발생한 경우에만 인메모리 테이블 갱신 -> 언제나 가장 큰 값 얻을 수 있음
>>> purchaseByCustomerPerHour.writeStream\
... .format("memory")\
... .queryName("customer_purchases")\
... .outputMode("complete")\
... .start()

-> 인메모리 테이블에 저장

-> 인메모리에 저장될 테이블명 지정

-> 모든 카운트 수행 결과를 테이블에 저장

 

>>> spark.sql("""
... SELECT *
... FROM customer_purchases
... ORDER BY `sum(total_cost)` DESC
... """)\
                                                                                
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|{2011-09-20 09:00...|          71601.44|
|      NULL|{2011-11-14 09:00...|          55316.08|
|      NULL|{2011-11-07 09:00...|          42939.17|
|      NULL|{2011-03-29 09:00...| 33521.39999999998|
|      NULL|{2011-12-08 09:00...|31975.590000000007|
+----------+--------------------+------------------+
only showing top 5 rows

 

3.4. 머신러닝과 고급 분석

  • 스파크에서는 머신러닝 알고리즘 라이브러리 MLib을 사용해 대규모 머신러닝을 수행할 수 있다.
  • 대용량 데이터를 대상으로 preprocessing, munging, model training, prediction 가능
  • classification, regression, clustering, deep learning에 이르기까지 머신러닝과 관련된 정교한 API 제공
>>> staticDataFrame.printSchema()
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

 

>>> from pyspark.sql.functions import date_format, col
>>> preppedDataFrame = staticDataFrame\
... .na.fill(0)\
... .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
... .coalesce(5)	//널값 변환하는 역할

 

>>> trainDataFrame = preppedDataFrame\
... .where("InvoiceDate < '2011-07-01'")
>>> testDataFrame = preppedDataFrame\
... .where("InvoiceDate >= '2011-07-01'")

-> trainDataset, testDataset 생성

 

>>> trainDataFrame.count()
245903
>>> testDataFrame.count()
296006

-> 대략 절반으로 나눠진 데이터셋

 

>>> from pyspark.ml.feature import StringIndexer
>>> indexer = StringIndexer()\
... .setInputCol("day_of_week")\
... .setOutputCol("day_of_week_index")

-> 요일(day of week)을 수치형으로 반환 ex) 토요일을 6으로, 월요일을 1로

-> 토요일이 월요일보다 크다는 것을 암묵적으로 의미하기 때문에(수치로 표현되기 때문), 문제점을 보완하기 위해서는 OneHotEncoder를 사용해 각 값을 자체 컬럼으로 인코딩해야 한다. -> 해당 요일인지 아닌지 Boolean type으로 구별 가능

 

>>> from pyspark.ml.feature import OneHotEncoder
>>> encoder = OneHotEncoder()\
... .setInputCol("day_of_week_index")\
... .setOutputCol("day_of_week_encoded")

-> 원핫인코딩처리

 

>>> vectorAssembler = VectorAssembler()\
... .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
... .setOutputCol("features")

-> 세 가지 핵심 특징인 UnitPrice, Quantity, day_of_week_encoded를 가지고 있다.

 

>>> from pyspark.ml import Pipeline
>>> transformationPipeline = Pipeline()\
... .setStages([indexer, encoder, vectorAssembler])

-> 나중에 입력값으로 들어올 데이터가 같은 프로세스를 거쳐 변환되도록 파이프라인을 설정

 

>>> fittedPipeline = transformationPipeline.fit(trainDataFrame)
>>> transformedTraining = fittedPipeline.transform(trainDataFrame)

-> 모델 학습에 사용할 파이프라인 마련

-> 파이프라인 구성 과정에서 데이터 캐싱 과정을 제외된 상태(4부에서 학습)

 

  • 동일한 트랜스포메이션을 계속 반복할 수 없음 -> 대신 모델 일부 하이퍼파라미터 튜닝값을 적용
  • 캐싱을 사용하면 중간 변환된 데이터셋의 복사본을 메모리에 저장 -> 전체 파이프라인을 재실행하는 것보다 훨씬 빠르게 반복적으로 데이터셋에 접근
>>> kmeans = KMeans()\
... .setK(20)\
... .setSeed(1)

-> KMeans 클래스를 임포트하고 인스턴스 생성

-> 교재에는 setSeed(1L)로 돼있는데, invalid decimal literal 에러가 발생해서 L을 빼버림

 

  • 스파크에서 머신러닝 모델을 학습시키는 과정은 크게 두 단계로 진행된다.
  • 첫 번재 단계는 아직 학습되지 않은 모델을 초기화고, 두 번째 단계는 해당 모델을 학습시킨다.
  • MLlib의 DataFrame API에서 제공하는 모든 알고리즘은 항상 두 가지 유형으로 구성
  • 학습 전 알고리즘 명칭: Algorithm -> KMeans
  • 학습 후 알고리즘 명칭: AlgorithmModel -> KMeansModel
>>> kmModel = kmeans.fit(transformedTraining)
24/03/08 19:51:12 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
CodeCache: size=131072Kb used=42643Kb max_used=42980Kb free=88428Kb
 bounds [0x0000000106c38000, 0x0000000109668000, 0x000000010ec38000]
 total_blobs=16448 nmethods=15306 adapters=1053
 compilation: disabled (not enough contiguous free space left)

-> 모델 학습 완료

 

>>> from pyspark.ml.evaluation import ClusteringEvaluator
>>> predictionsTrain = kmModel.transform(transformedTraining)
>>> evaluator = ClusteringEvaluator()
>>> transformedTrain =  evaluator.evaluate(predictionsTrain)
>>> print("transformedTrain accuracy = " + str(transformedTrain))
transformedTrain accuracy = 0.648609430141304

-> 교재에서는 computeCost를 사용하지만 spark 3.0.0에서 해당 함수가 사라졌으므로, 이를 대체할 ClusteringEvaluator를 이용하여 score를 계산함 (참고자료: https://kils-log-of-develop.tistory.com/739)

-> score의 상태가..?

-> 전처리과정을 추가하고, 하이퍼 파라미터값을 튜닝하면 더 좋은 모델을 만들 수 있다.

 

3.5. 저수준 API

  • 스파크는 RDD(Resilient Distributed Dataset, 탄력적 분산 데이터셋 - > 다수의 서버에 걸쳐 분산 방식으로 저장된 요소들의 집합(https://6mini.github.io/data%20engineering/2021/12/12/rdd/))를 통해 자바와 파이썬 객체를 다루는 데 필요한 다양한 기본 기능(저수준 API)을 제공
  • 스파크의 거의 모든 기능은 RDD 기반, 효율적인 분산 처리를 위해 저수준 명령으로 컴파일
  • 원시 데이터를 읽거나 다루는 용도로 RDD를 사용할 수 있지만 -> 대부분 구조적 API를 사용하는 것이 좋음
  • RDD를 이용해 파티션과 같은 물리적 실행 특성을 결정할 수 있음 -> DataFrame보다 더 세밀한 제어를 할 수 있다.
  • 드라이버 시스템의 메모리에 저장된 원시 데이터를 parallelize하는 데 RDD를 사용 가능
>>> from pyspark.sql import Row
>>> spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()
DataFrame[_1: bigint]

-> 간단한 숫자를 이용해 병렬화해 RDD를 생성하는 예제

-> 다른 DataFrame과 함께 사용할 수 있도록 DataFrame으로 변환

 

  • 최신 버전의 스파크에서는 기본적으로 RDD를 사용하지 않지만, 비정형 데이터나 정제되지 않은 원시 데이터를 처리해야 한다면 RDD를 사용

 

3.7. 스파크의 에코시스템과 패키지

  • spark-packages.org -> 스파크 패키지 목록 공유 저장소

 

3.8. 정리

  • 스파크를 비즈니스와 기술적 문제 해결에 적용할 수 있는 다양한 방법을 학습함
  • 스파크의 단순하고 강력한 프로그래밍 모델은 다양한 문제를 해결하는 데 손쉽게 적용할 수 있다.

참고자료

  • "스파크 완벽 가이드" , 빌 체임버스 , 마테이 자하리아 저자(글) · 우성한 , 이영호 , 강재원 번역