일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Machine Learning
- 해커톤
- 지도학습
- LG
- LG Aimers
- 회귀
- 티스토리챌린지
- 분류
- LG Aimers 4th
- supervised learning
- OpenAI
- deep learning
- 딥러닝
- AI
- 머신러닝
- regression
- LLM
- GPT-4
- gpt
- ChatGPT
- PCA
- Classification
- 오블완
- Today
- Total
SYDev
[Spark The Definitive Guide] Part 6. 고급 분석과 머신러닝 본문
데이터 엔지니어링 심화트랙 4주차 정리 내용
6부에서는 스파크에서 제공하는 고급 분석 및 머신러닝을 위한 다양한 API를 다룰 예정
Chapter 24. 고급 분석과 머신러닝 개요
6부에서 다룰 스파크 고급 분석 도구는 다음과 같다.
- 데이터 전처리: 데이터 정제 및 피처 엔지니어링
- 지도 학습(supervised learning)
- 비지도 학습(unsupervised learning)
- 추천 엔진(recommendation learning)
- 그래프 분석(graph analytics)
- 딥러닝(deep learning)
24.1. 고급 분석
- 고급 분석: 데이터 기반의 인사이트를 도출 -> 핵심 문제 해결 or 예측 or 추천을 하기 위한 기술
머신러닝에서 일반적으로 활용되는 작업
- features 기반 label 예측 -> 분류/회귀 문제를 포함한 지도 학습
- 사용자의 과거 행동 기반 제품 제안 -> 추천 엔진
- 군집 분석, 이상징후 탐지, 토픽 모델링(topic modeling)과 같이 데이터 구조를 파악 -> 비지도 학습
- 소셜 네트워크 상에서 유의미한 패턴 발견 -> 그래프 분석
24.1.1. 지도 학습
- 지도 학습의 목표: label(혹은 종속 변수)을 포함하는 과거 데이터를 사용해 모델 학습 -> 데이터 포인트(혹은 데이터)의 다양한 특징을 기반으로 해당 레이블값을 예측
- 일반적으로 경사 하강법(gradient descent)과 같은 반복적 최적화 알고리즘으로 진행됨
- 기본 모델로 시작 -> 반복적 학습 과정에서 내부 parameter를 조정하면서 단계적으로 모델 개선 -> 최종 학습 모델 선정
- 예측하고자 하는 변수의 타입에 따라 분류/회귀로 구분
분류
- 분류: 범주형(불연속적이고 유한한 값의 집합) 종속변수를 예측하는 알고리즘을 학습시키는 행위
- 이진 분류(binary classification): 주어진 항목을 예측할 때 두 그룹 중 하나에 속한다고 예측하는 분류
- ex) 스팸메일의 분류
- 다중 클래스 분류(multiclass classification): 항목을 세 가지 이상의 범주로 분류하는 경우
- ex) 이메일을 4가지 범주(스팸, 개인, 업무, 기타)로 구분
- 분류의 예시 -> 질병 예측, 이미지 분류, 고객 이탈 예측, 구매 여부 예측 ...
- 분류 API는 26장에서 다룰 예정
회귀
- 분류에서의 종속 변수는 불연속 값들의 집합 -> 회귀 분석에서는 연속형 변수(실수)를 예측
- ex) 판매량 예측, 신장(키) 예측, 관객 수 예측
- 회귀 API는 27장에서 다룰 예정
24.1.2. 추천
- 제품이나 아이템에 대한 명시적인 선호도(등급 이용) 혹은 암시적인 선호도(관찰된 행동 이용)를 연구 -> 알고리즘은 사용자 또는 아이템 간의 유사성을 도출 -> 사용자가 선호할 만한 추천 아이템 도출
- 특정 사용자와 유사한 사용자가 선호하는 상품, 특정 사용자가 이미 구매한 상품과 비슷한 상품 추천
- 추천은 스파크의 대표적인 활용 사례이자 빅데이터에 최적화된 주제
- ex) 영화 추천, 상품 추천
- 추천 API는 28장에서 다룰 예정
24.1.3. 비지도 학습
- 주어진 데이터셋에서 특정 패턴을 찾거나, 숨겨진 구조적 특징을 발견하는 행위
- 지도 학습과 달리 예측 대상이 되는 종속변수(레이블)가 필요 없음
- ex) 이상징후 탐지, 사용자 세분화, 토픽 모델링
24.1.4. 그래프 분석
- 객체를 가리키는 vertex(정점)와 해당 객체 간의 관계를 나타내는 edge를 지정하는 구조에 대한 연구
- ex) vertex: 사람, 상품, ... edge: 구매
- 그래프는 관계에 관한 모든 것을 의미하므로, 관계를 지정하는 모든 것이 그래프 분석의 사례가 될 수 있다.
- ex) 사기거래 예측, 이상징후 탐지, 네트워크 특성 분류, 웹 페이지 추천
-> GraphFrame은 그 어떤 Graph Database보다 훨씬 더 큰 규모의 작업으로 확장할 수 있다.
-> 하지만, 스파크 자체는 데이터베이스가 아니기 때문에, 트랜잭션 처리 및 검색은 지원하지 않는다.
24.1.5. 고급 분석 프로세스
일반적인 머신러닝의 진행 단계
- 분석 주제와 관련된 데이터 수집
- 데이터 파악을 위해 데이터를 정제하고 검토
- 알고리즘이 데이터를 잘 인식하고 활용할 수 있도록 feature engineering 수행 (ex: 데이터를 수치형 벡터로 변환)
- 후보 모델을 생성하는 하나 이상의 알고리즘을 학습시키기 위해 전체 데이터의 일부를 학습 데이터셋으로 사용
- test dataset에 생성된 모델을 적용 -> 그 결과를 객관적으로 측정하고 분석 수행 시 정해 놓은 성공 기준과 비교 -> 모델 최종 평가
- 앞의 과정을 거치면서 확보한 통찰력 및 모델을 사용하여 예측을 수행하거나 이상 현상을 감지
데이터 수집
- 스파크는 다양한 데이터소스를 불러들일 수 있고, 크고 작은 데이터를 처리할 수 있기에 데이터 수집에 적절한 도구이다.
데이터 정제
- 탐색적 데이터 분석(exploratory data analysis, EDA): 대화형 쿼리 및 시각화 방법 등을 사용하여 데이터의 분포, 상관관계 및 기타 세부 현황을 파악하는 과정 -> 수집 서버에 잘못 기록되어 삭제해야 하는 값, 누락된 값 파악
- 스파크 함수는 손쉽게 데이터를 정제하고 파악하는 방법 제공
피처 엔지니어링
- ML머신러닝 알고리즘에 적용 가능한 형식으로 데이터를 변환하는 과정
- 데이터 정규화, 다른 변수들과의 상호작용을 나타내는 새로운 변수 추가, 범주형 변수 조작 및 머신러닝 모델에서 인식할 수 있도록 적절한 형식으로의 변환 등 다양한 작업을 포함
- 스파크에서 제공하는 라이브러리인 MLlib에서는 일반적으로 실젯값의 형태와 관계없이 모든 변수가 실수(double)형 벡터로 입력되어야 한다.
- 스파크는 다양한 머신러닝 통계 기법을 사용하여 데이터를 다루는 데 필요한 주요 기능을 제공
모델 학습
- 과거 정보 데이터셋 + 분석 목적 -> 입력에 맞게 적합한 출력을 예측하는 모델 학습
모델 튜닝 및 평가
- 데이터셋을 분할하여 사용 -> train dataset, validation dataset, test dataset
24.2. 스파크의 고급 분석 툴킷
스파크의 고급 분석을 수행하기 위한 가장 기본이 되는 패키지는 MLlib -> 머신러닝 파이프라인을 구축하기 위한 인터페이스 제공
24.2.1. MLlib
- MLlib: 스파크에 내장된 패키지로서 데이터 수집과 정제, 특징 추출과 선택, 대규모 데이터를 대상으로 한 지도 및 비지도 학습 머신러닝 모델의 학습과 튜닝, 그리고 이러한 모델을 운영 환경에서 사용할 수 있도록 도와주는 인터페이스 제공
- 단일 머신을 기반으로 머신러닝을 수행하는 도구는 다양하고 사용자가 선택할 수 있는 좋은 옵션이 많으나, 데이터 크기나 처리 시간 면에서 분명한 한계 존재
- 단일 머신 기반의 도구와 MLlib은 상호보완적
- 스파크로 데이터를 전처리하고 특징 생성 -> 대량의 데이터로부터 train, test dataset 생성에 걸리는 시간 단축 -> 단일 머신 기반의 학습 라이브러리를 활용해 주어진 dataset 학습
- 입력 데이터나 모델 크기가 단일 머신에 올려놓기 너무 어렵거나 불편할 경우 -> 스파크를 사용해 분산 처리 기반의 머신러닝 간단히 수행
- 유의 사항: 학습 및 데이터 처리가 간단해지는 반면, 학습이 완료된 모델을 배포할 때 어느 정도의 복잡성 존재 -> 스파크는 모델 자체적으로 대기 시간 짧은(실시간성의) 예측 지원X -> 타 서비스 시스템이나 자체 제작 애플리케이션으로 모델을 보내 이를 수행
24.3. 고수준 MLlib의 개념
MLlib에는 변환자(transformer), 추정자(estimator), 평가기(evaluator), 파이프라인(pipeline)과 같은 몇 가지 기본적 '구조적' 유형 존재
변환자
- 변환자(transformer): 원시 데이터를 다양한 방식으로 변환하는 함수.
- 새로운 상호작용 변수 생성, 컬럼 정규화, 모델에 입력하기 위해 타입 변경(Int -> Double), ..
- 데이터 전처리 및 피처 엔지니어링 과정에 사용
추정자
- 추정자(estimator): 데이터를 초기화하는 일종의 변환자 or 모델을 학습시키기 위해 사용하는 알고리즘
평가기
- 평가기(evaluator): 주어진 모델의 성능이 수신자 조작 특성(receiver operating characteristic, ROC) 곡선처럼 지정한 기준에 따라 어떻게 작동하는지 볼 수 있게 해줌.
- 평가기를 사용하여 테스트한 모델 중에 가장 우수한 모델 선택
-> 분석 정확도의 시각적 지수, 곡선이 참조선보다 위에 있을수록 검정이 더 정확함
https://angeloyeo.github.io/2020/08/05/ROC.html
고수준에서 차례로 변환, 추정 및 평가를 하나씩 지정할 수도 있지만, 파이프라인의 단계로 지정하는 것이 더 간편할 수 있다.
저수준 데이터 타입
- MLlib에서 작업해야 할 수도 있는 몇 가지 저수준 데이터 타입이 존재 -> vector가 가장 일반적인 타입
- 여기서 저수준 데이터는 데이터를 처리하기 위한 데이터 구조를 의미하는 것으로 보임.
- 기본 데이터 타입(정수, 실수, 문자열, ..), 배열, 리스트, 맵, ...
- 이런 저수준 데이터를 조합 -> 고수준의 추상화된 데이터(Dataset, Dataframe) 생성
- pyspark.mllib.linag.Vecotors 라이브러리는 고밀도(dense)와 희소(sparse)의 두 유형 로컬 벡터를 지원한다.
- dense vector: 모든 값을 저장
- sparse vector: 공간 절약을 위해 0이 아닌 값만 저장, 0이 아닌 값이 전체 사이즈의 10% 이하면 메모리 사용량이나 속도 측면에서 모두 저밀도 벡터가 선호된다.
>>> from pyspark.ml.linalg import Vectors
>>> denseVec = Vectors.dense(1.0, 2.0, 3.0)
>>> size = 3
>>> idx = [1, 2]
>>> values = [2.0, 3.0]
>>> sparseVec = Vectors.sparse(size, idx, values)
-> denseVec과 달리 sparseVec은 벡터 전체의 사이즈, 0이 아닌 벡터의 index와 value를 제시해줘야 함
24.4. MLlib 실제로 사용하기
각 구성 요소를 실행하는 간단한 파이프라인 생성 예제
>>> df = spark.read.json("/Users/sangyeong_park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data/simple-ml")
>>> df.orderBy("value2").show()
-> 두 개의 값(good, bad)으로 구성된 범주형 레이블(lab) 한 개와, 범주형 변수(color) 한 개, 두 개의 수치형 변수(value1, value2)로 구성
-> lab는 실제 고객의 건강 상태, 수치형 변수(value1, 2)는 웹 서비스 내에서의 여러 활동을 숫자로 표현한 척도, color는 고객 서비스 담당자가 작성한 몇 가지 범주로 구성된 건강 등급이라 가정
24.4.1 변환자를 사용해서 피처 엔지니어링 수행
- 변환자 -> 여러 방식으로 현재 column을 조작
- MLlib을 사용할 때, 스파크에서 제공하는 머신러닝 알고리즘의 모든 입력변수 -> Double 타입 or Vector[Double] 타입으로 구성
- RFormula: 머신러닝에서 데이터 변환을 지정하기 위한 선언적 언어, RFormula의 연산자는 다음과 같다.
- '~': 함수에서 타깃(target)과 항(term)을 분리
- '+': 연결 기호. '+0'은 절편 제거를 의미(우리가 맞추고자 하는 선의 y 절편이 0이 됨을 의미)
- '-':삭제 기호. '-1'은 절편 제거를 의미('+0'과 결과가 같음)
- ':': 상호작용(수치형 값이나 이진화된 범주 값에 대한 곱셈)
- '.': 타깃/종속변수를 제외한 모든 컬럼
>>> from pyspark.ml.feature import RFormula
>>> supervised = RFormula(formula="lab ~ . + color:value1 + color:value2")
-> lab ~ .: ~의 왼쪽에는 종속변수, 오른쪽에는 독립변수가 위치 -> 따라서 lab을 종속 변수로 설정하고, 나머지를 모두 독립 변수로 설정
-> 종속변수 lab를 y, 독립변수 color, value1, 2라고 했을 때, 가중치 w0~4에 대해서
-> y(종속변수 lab) = w0 * a(color) + w1 * b(value1) + w2 * c(value2) + w3 * a * b(color:value1) + w4 * a * c(color:value2) 형태?로 이해함
+ 원래 범주형 변수였던 lab과 color도 RFormula를 거친 후에 숫자형 변수로 변형됨
https://spark.apache.org/docs/latest/ml-features
>>> fittedRF = supervised.fit(df)
>>> preparedDF = fittedRF.transform(df)
>>> preparedDF.show()
>>> preparedDF.select('features').show(truncate=False) #features 생략 없이 출력
-> fit 메서드를 호출함으로써, 어떤 컬럼이 범주형인지, 그리고 범주형 컬럼의 고윳값이 무엇인지 자동으로 결정
-> 실제로 데이터를 변형시키는 데 사용할 수 있는 '학습된' 버전의 변환자 반환
-> RFormula라는 지정된 수식에 따라 데이터를 변환할 객체를 출력 -> 이 변환자를 사용하면 스파크는 자동으로 범주형 변수를 Double 타입으로 변환
-> 예제에서는 color 컬럼의 각 범주에 숫자값 할당 -> color 와 value1, color와 value2 각 컬럼 간의 상호작용이 반영된 새로운 특징 생성
-> 학습된 features는 sparse vector 형태로 보임 -> (전체 벡터 size(10), 0이 아닌 값이 있는 index, 각 index에 들어갈 value)
-> 입력 데이터를 출력 데이터로 변환하기 위해 해당 객체에 대한 transform 호출
>>> train, test = preparedDF.randomSplit([0.7, 0.3])
-> 데이터를 train, test dataset으로 임의 분할
24.4.2 추정자
- 예제에서는 분류 알고리즘에 해당하는 로지스틱 회귀(Logistic regression)를 사용
- 분류기를 생성하기 위해서 기본 설정값 또는 하이퍼파라미터를 사용하여 로지스틱회귀 알고리즘을 객체화
- 이후 레이블 컬럼과 특징 컬럼을 설정
>>> from pyspark.ml.classification import LogisticRegression
>>> lr = LogisticRegression(labelCol = "label", featuresCol="features")
>>> print(lr.explainParams())
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The bounds vector size must beequal with 1 for binomial regression, or the number oflasses for multinomial regression. (undefined)
maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0. (default: 0.0)
maxIter: max number of iterations (>= 0). (default: 100)
predictionCol: prediction column name. (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)
rawPredictionCol: raw prediction (a.k.a. confidence) column name. (default: rawPrediction)
regParam: regularization parameter (>= 0). (default: 0.0)
standardization: whether to standardize the training features before fitting the model. (default: True)
threshold: Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set, they must match.e.g. if threshold is p, then thresholds must be equal to [1-p, p]. (default: 0.5)
thresholds: Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values > 0, excepting that at most one value may be 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class's threshold. (undefined)
tol: the convergence tolerance for iterative algorithms (>= 0). (default: 1e-06)
upperBoundsOnCoefficients: The upper bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
upperBoundsOnIntercepts: The upper bounds on intercepts if fitting under bound constrained optimization. The bound vector size must be equal with 1 for binomial regression, or the number of classes for multinomial regression. (undefined)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (undefined)
-> lr의 모든 파라미터
- 지금까지 살펴본 변환자와 달리, 머신러닝 모델을 적합시키는 것은 사전학습 개념이며 즉시 수행될 수 있다.
>>> fittedLR.transform(train).select("label", "prediction").show()
-> fransform 메서드로 예측 결과 출력
하이퍼 파라미터: 모델 학습 프로세스에 영향을 주는 설정 매개변수. 학습을 시작되기 전에 설정
표준화(standardization): 평균을 기준으로 관측값들이 얼마나 떨어져 있는지 재표현
ex) 수치 데이터 표준화(z-transformation), 카테고리 데이터 표준화(원 핫 인코딩), 서수 데이터 표준화(value - 0.5 / max value)
정규화: 데이터 분포의 중심을 0으로 맞추고, 값의 분포가 특정 범위 안에 들어가도록 조정, 표준화 이후 모든 값을 0에서 1사이의 값으로 재표현
일반화: 모델 과적합을 방지하기 위한 기법으로, 모델의 표현식에 추가적인 제약 조건을 걸어 복잡도를 조정
24.4.3. 워크 플로를 파이프라인으로 만들기
파이프라인?
In machine learning, it is common to run a sequence of algorithms to process and learn from data.
- Split each document’s text into words.
- Convert each document’s words into a numerical feature vector.
- Learn a prediction model using the feature vectors and labels.
MLlib represents such a workflow as a Pipeline
(출처: https://spark.apache.org/docs/latest/ml-pipeline.html#pipeline 스파크 공식 문서)
앞에서 배운 workflow 과정을 하나로 통합한 것이 Pipeline의 개념
- 변환자 객체나 모델 객체가 다른 파이프라인에서 재사용되지 않고, 다른 파이프라인을 생성하기 전에 항상 새로운 모델 객체를 만들어야 한다.
- 모델의 과적합 방지를 위해 holdout(testset)과 validationset을 기반으로 hyperparameter를 조정해야 한다.
>>> train, test = df.randomSplit([0.7, 0.3])
-> holdout testset 생성
>>> rForm = RFormula()
>>> lr = LogisticRegression().setLabelCol("label").setFeaturesCol("features")
>>> from pyspark.ml import Pipeline
>>> stages = [rForm, lr]
>>> pipeline = Pipeline().setStages(stages)
-> pipeline의 기본 단계인 두 개의 추정자 RFormula와 LogisticRegression 생성
-> 변환과 모델 튜닝을 수작업으로 하지 않고, 전체 파이프라인에서 단계로 생성
24.4.4. 모델 학습 및 평가
- 하나의 모델이 아닌 여러 개의 모델을 학습
- 스파크에서 테스트할 다양한 하이퍼파라미터의 조합을 지정해, 다양한 모델을 학습
- 이후 평가기(evaluator)를 사용하여 validationset으로 각 모델의 예측 결과를 비교한 후 최적의 모델 선택
- RFormula에서도 전체 파이프라인에 대한 다양한 하이퍼파라미터를 테스트할 수 있다.
>>> params = ParamGridBuilder()\
... .addGrid(rForm.formula, [
... "lab ~ . + color:value1",
... "lab ~ . + color:value1 + color:value2"])\
... .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
... .addGrid(lr.regParam, [0.1, 2.0])\
... .build()
-> 위 코드에서는 기본값을 변형한 다음 세 가지 hyperparameter 사용
- 두 개의 서로 다른 버전의 RFormular
- 세 개의 서로 다른 옵션의 ElasticNet 파라미터
- 두 개의 서로 다른 옵션의 일반화 파라미터
-> 파라미터 조합 12개로 12가지 버전의 logistic regression 학습을 진행
-> ElasticNet, 일반화 옵션은 26장에서 나옴
- BinaryClassificationEvaluator 사용
- 분류 성능을 측정하는 일반적 척도인 areadUnderROC -> 수신자 조작 특성(receiver operating characteristic)의 아래쪽 면적을 의미
>>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
>>> evaluator = BinaryClassificationEvaluator()\
... .setMetricName("areaUnderROC")\
... .setRawPredictionCol("prediction")\
... .setLabelCol("label")
- 모델의 과적합을 피하기 위해 validationset으로 hyperparamet를 적합시키는 것이 좋음
- 스파크는 hyperparameter tuning을 자동으로 수행하는 두 가지 옵션 제공
- TrainValidationSplit: 데이터를 두 개의 서로 다른 그룹으로 무작위 임의 분할할 때 사용
- CrossValidator: 데이터 집합을 겹치지 않게 임의로 구분된 k개의 폴드로 분할하여 K-fold cross-validation을 수행
>>> from pyspark.ml.tuning import TrainValidationSplit
>>> tvs = TrainValidationSplit()\
... .setTrainRatio(0.75)\
... .setEstimatorParamMaps(params)\
... .setEstimator(pipeline)\
... .setEvaluator(evaluator)
>>> tvsFitted = tvs.fit(train)
>>> evaluator.evaluate(tvsFitted.transform(test)) #0.8888888888888888
-> 지금까지 구축한 파이프라인을 구동
-> 해당 파이프라인을 실행하면 validationset에 대해 모든 버전의 모델이 테스트됨
-> testset에서 0.8888888888888888의 성능을 보임
24.4.5. 모델 저장 및 적용
>>> tvsFitted.write().overwrite().save("/Users/sangyeong_park/CE/KHUDA_5th/Data_Engineering/tmp/modelLocation")
-> 완성한 모델을 디스크에 저장
>>> from pyspark.ml.tuning import TrainValidationSplitModel
>>> model = TrainValidationSplitModel.load("/Users/sangyeong_park/CE/KHUDA_5th/Data_Engineering/tmp/modelLocation")
>>> model.transform(test)
-> 디스크에 저장된 모델을 load
-> 모델에 맞게 TrainValidationSplitModel을 사용
24.5. 모델 배포 방식
스파크에는 머신러닝 모델을 운영 환경에 적용하기 위한 다양한 배포 방식이 있다.
- 머신러닝 모델을 오프라인으로 학습 -> 오프라인 데이터에 적용. 여기서 말하는 오프라인 데이터는 신속하게 응답해야하는 데이터가 아니라, 분석을 위해 저장된 데이터를 의미
- 오프라인에서 모델을 학습시킨 다음 데이터베이스(키-값 저장소)에 결과 저장 -> 추천 분야에 적합,but 주어진 사용자에 대한 값을 바로 검색할 수 없고, 입력을 기반으로 값을 계산해야 하는 분류 혹은 회귀에는 적합X
- 머신러닝 알고리즘을 오프라인으로 학습시키고, 모델을 디스크에 저장한 다음 이를 사용해 서비스. 스파크를 서비스 부문에 사용한다면 스파크 잡을 시작하는 오버헤드가 클러스터에서 실행하지 않더라도 높을 수 있기 때문에 대기 시간이 길어질 수 있다. -> ???
- 단일 시스템상에서 사용자의 분산 모델을 훨씬 더 빠르게 수행할 수 있도록 수동 변환 -> 원시 데이터의 조작이 너무 많이 필요하지 않은 경우에는 잘 작동하나 시간이 지남에 따라 유지 보수가 매우 어려울 수 있음.
- 머신러닝 알고리즘을 온라인으로 학습시키고 온라인에서 사용. -> 구조적 스트리밍과 함께 사용할 때는 가능하지만, 일부 모델에서는 복잡해질 수 있음
Chapter 25. 데이터 전처리 및 피처 엔지니어링
- 25장에서는 스파크를 사용하여 데이터 전처리와 피처 엔지니어링을 수행하는 방법을 자세히 학습
- 데이터가 어떻게 구조화되어 있는가 하는 관점에서 MLlib 모델을 학습시키는 데 필요한 핵심 요구사항을 살펴보고, 이런 작업을 수행하기 위해 스파크가 제공하는 다양한 도구를 학습
25.1. 사용 목적에 따라 모델 서식 지정하기
다음은 MLlib의 각 고급 분석 작업을 위한 입력 데이터 구조 관련 요구사항이다.
- 대부분의 분류 및 회귀 알고리즘: 데이터를 double type의 column으로 가져와서 label을 표시하고, vector type(dense, parse)의 column을 사용하여 특징을 나타내야 함
- 추천 알고리즘: 데이터를 사용자 column, 영화 혹은 서적 등을 나타내는 아이템 colum, 그리고 사용자 평점 등을 나타내는 등급 column으로 표현해야 함
- 비지도 학습 알고리즘: 입력 데이터로 사용할 특징을 vector(dense, parse)의 column으로 표현
- 그래프 분석: vertex와 edge가 각각 DataFrame으로 구성되어야 한다.
- 이런 다양한 형태의 데이터를 확보하는 가장 좋은 방법은 변환자를 사용하는 것
>>> sales = spark.read.format("csv")\
... .option("header", "true")\
... .option("inferSchema", "true")\
... .load("/Users/sangyeong_park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data/retail-data/by-day/*.csv")\
... .coalesce(5)\
... .where("Description IS NOT NULL")
>>> path = "/Users/sangyeong_park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data"
>>> fakeIntDF = spark.read.parquet(path + "/simple-ml-integers")
>>> simpleDF = spark.read.json(path + "/simple-ml")
>>> scaleDF = spark.read.parquet(path + "/simple-ml-scaling")
-> MLlib은 NULL 값이 존재하는 경우 동작하지 않는 경우가 많음
>>> sales.cache() #slaes dataset은 여러 번 액세스할 것이므로, 메모리에서 효율적으로 읽도록 캐싱
>>> sales.show()
25.2. 변환자
- 주로 데이터 전처리 또는 특징 생성을 위해 사용
- Tokenizer: 주어진 문자열을 토큰화(tokenization)하고, 주어진 문자로 분할
>>> from pyspark.ml.feature import Tokenizer
>>> tkn = Tokenizer()
>>> tkn.setInputCol("Description") #input column이름 지정
>>> tkn.transform(sales.select("Description")).show() #sales의 col중 description을 토큰화
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Tokenizer.html
25.3. 전처리 추정자
- 추정자는 수행하려는 변환이 input column에 대한 데이터 또는 정보로 초기화되어야 할 때 필요
- 추정자는 단순 변환을 위해 맹목적으로 적용되는 일반 변환자 유형과 데이터에 따라 변환을 수행하는 추정자 유형이 있다.
- 추정자 ex) StandardScaler: 입력 컬럼에 적재된 모든 값의 범위를 고려해 크기를 조정함으로써 각 차원에서 평균이 0이고 분산이 1이 되도록 스케일링 수행
>>> from pyspark.ml.feature import StandardScaler
>>> ss = StandardScaler()
>>> ss.setInputCol("features")
StandardScaler_169d59ddf34a
>>> ss.fit(scaleDF).transform(scaleDF).show(truncate = False)
-> features를 standardscale 적용한 결과
25.3.1. 변환자 속성 정의하기
- 모든 변환자는 적어도 입력과 출력 col의 이름을 나타내는 inputCol과 outputCol을 지정해야 한다.
- setInputCol, setOutputCol 사용
>>> ss.setOutputCol("output")
>>> ss.fit(scaleDF).transform(scaleDF).show(truncate = False)
25.4. 고수준 변환자
- RFormula 같은 고수준 변환자를 사용하면, 하나의 변환에서 여러 가지 변환을 간결하게 지정할 수 있다.
- 이들은 '상위 수준'에서 동작하며, 데이터 조작 및 변환을 하나하나 수행하지 않도록 해준다.
25.4.1. RFormula
- R 언어에서 빌려온 변환자로서, 데이터에 대한 변환을 선언적으로 간단히 지정할 수 있게 해준다.
- 이 변환자를 적용하면 데이터값은 숫자형 또는 범주형이 되고, 문자열에서 값을 추출하는 등의 조작할 필요가 없어진다.
- RFormula는 one-hot-encoding을 수행해 문자열로 지정된 범주화된 입력변수를 자동으로 처리한다. -> 순서의 의미가 없는 범주형 변수의 경우에는 String으로 형변환 해야 한다.
- 숫자 컬럼 -> double type으로 변환
- 레이블 컬럼이 string type인 경우, StringIndexer를 사용해서 Double type으로 변환
>>> from pyspark.ml.feature import RFormula
>>> supervised = RFormula(formula = "lab ~ . + color:value1 + color:value2")
>>> supervised.fit(simpleDF).transform(simpleDF).show()
-> 위에서와 같은 예제
-> 모든 변수(.)를 사용하고, value1, 2와 color 간의 상호작용을 지정하여 새로운 특징 생성
25.4.2. SQL 변환자
- SQLTransformer를 사용하면 MLlib 변환 기능을 사용할 때처럼 스파크의 방대한 SQL 데이터 처리 라이브러리를 활용할 수 있다.
- SQL의 SELECT문과 유일하게 다른 점은 테이블 이름 대신 THIS 키워드를 사용하는 것
- SQLTransformer를 사용해서 변환하는 경우, 출력 DataFrame에 새로운 column으로 추가됨
>>> from pyspark.ml.feature import SQLTransformer
>>> basicTransformation = SQLTransformer()\
... .setStatement("""
... SELECT sum(Quantity), count(*), CustomerID
... FROM __THIS__
... GROUP BY CustomerID
... """)
>>> basicTransformation.transform(sales).show()
25.4.3. 벡터 조합기
- VectorAssembler는 사용자가 생성하는 거의 모든 단일 pipeline에서 사용하게 될 도구이다.
- 모든 특징을 하나의 큰 벡터로 연결하여 추정자에 전달하는 기능 제공
- 일반적으로 머신러닝 pipeline의 마지막 단계에서 사용됨
>>> from pyspark.ml.feature import VectorAssembler
>>> va = VectorAssembler().setInputCols(["int1", "int2", "int3"])
>>> va.transform(fakeIntDF).show()
-> 1, 2, 3을 하나의 벡터로 변환
25.5. 연속형 특징 처리하기
- 연속형 특징 -> 양의 무한대 ~ 음의 무한대까지의 숫자값을 의미
- 연속형 특징을 처리하는 일반적인 두 개의 변환자 버켓팅, 스케일링과 정규화
- 버켓팅: 연속형 특징을 범주형 특징으로 변환할 수 있는 프로세스
>>> contDF = spark.range(20).selectExpr("cast(id as double)")
-> cast를 이용하여 id를 double type으로 변환
25.5.1. 버켓팅
버켓팅?
- 데이터에 hash 함수를 적용하여 고유한 index 부여 -> 일정 구간에 따라 연속형 데이터를 지정한 버켓으로 분할
- 버켓팅(bucketing) 혹은 구간화(binning)에 대한 가장 직접적인 접근법은 Bucketizer를 사용하는 것
- Bucketizer를 사용하면 주어진 연속형 특징을 지정한 버켓으로 분할
- 분할 배열의 최솟값은 DataFrame의 최솟값보다 작아야 한다.
- 분할 배열의 최댓값은 DataFrame의 최댓값보다 커야 한다.
- 분할 배열은 최소 세 개 이상의 값(범위의 시작, 경계, 범위의 끝)을 지정해서 두 개 이상의 버켓을 만들도록 해야 한다.
>>> from pyspark.ml.feature import Bucketizer
>>> bucketBorders = [-1.0, 5.0, 10.0, 250.0, 600.0]
>>> bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("id")
>>> bucketer.transform(contDF).show()
- QuantileDiscretizer: 근사화된 백분위수로 나누어진 사용자 지정 값으로 버켓팅
- setRelativeError: 근사치 계산에 대한 상대적 오류를 설정해서 버켓을 분할하는 방법을 제어
>>> from pyspark.ml.feature import QuantileDiscretizer
>>> bucketer = QuantileDiscretizer().setNumBuckets(5).setInputCol("id")\
... .setOutputCol("result")
>>> fittedBucketer = bucketer.fit(contDF)
>>> fittedBucketer.transform(contDF).show()
고급 버켓팅 기술
- LSH(locality sensitivityhashing) -> 해싱은 데이터의 무작위 액세스와 검색을 위해 사용되는 것에 비해, LSH에서는 비슷한 value를 동일하거나 유사한 해시 버킷으로 매핑하여, 유사성을 빠르게 탐지
25.5.3. StandardScaler
- StandardScaler: 특징들의 평균이 0, 표준편차가 1인 분포를 갖도록 데이터를 표준화
- withStd 플래그: 데이터를 표준편차 1이 되도록 스케일링
- withMean 플래그: 스케일링 전에 데이터를 센터링
>>> from pyspark.ml.feature import StandardScaler
>>> sScaler = StandardScaler().setInputCol("features")
>>> sScaler.fit(scaleDF).transform(scaleDF).show(truncate = False)
MinMaxScaler
- MinMaxScaler: 벡터의 값을 주어진 최솟값 ~ 최댓값 사이의 비례값으로 스케일링
- 최솟값을 0, 최댓값을 1로 지정하면 모든 값이 0에서 1사이가 된다.
>>> from pyspark.ml.feature import MinMaxScaler
>>> minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
>>> fittedminMax = minMax.fit(scaleDF)
>>> fittedminMax.transform(scaleDF).show()
MaxAbsScaler
- MaxAbsScaler: 각 값을 해당 컬럼의 최대 절댓값으로 나눠서 데이터의 범위를 조정
- 모든 값은 -1 ~ 1 사이
>>> from pyspark.ml.feature import MaxAbsScaler
>>> maScaler = MaxAbsScaler().setInputCol("features")
>>> fittedmaScaler = maScaler.fit(scaleDF)
>>> fittedmaScaler.transform(scaleDF).show(truncate = False)
ElementwiseProduct
- ElementwiseProduct: 벡터의 각 값을 임의의 값으로 조정 -> 행렬곱
- 1.0 * 10.0 = 10, 0.1 * 15.0 = 1.5, -1.0 * 20.0 = -20.0
>>> from pyspark.ml.feature import ElementwiseProduct
>>> from pyspark.ml.linalg import Vectors
>>> scaleUpVec = Vectors.dense(10.0, 15.0, 20.0)
>>> scalingUp = ElementwiseProduct()\
... .setScalingVec(scaleUpVec)\
... .setInputCol("features")
>>> scalingUp.transform(scaleDF).show()
Normalizer
- Normalizer: 여러 가지 표준 중 하나를 사용(parameter 'p'로 지정)하여 다차원 벡터를 스케일링
- 맨해튼 표준: p = 1, 유클리드 표준: p = 2
>>> from pyspark.ml.feature import Normalizer
>>> manhattanDistance = Normalizer().setP(1).setInputCol("features")
>>> manhattanDistance.transform(scaleDF).show(truncate = False)
25.6. 범주형 특징 처리하기
- 범주형 특징에 대한 가장 일반적인 작업은 인덱싱
- 인덱싱: 범주형 변수를 머신러닝 알고리즘에 적용할 수 있는 숫자형 변수로 변환
- 데이터 전처리를 할 때, 일관성을 위해 모든 범주형 변수의 Index를 다시 생성하는 것이 좋음
25.6.1. StringIndexer
- StringIndexer: DataFrame에 첨부된 메타데이터를 생성하여 어떤 입력이 어떤 출력에 해당하는지 지정
>>> from pyspark.ml.feature import StringIndexer
>>> lblIndxr = StringIndexer().setInputCol("lab").setOutputCol("labelInd")
>>> idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)
>>> idxRes.show()
- 문자열이 아닌 컬럼에도 StringIndexer를 적용할 수 있다.
- 이 경우 index 생성 전에 문자열로 변환
>>> valIndexer = StringIndexer().setInputCol("value1").setOutputCol("valueInd")
>>> valIndexer.fit(simpleDF).transform(simpleDF).show()
-> 학습 과정에서 관측되지 않았던 값인 경우 로우 전체를 스킵
25.6.2. 색인된 값을 텍스트로 변환하기
>>> from pyspark.ml.feature import IndexToString
>>> labelReverse = IndexToString().setInputCol("labelInd")
>>> labelReverse.transform(idxRes).show()
25.6.3. 벡터 인덱싱하기
- VectorIndexer: 입력 벡터 내에 존재하는 범주형 데이터를 자동으로 찾아서 0부터 시작하는 카테고리 index를 사용하여 범주형 특징으로 변환
>>> from pyspark.ml.feature import VectorIndexer
>>> from pyspark.ml.linalg import Vectors
>>> idxIn = spark.createDataFrame([
... (Vectors.dense(1, 2, 3), 1),
... (Vectors.dense(2, 5, 6), 2),
... (Vectors.dense(1, 8, 9), 3)
... ]).toDF("features", "label")
>>> indxr = VectorIndexer()\
... .setInputCol("features")\
... .setOutputCol("idxed")\
... .setMaxCategories(2)
>>> indxr.fit(idxIn).transform(idxIn).show()
-> 첫 번째 컬럼은 두 개의 다른 범주가 있는 범주형 변수, 나머지는 연속형
25.6.4. 원-핫 인코딩
- 데이터의 속성 관점에서 실제 의미가 없지만, 수치로 봤을 때 크기가 의미를 가질 수 있으므로, 각각의 고유한 값을 boolean flag type(1 또는 0)의 벡터 구성 요소로 변환
>>> from pyspark.ml.feature import OneHotEncoder, StringIndexer
>>> lblIndxr = StringIndexer().setInputCol("color").setOutputCol("colorInd")
>>> colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
>>> ohe = OneHotEncoder().setInputCol("colorInd")
>>> ohe.fit(colorLab).transform(colorLab).show()
-> 아마도 sparse vector 형태..?
25.7. 텍스트 데이터 변환자
- 일반적으로 접하는 텍스트 데이터는 자유형 텍스트와 문자열로 된 범주형 변수가 있다.
- 앞에서 범주형 변수를 다뤘으니, 자유형 텍스트 데이터 중점적으로 풀어나갈 예정
25.7. 텍스트 토큰화하기
- Tokenizer 클래스는 단어 컬럼을 공백으로 구분하여 단어 배열로 변환
>>> from pyspark.ml.feature import Tokenizer
>>> tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
>>> tokenized = tkn.transform(sales.select("Description"))
>>> tokenized.show(20, False)
- RegexTokenizer를 이용하면 정규 표현식을 이용한 Tokenizer를 만들 수 있다.
- Java의 정규 표현식 구문을 준수
>>> from pyspark.ml.feature import RegexTokenizer
>>> rt = RegexTokenizer()\
... .setInputCol("Description")\
... .setOutputCol("DescOut")\
... .setPattern(" ")\
... .setToLowercase(True)
>>> rt.transform(sales.select("Description")).show(20, False)
>>> from pyspark.ml.feature import RegexTokenizer
>>> rt = RegexTokenizer()\
... .setInputCol("Description")\
... .setOutputCol("DescOut")\
... .setPattern(" ")\
... .setGaps(False)\
... .setToLowercase(True)
>>> rt.transform(sales.select("Description")).show(20, False)
-> 이해가 잘 안 됨, 왜 2 3 행은 똑같이 공백이 둘인데 descout 결과가 다를까?
25.7.2. 일반적인 단어 제거하기
- 텍스트를 토큰화한 다음에는 일반적으로, 불용어나 분석과 관련 없는 무의미한 용어를 필터링한다.
>>> from pyspark.ml.feature import StopWordsRemover
>>> englishStopWords = StopWordsRemover.loadDefaultStopWords("english")
>>> stops = StopWordsRemover()\
... .setStopWords(englishStopWords)\
... .setInputCol("DescOut")
>>> stops.transform(tokenized).show()
25.7.3. 단어 조합 만들기
- n-gram: 길이가 n인 sequence
- unigram: 길이가 1인 n-gram, bigram, trigram, ...
- 단어를 개별적으로 보는 것보다 더 나은 특징을 생성할 수 있다.
>>> from pyspark.ml.feature import NGram
>>> unigram = NGram().setInputCol("DescOut").setN(1)
>>> bigram = NGram().setInputCol("DescOut").setN(2)
>>> unigram.transform(tokenized.select("DescOut")).show(10, False)
>>> bigram.transform(tokenized.select("DescOut")).show(10, False)
25.7.4. 단어를 숫자로 변환하기
- CountVectorizer: 토큰화된 데이터에서만 작동하며, 다음 두 가지 작업을 수행
- 모델을 적합하는 프로세스 동안 모든 문서에서 단어 집합을 찾은 다음, 문서별로 해당 단어의 출현 빈도를 계산
- 그런 다음 변환 과정에서 DataFrame column의 각 row에서 주어진 단어의 발생 빈도를 계산하고, 해당 로우에 포함된 용어를 벡터 형태로 출력
- CountVectorizer는 모든 row를 문서(document) 취급하고, 모든 단어를 용어(term) 취급하며, 모든 용어의 집합을 어휘집(vocabulary)로 취급한다.
>>> from pyspark.ml.feature import CountVectorizer
>>> cv = CountVectorizer()\ #CountVectorizer는 기본적으로 document에 있는 term의 수를 출력
... .setInputCol("DescOut")\
... .setOutputCol("countVec")\
... .setVocabSize(500)\ #전체 최대 vocabulary 크기
... .setMinTF(1)\ #vocabulary에 포함될 term에 대한 최소 term의 빈도
... .setMinDF(2) #vocabulary에 포함되기 전에 term이 나타나야 하는 최소 document 빈도
>>> fittedCV = cv.fit(tokenized)
>>> fittedCV.transform(tokenized).show(10, False)
-> sparseVec: vocabulary의 크기, voca에 포함된 단어의 index, 특정 단어의 출현 빈도
TF-IDF
- TF-IDF(Term Frequency-Inverse Document Frequency): 얼마나 많은 document가 그 용어를 포함하고 있는지에 따라 가중치를 부여하면서, 특정 용어가 각 doc에 얼마나 자주 출현하는지 측정
- 적은 문서에서 출현하는 용어에 더 많은 가중치 부여
- 'the'와 같은 단어는 가중치가 매우 낮게 부여되지만, 'streaming'과 같은 단어는 더 적은 문서에서 발생하므로 상대적으로 더 높은 가중치 부여
>>> tfIdfIn = tokenized\
... .where("array_contains(DescOut, 'red')")\
... .select("DescOut")\
... .limit(10)
>>> tfIdfIn.show(10, False)
>>> from pyspark.ml.feature import HashingTF, IDF
>>> tf = HashingTF()\
... .setInputCol("DescOut")\
... .setOutputCol("TFOut")\
...
>>> tf = HashingTF()\
... .setInputCol("DescOut")\
... .setOutputCol("TFOut")\
... .setNumFeatures(10000)
>>> idf = IDF()\
... .setInputCol("TFOut")\
... .setOutputCol("IDFOut")\
... .setMinDocFreq(2)
>>> idf.fit(tf.transform(tfIdfIn)).transform(tf.transform(tfIdfIn)).show(10, False)
-> hashing은 CountVectorizer와 비슷한 과정이지만 결과를 되돌릴 수 없다.
-> sparseVec: voca 크기, documentd에 나타나는 모든 단어의 hash, 각 용어의 가중치
-> red는 모든 문서에서 나타나기 대문에 매우 낮은 가중치
25.7.5. Word2Vec
- Word2Vec: 비슷한 단어를 벡터 공간에서 서로 가깝게 배치하여 단어를 일반화
- skip grams라는 기술을 사용하여 단어의 문장을 벡터 표현으로 변환
- 이후 어휘를 작성하고 모든 문장에 대해 토큰을 제거하고 모델을 학습시켜 'n-gram' 표현에서 누락된 토큰을 예측
- word2vec은 토큰 형태이면서 연속적이고 자유형의 텍스트에서 가장 잘 작동한다.
>>> from pyspark.ml.feature import Word2Vec
>>> documentDF = spark.createDataFrame([
... ("Hi I heard about Spark".split(" "), ),
... ("I wish Java could use case classes".split(" "), ),
... ("Logistic regression models are neat".split(" "), )
... ], ["text"])
>>> word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text",
... outputCol="result")
>>> model = word2Vec.fit(documentDF)
>>> result = model.transform(documentDF)
>>> for row in result.collect():
... text, vector = row
... print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
25.8. 특징 조작하기
input feature space를 축소하거나 확장하는 알고리즘 및 도구 학습
25.8.1. 주성분 분석
- PCA(principal components analysis)는 기존 feature의 주요 정보를 최대한 남기면서 feature space를 축소한다.
>>> from pyspark.ml.feature import PCA
>>> pca = PCA().setInputCol("features").setK(2)
>>> pca.fit(scaleDF).transform(scaleDF).show(20, False)
-> 총 특징 수를 2로 축소
25.8.2. 상호작용
- 데이터 집합의 특정 변수에 대한 도메인 지식을 가지고, 특정 두 변수 간의 상호작용이 중요한 feature가 된다는 것을 미리 알고 있다면, 특징 변환자인 Interaction(스칼라)을 사용하여 수동으로 상호작용할 수 있다.
- 파이썬의 경우에는 RFormula를 사용
25.8.3. 다항식 전개
- 다항식 전개는 모든 입력 컬럼의 상호작용 변수를 생성하는 데 사용
- 자칫하면 과적합을 초래할 수 있으므로, 높은 차수의 다항식을 사용할 때는 주의
>>> from pyspark.ml.feature import PolynomialExpansion
>>> pe = PolynomialExpansion().setInputCol("features").setDegree(2)
>>> pe.transform(scaleDF).show(truncate = False)
25.9. 특징 선택
- 다양한 특징이 강한 상관관계를 나타내거나 너무 많은 특징을 모델 학습에 사용하면 모델 과적합 초래
- 특징 선택(feature selection): 다양한 후보 특징 중 일부만 선택하여 적용
25.9.1. ChiSqSelector
- ChiSqSelector는 통계적 검정을 활용하여, 예측하려는 레이블과 독립적이지 않은 특징을 식별하고 관련 없는 특징을 삭제
- 모델에 입력으로 사용되는 특징 수를 줄이거나, 텍스트 데이터의 차원ㅇ을 줄이기 위해 주로 범주 데이터와 함께 사용
- 이 방법은 카이제곱 검정(Chi-Square test)을 기반으로 하며, '최상'의 특징을 선택할 수 있는 여러 가지 방법이 있다.
- numTopFeatures 메서드는 p-value로 정렬된다 (카이제곱 검정에서 p-value가 작을수록 변수 간에 통계적으로 유의미한 관계가 있다는 것을 의미)
- percentile 메서드는 입력 특징의 비율 크기를 나타낸다
- fpr 메서드는 p-value를 어느 수준에서 판단해야 하는지 정한다.
>>> from pyspark.ml.feature import ChiSqSelector, Tokenizer
>>> tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
>>> tokenized = tkn\
... .transform(sales.select("Description", "CustomerId"))\
... .where("CustomerId IS NOT NULL")
>>> prechi = fittedCV.transform(tokenized)\
... .where("CustomerId IS NOT NULL")
>>> chisq = ChiSqSelector()\
... .setFeaturesCol("countVec")\
... .setLabelCol("CustomerId")\
... .setNumTopFeatures(2)
>>> chisq.fit(prechi).transform(prechi)\
... .drop("customerId", "Description", "DescOut").show()
25.10. 고급 주제
변환자 및 추정자와 관련한 고급 주제 중에 가장 보편적이고 지속해서 사용되는 두 가지 주제
- 변환자 저장하기
- 사용자 정의 변환자 작성하기
25.10.1. 변환자 저장하기
- 변환자를 디스크에 저장 및 불러오기하는 과정
>>> fittedPCA = pca.fit(scaleDF)
>>> fittedPCA.write().overwrite().save("/Users/sangyeong_park/CE/KHUDA_5th/Data_Engineering/tmp/fittedPCA")
>>>
>>> from pyspark.ml.feature import PCAModel
>>> loadedPCA = PCAModel.load("/Users/sangyeong_park/CE/KHUDA_5th/Data_Engineering/tmp/fittedPCA")
>>> loadedPCA.transform(scaleDF).show()
25.10.2. 사용자 정의 변환자 작성하기
https://medium.com/@rragundez/easily-build-custom-sparkml-transformers-and-estimators-16ba70414abe
import pyspark.sql.functions as F
from pyspark import keyword_only
from sparkml_base_classes import TransformerBaseClass
class AdditionColumnTransformer(TransformerBaseClass):
@keyword_only
def __init__(self, column_name=None, value=None):
super().__init__()
def _transform(self, ddf):
# add your transformation logic here
self._logger.info(f"Creating new column {self._column_name}")
ddf = ddf.withColumn(self._column_name, F.col(self._column_name) + self._value)
return ddf
-> 새로운 column을 추가하는 사용자 정의 변환자
참고자료
- "스파크 완벽 가이드" , 빌 체임버스 , 마테이 자하리아 저자(글) · 우성한 , 이영호 , 강재원 번역
- https://www.ibm.com/docs/ko/spss-statistics/29.0.0?topic=performance-roc-curve
'KHUDA 5th > Data Engineering' 카테고리의 다른 글
[Spark The Definitive Guide] Part 6-2. 고급 분석과 머신러닝(ch 29 ~ 31) (0) | 2024.04.11 |
---|---|
[Khuda 5th] Data Engineering 2주차 세션 (0) | 2024.03.25 |
[Spark The Definitive Guide] Part 2-2. 구조적 API: DataFrame, SQL, Dataset (1) | 2024.03.21 |
[Spark The Definitive Guide] Part 2-1. 구조적 API: DataFrame, SQL, Dataset (3) | 2024.03.18 |
[Spark The Definitive Guide] terminal pip3 설치 에러 (1) | 2024.03.08 |