250x250
Notice
Recent Posts
Recent Comments
«   2024/10   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31
Archives
Today
Total
관리 메뉴

SYDev

[Spark The Definitive Guide] Part 2-2. 구조적 API: DataFrame, SQL, Dataset 본문

KHUDA 5th/Data Engineering

[Spark The Definitive Guide] Part 2-2. 구조적 API: DataFrame, SQL, Dataset

시데브 2024. 3. 21. 01:53
728x90
데이터 엔지니어링 심화트랙 2주차 정리 내용

 

 

Chapter 7. 집계 연산

  • 집계(aggregation)는 무언가를 함께 모으는 행위이며 빅데이터 분석의 초석이다.
  • 나 그룹을 지정하고 하나 이상의 컬럼을 변환하는 방법을 지정하는 집계 함수를 사용
  • 집계 함수는 여러 입력값이 주어지면 그룹별로 결과를 생성 
  • 지정된 집계 함수에 따라 그룹화된 결과는 RelationalGroupedDataset을 반환

-> 구매 이력 데이터를 사용해 파티션을 훨씬 적은 수로 분할할 수 있도록 리파티셔닝하고, 빠르게 접근할 수 있도록 캐싱

 

7.1. 집계 함수

  • 모든 집계는 사용한 DataFame의 .stat 속성을 이용하는 특별한 경우를 제외한다면 함수를 사용한다.

7.1.1. count

  • 다음 예제에서 count 함수는 액션이 아닌 트랜스포메이션으로 동작
  • count 함수에 특정 컬럼을 지정하는 방식 or count(*), count(1)을 사용하는 방식으로 두 가지가 있다.

 

7.1.2. countDistinct

  • 전체 레코드 수가 아닌 고유 레코드 수를 구하는 경우 countDistinct 함수를 사용

 

7.1.3. approx_count_distinct

  • 근사치 계산
  • 최대 추정 오류율이라는 파라미터가 추가로 있는데, 예제에서는 큰 오류율을 설정하여 기대치에서 크게 벗어나는 결과를 보임
  • 그러나 빠르게 결과를 반환
  • 대규모 데이터셋을 사용할 때 성능이 좋아짐

 

7.1.4. first와 last

  • dataframe의 첫 번째 값이나 마지막 값

 

7.1.5. min과 max

 

7.1. ~ 

  • sum: 특정 컬럼의 모든 값 합산
  • sumDistinct: 고윳값 합산
  • avg: 평균 계산
  • variance, stddev -> 표본표준분산, 표본표준편차
  • var_pop, stddev_pop -> 모표준분산, 모표준편차
  • skewness, kurtosis -> 비대칭도, 첨도
  • covariance, correlation -> 공분산(데이터 입력값에 따라 다른 범위를 가짐), 상관관계(피어슨 상관계수)
  • collect_set, collect_list -> 복합 데이터 타입의 집계: 특정 컬럼의 값을 셋 데이터 타입과 리스트로 고윳값만 수집

 

7.2. 그룹화

  • DataFrame 수준의 집계보다, 데이터 그룹 기반의 집계를 수행하는 경우가 더 많다.
  • 데이터 그룹 기반의 집계는 단일 컬럼의 데이터를 그룹화하고 해당 그룹의 다른 여러 칼럼을 사용해서 계산하기 위해 카테고리형 데이터를 사용한다.
  • 그룹화 작업은 하나 이상의 컬럼을 그룹화하고 집계 연산을 수행하는 두 단계로 이뤄진다.
  • 첫 단계에서는 RelationalGroupedDataset이 반환되고, 두 번째 단계에서는 DataFrame이 반환된다.

-> 그룹의 기준이 되는 컬럼을 여러 개 지정

 

7.2.1. 표현식을 이용한 그룹화

  • 메서드 대신 count 함수 사용하는 것이 좋음
  • count 함수를 select 구문에 표현식으로 지정하기보다 agg 메서드를 사용하는 것이 좋음 
  • agg 메서드는 여러 집계 처리를 한 번에 지정할 수 있으며, 집계에 표현식을 사용할 수 있다.
  • 트랜스포메이션이 완료된 컬럼에 alias 메서드를 사용할 수 있다.

 

