스파크 완벽 가이드 - 구조적 API 기본 연산

이 글은 “스파크 완벽 가이드” 책 내용을 정리한 글입니다.

저작권에 문제가 있는 경우 “gunjuko92@gmail.com”으로 연락주시면 감사하겠습니다.

Chapter 10. Spark SQL

  • Spark SQL을 사용해 데이터베이스에 생성된 뷰나 테이블에 SQL 쿼리를 실행할 수 있다.
  • 스파크 SQL은 DataFrame과 Dataset API에 통합되어 있다. 따라서 데이터 변환 시 SQL과 DataFrame의 기능을 모두 사용할 수 있으며 두 방식 모두 동일한 실행 코드로 컴파일 된다.

1. SQL이란

  • SQL 또는 구조적 질의 언어는 데이터에 대한 관계형 연산을 표현하기 위한 도메인 특화 언어이다.
  • 스파크는 ANSI SQL:2003의 일부를 구현했다.

2. 빅데이터와 SQL: 아파치 하이브

  • 스파크가 등장하기 전에는 하이브가 빅데이터 SQL 접근 계층에서 사실상의 표준이었다.
  • 스파크는 RDD를 이용하는 범용 처리 엔진으로 시작했지만 이제는 많은 사용자가 스파크 SQL을 사용하고 있다.

3. 빅데이터와 SQL : 스파크 SQL

  • 스파크 SQL은 DataFrame과의 뛰어난 호환성을 가진다.
  • 스파크 SQL은 OLTP 데이터베이스가 아닌 OLAP 데이터베이스로 동작한다.

3.1 스파크와 하이브의 관계

  • 스파크 SQL은 하이브 메타스토어를 사용하므로 하이브와 잘 연동할 수 있다.
    • 하이브 메타스토어는 여러 세션에서 사용할 테이블 정보를 보관하고 있다.

4. 스파크 SQL 쿼리 실행 방법

4.1 스파크 SQL CLI

  • 로컬 환경의 명령행에서 기본 스파크 SQL 쿼리를 실행할 수 있는 편리한 도구이다.

4.2 스파크의 프로그래밍 SQL 인터페이스

  • 스파크에서 지원하는 언어 API로 비정형 SQL을 실행할 수도 있다.
  • SparkSession 객체의 sql 메서드를 사용한다.
  • 처리된 결과는 DataFrame를 리턴한다.
spark.sql("SELECT 1 + 1").show()
  • 다른 트랜스포메이션과 마찬가지로 즉시 실행되지 않고 지연 처리된다.
  • SQL과 DataFrame은 완벽하게 연동될 수 있으므로 더 강력하다. 예를 들어 DataFrame을 생성하고 SQL을 사용해 처리할 수 있으며 그 결과를 다시 DataFrame으로 돌려받게 된다.
// DataFrame을 SQL에서 사용할 수 있도록 처리
spark.read.json("/path").createOrReplaceTempView("some_sql_view") 

// SQL의 결과를 DataFrame으로 반환
spark.sql("""
	SELECT DEST_COUNTRY_NAME, sum(count)
	FROM some_sql_view GROUP BY DESC_COUNTRY_NAME
""")
	.where("DESC_COUNTRY_NAME like '%s'").where("`sum(count)` > 10")
	.count()

5. 카탈로그

  • 스파크 SQL에서 가장 높은 추상화 단계
  • 카탈로그는 테이블에 저장된 데이터에 대한 메타데이버뿐만 아니라 데이터베이스, 테이블, 함수 그리고 뷰에 대한 정보를 추상화한다.
  • 카탈로그는 테이블, 데이터베이스 그리고 함수를 조회하는 등 여러 가지 유용한 함수를 제공한다.

6. 테이블

  • 스파크 SQL을 사용해 유용한 작업을 수행하려면 먼저 테이블을 정의해야 한다.
  • 테이블은 명령을 실행할 데이터의 구조라는 점에서 DataFrame과 논리적으로 동일하다.
  • DataFrame은 프로그래밍 언어로 정의하지만 테이블은 데이터베이스에서 정의한다. 스파크에서 테이블을 생성하면 default 데이터베이스에 등록된다.
  • 테이블은 항상 데이터를 가지고 있다.