7.2.2. 맵을 이용한 그룹화

  • 컬럼을 키로, 수행할 집계 함수의 문자열을 값으로 하는 map 타입을 사용해 트랜스포메이션을 정의
  • 수행할 집계 함수를 한 줄로 작성하면 여러 컬럼명을 재사용할 수 있다.

 

7.3. 윈도우 함수

  • 윈도우 함수: 윈도우 함수는 데이터의 특정 '윈도우'를 대상으로 고유의 집계 연산을 수행한다.
  • 데이터의 '윈도우'는 현재 데이터에 대한 참조를 사용해 정의
  • 윈도우 함수를 집계에 사용할 수도 있다.
  • Group-by 함수를 사용하면 모든 로우 레코드가 단일 그룹으로만 이동
  • 윈도우 함수는 frame에 입력되는 모든 로우에 대해 결괏값을 계산, frame은 로우 그룹 기반의 테이블을 의미
  • 각 로우는 하나 이상의 프레임에 할당될 수 있다.

스파크는 다음 세 가지 종류의 윈도우 함수를 지원한다.

  • 랭크 함수(ranking function)
  • 분석 함수(analytic function)
  • 집계 함수(aggregate function)

-> 예제를 위해 주문 일자(InvoiceDate) 컬럼을 변환해 'date' 컬럼을 생성, 이 컬럼은 시간 정보를 제외한 날짜 정보만 가진다.

-> 윈도우 함수를 정의하기 위해, 윈도우 명세를 만듦

-> partitionBy는 그룹을 어떻게 나눌지 결정하는 것과 유사한 개념

-> orderBy 메서드는 파티션 정렬 방식을 정의, desc는 내림차순

-> rowsBetween-> 입력된 로우의 참조를 기반으로 프레임에 로우가 포함될 수 있는지 결정

 

-> 시간대별 최대 구매 개수를 구하는 예시

이 함수를 적용할 프레임이 정의된 윈도우 명세도 함께 사용

 

-> dense_rank 함수를 사용해 모든 고객에 대해 최대 구매 수량을 가진 날짜가 언제인지 알아본다.

-> 중복 로우가 발생해 순위가 비어 있을 수 있으므로 rank 대신 dense_rank 함수 사용

 

-> 예제대로 dfWithDate를 설정하면 형식이 맞지 않아 오류가 발생하여, 새로 설정함

 참고: https://stackoverflow.com/questions/74984049/inconsistent-behavior-cross-version-parse-datetime-by-new-parser

 

INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER

I am very new to pyspark and getting below error, even if drop all date related columns or selecting only one column. Date format stored in my data frame like "". Can anyone please suggest

stackoverflow.com

 

7.4. 그룹화 셋

  • 그룹화 셋은 여러 집계를 결합하는 저수준 기능
  • 그룹화 셋을 이용하면 group-by 구문에서 원하는 형태로 집계를 생성할 수 있다.

 

7.4.1. 롤업

  • 롤업은 group-by 스타일의 다양한 연산을 수행할 수 있는 다차원 집계 기능

 

7.4.2. 큐브

  • 큐브는 롤업을 고차원적으로 사용할 수 있게 해준다.
  • 큐브는 요소들을 계층적으로 다루는 대신 모든 차원에 대해 동일한 작업을 수행한다.

 

 

7.4.3. 그룹화 메타데이터

  • grouping_id는 결과 데이터셋의 집꼐 수준을 며시하는 컬럼을 제공

 

7.4.4. 피벗

  • pivot을 사용해 로우를 컬럼으로 변환할 수 있다.

 

7.5. 사용자 정의 집계 함수

  • user-defined aggregation function, UDAF: 직접 제작한 함수나 비즈니스 규칙에 기반을 둔 자체 집계 함수를 정의하는 방법
  • 스파크는 입력 데이터의 모든 그룹의 중간 결과를 단일 AggregationBuffer에 저장해 관리한다.

UDAF를 생성하려면 기본 클래스인 UserDefinedAggregateFunctiond을 상속받고, 다음과 같은 메서드를 정의해야 한다.

 

-> UDAF는 스칼라와 자바만 사용 가능

 

Chapter 8. 조인

8.1. 조인 표현식

  • 스파크는 왼쪽과 오른쪽 데이터셋에 있는 하나 이상의 키값을 비교하고 왼쪽 데이터셋과 오른쪽 데이터셋의 결합 여부를 결정하는 조인 표현식의 평가 결과에 따라 두 개의 데이터셋을 조인합니다.
  • 가장 많이 사용하는 조인 표현식 -> 왼쪽과 오른쪽 데이터셋에 지정된 키가 동일한지 비교하는 동등 조인

 

8.2. 조인 타입

  • 내부 조인(inner join): 왼쪽과 오른쪽 데이터셋에 키가 있는 로우를 유지 
  • 외부 조인(outer join): 왼쪽이나 오른쪽 데이터셋에 키가 있는 로우를 유지
  • 왼쪽 외부 조인(left outer join): 왼쪽 데이터셋에 키가 있는 로우를 유지
  • 오른쪽 외부 조인(right outer join): 오른쪽 데이터셋에 키가 있는 로우를 유지
  • 왼쪽 세미 조인(left semi join): 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 있는 경우에는 키가 일치하는 왼쪽 데이터셋만 유지
  • 왼쪽 안티 조인(left anti join): 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 없는 경우에는 키가 일치하지 않는 왼쪽 데이터셋만 유지
  • 자연 조인(natural join): 두 데이터셋에서 동일한 이름을 가진 컬럼을 암시적(implicit)으로 결함하는 조인
  • 교차 조인(cross join) or 카테시안 조인(Cartesian join): 왼쪽 데이터셋의 모든 로우와 오른쪽 데이터셋의 모든 로우를 조합

 

>>> person = spark.createDataFrame([
... (0, "Bill Chambers", 0, [100]),
... (1, "MAtei Zaharia", 1, [500, 250, 100]),
... (2, "Michael Armbrust", 1, [250, 100])])\
... .toDF("id", "name", "graduate_program", "spark_status")
>>> graduateProgram = spark.createDataFrame([
... (0, "Masters", "School of Information", "UC Berkeley"),
... (2, "Masters", "EECS", "UC Berkeley"),
... (1, "PH.D.", "EECS", "UC Berkeley")])\
... .toDF("id", "degree", "department", "school")
>>> sparkStatus = spark.createDataFrame([
... (500, "Vice President"),
... (250, "PMC Member"),
... (100, "Contributor")])\
... .toDF("id", "status")
>>> person.createOrReplaceTempView("person")
>>> graduateProgram.createOrReplaceTempView("graduateProgram")
>>> sparkStatus.createOrReplaceTempView("sparkStatus")

-> 예제에서 사용하기 위해 테이블로 등록

 

8.3. 내부 조인

-> 내부 조인을 위한 표현식

 

>>> person.join(graduateProgram, joinExpression).show()

 

>>> joinType = "inner"
>>> person.join(graduateProgram, joinExpression, joinType).show()

-> join 메서드의 세 번째 파라미터로 조인 타입을 명확하게 지정할 수도 있다.

 

8.4. 외부 조인

>>> joinType = "outer"
>>> person.join(graduateProgram, joinExpression, joinType).show()

-> 표현식에 맞지 않는 로우도 표시, 없는 정보는 null 삽입

 

8.5. 왼쪽 외부 조인

>>> joinType = "left_outer"
>>> graduateProgram.join(person, joinExpression, joinType).show()

-> persond을 왼쪽에 배치하면, 로우가 3개만 포함됨

 

8.6. 오른쪽 외부 조인

>>> joinType = "right_outer"
>>> person.join(graduateProgram, joinExpression, joinType).show()

 

8.7. 왼쪽 세미 조인

>>> joinType = "left_semi"
>>> graduateProgram.join(person, joinExpression, joinType).show()

-> 기존 조인과 달리 두 번째 DataFrame은 그냥 값이 존재하는지 확인하기 위한 필터 역할

 

>>> gradProgram2 = graduateProgram.union(spark.createDataFrame([
... (0, "Masters", "Duplicated Row", "Duplicated School")]))
>>> gradProgram2.createOrReplaceTempView("gradProgram2")
>>> gradProgram2.join(person, joinExpression, joinType).show()

 

8.8. 왼쪽 안티 조인

>>> joinType = "left_anti"
>>> graduateProgram.join(person, joinExpression, joinType).show()

 

8.9. 자연 조인

  • 자연 조인은 조인하려는 컬럼을 암시적으로 추정

 

8.10. 교차 조인(카테시안 조인)

  • 조건절을 기술하지 않은 내부 조인
  • 왼쪽 dataframe과 오른쪽 dataframe을 합친 모든 경우의 수를 가진 dataframe 생성