6.1 스파크 관리형 테이블

  • 테이블은 두 가지 중요한 정보를 저장한다.
    • 테이블의 데이터
    • 테이블에 대한 데이터 (메타 데이터)
  • DataFrame의 saveAsTable 메서드는 스파크가 관련된 모든 정보를 추적할 수 있는 관리형 테이블을 만들 수 있다.
  • saveAsTable 메서드는 테이블을 읽고 데이터를 스파크 포맷으로 변환한 후 새로운 경로에 저장한다.

6.2 테이블 생성하기

  • 다양한 데이터소스를 사용해 테이블을 생성할 수 있다.

6.3 외부 테이블

  • 대부분의 하이브 쿼리문을 스파크 SQL에서 바로 사용할 수 있다.
  • 스파크는 외부 테이블의 메타데이터를 관리한다.하지만 데이터 파일은 스파크에서 관리하지 않느다.

6.7 테이블 제거하기

  • 테이블은 삭제할 수 없다. 오로지 제거만 할 수 있다.
  • DROP 키워드를 사용해 테이블을 제거한다.
  • 테이블을 제거하면 테이블의 데이터가 모두 삭제되므로 매우 신중하게 작업해야 한다.
DROP TABLE flights_csv;

DROP TABLE IF EXISTS flights_csv;

7. 뷰

  • 뷰는 단순 쿼리 실행 계획일 뿐이다.
  • 뷰를 사용하면 쿼리 로직을 체계화하거나 재사용하기 편하게 만들 수 있다.
  • 뷰는 데이터베이스에 설정하는 전역 뷰나 세션별 뷰가 될 수 있다.

7.1 뷰 생성하기

  • 최종 사용자에게 뷰는 테이블처럼 보인다.
  • 신규 경로에 모든 데이터를 다시 저장하는 대신 단순하게 쿼리 시점에 데이터소스에 트랜스포메이션을 수행한다.
CREATE VIEW just_usa_view AS
SELECT * FROM flights WHERE dest_country_name = 'United State'
  • 테이블처럼 데이터베이스에 등록되지 않고 현재 세션에서만 사용할 수 있는 임시 뷰를 만들 수 있다.
CREATE TEMP VIEW just_usa_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United State'
  • 전역적 임시 뷰도 만들 수 있다. 전역적 임시 뷰 (global temp view)는 데이터베이스에 상관없이 사용할 수 있으므로 전체 스파크 애플케이션에서 볼 수 있다. 하지만 세션이 종료되면 뷰도 사라진다.
CREATE GLOBAL TEMP VIEW just_usa_global_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United State'

SHOW TABLES
  • 뷰는 실직적으로 트랜스포메이션이며 스파크는 쿼리가 실행될 때만 뷰에 정의된 트랜스포메이션을 수행한다. 즉 테이블의 데이터를 실제로 조회하는 경우에만 필터를 적용한다. 사실 뷰는 기존 DataFrame에서 새로운 DataFrame을 만드는 것과 동일하다.

7.2 뷰 제거하기

  • 뷰와 테이블 제거의 핵심 차이점은 뷰는 어떤 데이터도 제거되지 않으며 뷰 정의만 제거된다는 것이다.
DROP VIEW IF EXIST just_usa_view;

8. 데이터베이스

  • 데이터베이스는 여러 테이블을 조직화하기 위한 도구이다.
  • 스파크는 데이터베이스를 정의하지 않으면 기본 데이터베이스를 사용한다.
  • 스파크에서 실행하는 모든 SQL 명령문은 사용 중인 데이터베이스 범위에서 실행된다.
/* 전체 데이터베이스 목록 조회 */
SHOW DATABASES

8.1 데이터베이스 생성/사용/삭제

CREATE DATABASE some_db

/* 쿼리 수행에 필요한 데이터베이스를 설정 */
USE some_db
  • 모든 쿼리를 테이블 이름을 찾을 때 앞서 지정한 데이터베이스를 참조한다.
/* 접두사를 사용해 다른 데이터베이스의 테이블에 쿼리할 수 있다. */
SELECT * FROM default.flights

/* 어떤 데이터베이스를 사용 중인지 확인 가능 */
SELECT current_database()
/* 데이터베이스 삭제 */
DROP TABLE IF EXISTS some_db;

9. SELECT 구문

  • 스파크 쿼리는 ANSI-SQL 요건을 충족한다.
SELECT 
	CASE 	WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
		  	WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
		  	ELSE -1 END
  FROM partitioned_flights

10 고급 주제

  • SQL 쿼리는 특정 명령 집합을 실행하도록 요청하는 SQL 구문이다.
  • SQL 구문은 조작, 정의, 제어와 관련된 명령을 정의할 수 있다.

10.1 복합 데이터 타입

  • 표준 SQL에는 존재하지 않는 매우 강력한 기능
  • 구조체, 리스트, 맵 세가지 핵심 복합 데이터 타입이 존재함

구조체

  • 구조체는 맵에 더 가까우며 스파크에서 중첩 데이터를 생성하거나 쿼리하는 방법을 제공
  • 구조체를 만들기 위해서는 여러 컬럼이나 표현식을 괄호로 묶기만 하면 된다.
CREATE VIEW IF NOT EXIST nested_data AS
SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights

SELECT country.DEST_COUNTRY_NAME from nested_data
SELECT country.*, count from nested_data

리스트

  • collect_list 함수나 collect_set 함수를 통해 리스트나 배열을 생성할 수 있다.
  • 두 함수는 모두 집계 함수이므로 집계 연산 시에만 사용이 가능
SELECT COUNTRY_NAME as new_name, collect_list(count) as flight_counts,
collect_set(ORIGIN_COUNTRY_NAME) as origin_set
FROM flights GROUP BY DEST_COUNTRY_NAME

/* 직접 배열 생성 가능 */
SELECT DEST_COUNTRY_NAME, ARRAY (1, 2, 3) FROM flights

/* 배열 쿼리 구문을 사용해 리스트의 특정 위치의 데이터를 쿼리할 수 있다. */
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count)[0]
FROM flights GROUP BY DEST_COUNTRY_NAME
  • explode 함수를 사용해 배열을 다시 여러 로우로 변환할 수 있다.
  • explode 함수는 collect 함수와는 정확히 반대로 동작한다.
CREATE OR REPLACE TEMP VIEW flights_agg AS
SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
FROM flights GROUP BY DEST_COUNTRY_NAME

select explode(collect_counts), DEST_COUNTRY_NAME FROM flights_agg

10.2 함수

  • 다양한 고급 함수를 제공한다.
/* 스파크 SQL이 제공하는 전체 함수 목록을 확인 */
SHOW FUNCTIONS

/* 스파크에 내장된 시스템 함수 목록 확인 */
SHOW SYSTEM FUNCTIONS

/* 스파크에 내장된 사용자 함수 목록 확인 */
SHOW USER FUNCTIONS

사용자 정의 함수

  • 특정 언어를 사용해 함수를 개발한 다음 등록하여 함수를 정의할 수 있다.
def power3(number: Double): Double = number * number * number
  
spark.udf.register("power3", power3(_: Double): Double)

SELECT count, power3(count) FROM flights

10.3 서브쿼리

  • 스파크에는 두가지 기본 서브 쿼리가 있다.
    • 상호 연관 서브쿼리 : 서브 쿼리의 정보를 보완하기 위해 쿼리의 외부 범위에 있는 일부 정보를 사용
    • 비상호연관 서브쿼리 : 외부 범위에 있는 정보를 사용하지 않음
  • 스파크는 조건절 서브쿼리도 지원한다.

비상호연관 조건절 서브쿼리

SELECT * FROM flights
WHERE origin_country_name IN (SELECT dest_country_name FROM flights GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5)

상호연관 조건절 서브쿼리

SELECT * FROM flights f1
WHERE EXISTS (SELECT 1 FROM flights f2 WHERE f1.dest_country_name = f2.origin_country_name)
AND EXISTS (SELECT 1 FROM flights f2 WHERE f1.origin_country_name = f2.dest_country_name)
  • EXISTS 키워드는 서브쿼리에 값이 존재하면 true를 반환한다.

비상호연관 스칼라 쿼리

SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights

11. 다양한 기능

11.1 설정

  • spark.sql.shuffle.partitions (200) : 조인이나 집계 수행에 필요한 데이터를 셔플링할 때 사용할 파티션 수
  • spark.sql.autoBroadcastJoinThreshold (10MB) : 조인 시 전체 워커 노드로 전파될 테이블의 최대 크기를 바이트 단위로 설정한다. 이 값을 -1로 설정하면 브로드케이스 조인을 비활성화 한다.
  • spark.sql.broadcastTimeout (300) : 브로드캐스트 조인 시 전체 워커 노드로 전파할 떄 기다릴 최대 시간을 초 단위로 설정