-> 데이터 크기가 큰 경우 위험할 수 있기에 명시적으로 교차 조인을 정의해야 하며, 조심해서 써야 한다.

 

8.11. 조인 사용 시 문제점

8.11.1. 복합 데이터 타입의 조인

>>> person.withColumnRenamed("id", "personId")\
... .join(sparkStatus, expr("array_contains(spark_status, id)")).show()

-> 불리언을 반환하는 모든 표현식은 조인 표현식으로 간주 가능

 

8.11.2. 중복 컬럼명 처리

  • 조인에 사용할 dataframe의 특정 키가 동일한 이름을 가지며, 키가 제거되지 않도록 조인 표현식에 명시하는 경우
  • 조인 대상이 아닌 두 개의 컬럼이 동일한 이름을 가진 경우

-> 이 경우 문제 발생

 

해결 방법 1: 다른 조인 표현식 사용

  • 불리언 형태의 조인 표현식을 문자열이나 시쿼스 형태로 변경 -> 두 컬럼 중 하나가 자동으로 제거 -> ???

해결 방법 2: 조인 후 컬럼 제거

  • 원본 dataframe을 사용해 컬럼을 참조하여 문제가 되는 컬럼 제거

해결 방법 3: 조인 전 컬럼명 변경

 

8.12. 스파크의 조인 수행 방식

스파크가 조인을 수행하는 방식을 이해하기 위해서는 실행에 필요한 두 가지 핵심 전략을 이해해야 한다.

  • 노드간 네트워크 통신 전략
  • 노드별 연산 전략

8.12.1. 네트워크 통신 전략

  • 스파크는 조인 시에 셔플 조인과 브로드캐스트 조인 두 가지 클러스터 통신 방식을 활용한다.

큰 테이블과 큰 테이블 조인

  • 하나의 큰 테이블을 다른 큰 테이블과 조인하면 다음과 같이 셔플 조인이 발생

  • 셔플 조인은 전체 노드 간 통신이 발생한다. -> 조인에 사용한 트겆ㅇ 키나 키 집합을 어떤 노득 가졌는지에 따라 해당 노드와 데이터를 공유 -> 이런 통신 방식 때문에 네트워크는 복잡해지고 많은 자원을 사용

큰 테이블과 작은 테이블 조인

  • 테이블이 단일 워커 노드의 메모리 크기에 적합할 정도로 충분히 작은 경우 조인 연산을 최적화할 수 있다. -> 브로드캐스트 조인이 훨씬 효율적
  • 작은 dataframe을 클러스터의 전체 워커 노드에 복제 -> 시작 시 단 한 번만 복제가 수행되며, 그 이후로는 개별 워커가 다른 워커 노드를 기다리거나 통신할 필요 없이 작업을 수행할 수 있다.

  • 모든 단일 노드에서 개별적으로 조인이 수행되므로, CPU가 가장 큰 병목 구간이 된다.
  • 너무 큰 데이터를 브로드캐스트하면 고비용의 수집 연산이 발생하므로, 드라이버 노드가 비정상적으로 종료될 수 있다.

 

아주 작은 테이블 사이의 조인

  • 스파크가 조인 방식을 결정하도록 내버려두는 것이 가장 좋다.

 

Chapter 9. 데이터 소스

  • 스파크는 다양한 데이터소스를 읽고 쓰며, 데이터 소스를 커뮤니티에서 자체적으로 만들어낼 수 있다.
  • 이 장에서는 스파크의 핵심 데이터소스를 이용해 데이터를 읽고 쓰는 방법을 터득하고, 서드파티 데이터소스와 스파크를 연동할 때 무엇을 고려해야 하는지 배운다.

 

9.1. 데이터 소스 API의 구조

9.1.1. 읽기 API 구조

DataFrameReader.format(...).opiton("key", "value").schema(...).load()

 

  • 데이터 읽기의 핵심 구조는 위와 같다.
  • format 메서드는 선택적으로 사용
  • 기본값은 파케이 포맷
  • option 메서드를 사용해 데이터를 읽는 방법에 대한 파라미터를 키-값 쌍으로 설정 가능
  • schema 메서드는 데이터 소스에서 스키마를 제공하거나, 스키마 추론 기능을 사용하려는 경우에 선택적으로 사용

9.1.2. 데이터 읽기의 기초

spark.read
  • 스파크에서 데이터를 읽을 때는 기본적으로 DataFrameReader를 사용하는데, DataFrameReader에는 SparkSession의 read 속성으로 접근
  • DataFrameReader를 얻은 다음에는 포맷, 스키마, 읽기 모드, 옵션 값을 지정해야 한다.
  • 읽기 모드를 제외한 세 가지 항목은 필요한 경우에만 선택적으로 지정 가능
  • 사용자는 DataFrameReader에 반드시 데이터를 읽을 경로를 지정해야 한다.
spark.read.format("csv")
    .option("mode", "FAILFAST")
    .option("inferSchema", "true")
    .option("path", "path/to/file(s)")
    .schema(someSchema)
    .load()

 

읽기 모드

  • 외부 데이터 소스에서 데이터를 읽다 보면 자연스럽게 형식에 맞지 않는 데이터를 만나는데, 특히 반전형 데이터소스를 다룰 때 많이 발생 -> 이때 동작 방식을 지정하는 옵션이 읽기 모드

 

9.1.3. 쓰기 API 구조

  • partitionBy, bucketBy, sortBy 메서드는 파일 기반의 데이터소스에서만 동작

 

저장 모드

9.2. CSV 파일

  • CSV(comma-separated values)는 콤마(,)로 구분된 값을 의미한다.

9.2.2. CSV 파일 읽기

>>> spark.read.format("csv")
>>> myManualSchema = StructType([
... StructField("DEST_COUNTRY_NAME", StringType(), nullable=True),
... StructField("ORIGIN_COUNTRY_NAME", StringType(), nullable=True),
... StructField("count", LongType(), nullable=True)
... ])
>>> spark.read.format("csv")\
... .option("header", "true")\
... .option("mode", "FAILAST")\
... .schema(myManualSchema)\                                                  
... .load("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010-summary.csv")\
... .show(5)

-> 잘 동작

 

-> 모든 컬럼의 데이터 타입을 LongType으로 변경하여, 실제 스키마와 일치하지 않지만, 스파크는 문제를 찾지 못한다.

-> 스파크가 데이터를 읽어들이는 시점에 문제가 발생 -> 스파크는 지연 연산 특성이 있으므로 DataFrame 정의 시점이 아닌 잡 실행 시점에만 오류가 발생한다.

 

9.2.3. CSV 파일 쓰기

>>> csvFile = spark.read.format("csv")\
... .option("header", "true")\
... .option("mode", "FAILFAST")\
... .option("inferSchema", "true")\
... .load("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010-summary.csv")
>>> csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
... .save("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/tmp/my-tsv-file.tsv")

-> csv 파일을 읽어들여 tsv 파일로 내보내는 처리

 

 

9.3. JSON 파일

  • JSON(Javascript Object Notation): 자바스크립트 객체 표기법

9.3.2. JSON 파일 읽기

>>> spark.read.format("json").option("mode", "FAILFAST")\
... .option("inferSchema", "true")\
... .load("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data/flight-data/json/2010-summary.json").show(5)

 

9.3.3. JSON 파일 쓰기

>>> csvFile.write.format("json").mode("overwrite").save("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/tmp/my-json-file.json")

 

9.4. 파케이 파일

  • 파케이는 다양한 스토리지 최적화 기술을 제공하는 오픈소스로 만들어진 컬럼 기반의 데이터 저장 방식이다.
  • 특히 분석 워크로드에 최적화되어 있다.
  • 저장소 공간을 절약할 수 있고 전체 파일을 읽는 대신 개별 컬럼을 읽을 수 있으며, 컬럼 기반의 압축 기느 제공
  • 아파치 스파크와 잘 호환되기 때문에 스파크의 기본 파일 포맷
  • 장기 저장용 데이터는 파케이 포맷으로 저장하자
  • 복합 데이터 타입을 지원 -> 컬럼이 배열, 맵, 구조체 데이터 타입이라 해도 문제 없이 읽고 쓸 수 있음
>>> spark.read.format("parquet")
<pyspark.sql.readwriter.DataFrameReader object at 0x1023cbb00>
>>> spark.read.format("parquet")\
... .load("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data/flight-data/parquet/2010-summary.parquet").show(5)

 

-> 읽기

 

>>> csvFile.write.format("parquet").mode("overwrite")\
... .save("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/tmp/my-parquet-file.parquet")

-> 쓰기

 

9.5. ORC 파일

  • ORC는 하둡 워크로드를 위해 설계된 자기 기술적이며 데이터 타입을 인식할 수 있는 컬럼 기반의 파일 포맷이다.
  • 이 포맷은 대규모 스트리밍 읽기에 최적화되어 있을 뿐만 아니라, 필요한 로우를 신속히 찾아낼 수 있는 기능이 통합되어 있다.
  • 파케이는 스파크에 최적화 -> ORC는 하이브에 최적화
>>> spark.read.format("orc")\
... .load("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data/flight-data/orc/2010-summary.orc").show(5)

 

-> 읽기, 쓰기도 지금까지 사용한 패턴 

 

9.6. SQL 데이터베이스

  • SQL 데이터 소스는 매우 강력한 커넥터 중 하나
  • 사용자는 SQL을 지원하는 다양한 시스템에 SQL 데이터소스를 연결할 수 있다.
  • 데이터베이스는 원시 파일 형태가 아니므로 고려해야 할 옵션이 더 많다.
  • 그런 번거로움을 없애고자 이 책의 예제에서는 SQLite 실행을 위한 참고용 샘플을 제공
  • 모든 예제는 분산 환경이 아닌 로컬 머신에서도 충분히 테스트 가능해야함
SQLite는 파일에 데이터를 저장하는 데이터베이스로서 강력하고 빠르며 이해하기 쉽다. 
다른 데이터베이스와 가장 큰 차이점은 데이터베이스 접속 시 사용자 인증이 필요 없다.

 

wget https://github.com/xerial/sqlite-jdbc/releases/download/3.36.0.3/sqlite-jdbc-3.36.0.3.jar
cp sqlite-jdbc-3.36.0.3.jar /spark-2.3.2-bin-hadoop2.7/jars
./bin/pyspark \
> --driver-class-path sqlite-jdbc-3.36.0.3.jar \
> --jars sqlite-jdbc-3.36.0.3.jar

 

10. 스파크 SQL

  • 스파크 SQL을 사용해 DB에 생성된 view나 table에 SQL 쿼리를 실행할 수 있다.
  • 시스템 함수 사용, 사용자 정의 함수 정의 가능
  • 워크로드를 최적화하기 위해 쿼리 실행 계획을 분석할 수 있다.

10.1. SQL이란

  • SQL(Structured Query Language): 데이터에 대한 관계형 연산을 표현하기 위한 도메인 특화 언어
  • 스파크는 ANSI SQL:2003의 일부를 구현

10.4. 스파크 SQL 쿼리 실행 방법

10.4.1. 스파크 SQL CLI

  • 스파크 SQL CLI(Control Line Interface): 로컬 환경의 명령행에서 기본 스파크 SQL 쿼리를 실행할 수 있는 도구
>>> spark.sql("SELECT 1 + 1").show()

 

 

>>> spark.sql("""SELECT user_id, department, first_name FROM professors WHERE department IN (SELECT name FROM department WHERE created_date >= '2016-01-01')""")

 

>>> spark.read.json("/Users/sangyeong-park/CE/KHUDA_5th/Data_Engineering/Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")\
... .createOrReplaceTempView("some_sql_view")
>>> spark.sql("""
... SELECT DEST_COUNTRY_NAME, sum(count)
... FROM some_sql_view GROUP BY DEST_COUNTRY_NAME
... """)\
... .where("DEST_COUNTRY_NAME like 'S%'").where("`sum(count)` > 10")\
... .count()

 

-> DataFrame을 SQL에서 사용할 수 있도록 처리한 후, SQL의 결과를 DataFrame으로 반환

 

10.4.3. 스파크 SQL 쓰리프트 JDBC/ODBC 서버

  • 스파크는 자바 데이터베이스 연결(JDBC) 인터페이스를 제공한다.
  • 사용자나 원격 프로그램은 스파크 SQL을 실행하기 위해 이 인터페이스로 스파크 드라이버에 접속한다.
  • 쓰리프트 JDBC/ODBC(Open Database Connectivity) 서버는 하이브 1.2.1 버전의 HiveServer2에 맞춰 구현되어 있다.

 

 


참고자료

  • "스파크 완벽 가이드" , 빌 체임버스 , 마테이 자하리아 저자(글) · 우성한 , 이영호 , 강재원 번역

 

728x90