스파크 완벽 가이드 - Chapter5. 구조적 API 기본 연산
by Gunju Ko
스파크 완벽 가이드 - 구조적 API 기본 연산
이 글은 “스파크 완벽 가이드” 책 내용을 정리한 글입니다.
저작권에 문제가 있는 경우 “gunjuko92@gmail.com”으로 연락주시면 감사하겠습니다.
- DataFrame은 Row 타입의 레코드와 각 레코드에 수행할 연산 표현식을 나타내는 여러 컬럼으로 구성된다.
- 스키마는 각 컬럼명과 데이터 타입을 정의한다.
- DataFrame의 파티셔닝은 DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의한다.
- 파티셔닝 스키마는 파티션을 배치하는 방법을 정의한다.
- 파티셔닝 분할 기준은 특정 컬럼이나 비결정론전(nondeterministically) 값을 기반으로 설정할 수 있다.
val df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
df.printSchema()
1. 스키마
- 스키마는 DataFrame의 컬럼명과 데이터 타입을 정의한다.
스카마를 정의해야 하는가?
- 상황에 따라 다르다. 비정형 분석에서는 스키마-온-리드가 대부분 잘 동작한다.
- Long 데이터 타입을 Integer 데이터 타입으로 잘못 인식하는 등 정밀도 문제가 발생할 수 있다.
- 운영환경에서 ETL 작업에 스파크를 사용한다면 직접 스키마를 정의하는게 좋다.
- 스키마는 여러 개의 StructField 타입 필드로 구성된 StructType 객체이다.
- 스키마는 복합 데이터 타입인 StructType을 가질 수 있다.
- StructField는 이름, 데이터 타입, 컬럼이 값이 없거나 null일 수 있는지 지정하는 불리언값을 가진다.
- 필요한 경우 컬럼과 관련된 메타데이터를 지정할 수도 있다.
- 스키마는 런타임에 데이터 타입이 스카마의 데이터 타입과 일치하지 않으면 오류를 발생시킨다.
val manualSchema = StructType(Array(
StructField("DEST_CONTRY_NAME", StringType, true),
StructField("ORIGIN_CONTRY_NAME", StringType, true),
StructField("count", LongType, false, Metadata.fromJson("{\"hello\":\"world\"}")),
))
val df = spark.read.format("json").schema(manualSchema)
.load("/data/flight-data/json/2015-summary.json")
- 스파크는 자체 데이터 타입 정보를 사용하므로 프로그래밍 언어의 데이터 타입을 스파크의 데이터 타입으로 설정할 수 없다.
2. 컬럼과 표현식
- 사용자는 표현식으로 DataFrame의 컬럼을 선택, 조작, 제거할 수 있다.
- 컬럼은 DataFrame을 통해서 접근할 수 있다.
- 컬럼의 내용을 수정하려면 DataFrame의 트랜스포메이션을 사용해야 한다.
2.1 컬럼
- 컬럼을 생성하고 참조할 수 있는 가장 간단한 방법은
col
함수나column
함수를 사용하는 것이다.
col("someColumnName")
column("someColumnName")
- 컬럼이 DataFrame에 없을지 있을지는 알 수 없으며 카탈로그에 저장된 정보와 비교하기 전까지 미확인 상태로 남는다.
스칼라는 고유 기능을 사용해 다음과 같은 방법으로 컬럼을 참조할 수 있다.
- $”myColumn”
- `myColumn`
- DataFrame의 컬럼은 col 메서드로 참조한다.
2.2 표현식
- 표현식은 DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미한다.
- 여러 컬럼명을 입력으로 받아 식별하고, ‘단일 값’을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수라고 생각할 수 있다.
- 표현식은 expr 함수로 가장 간단히 사용할 수 있다.
- DataFrame의 컬럼을 참조할 수도 있다.
- 컬럼은 단지 표현식일 뿐이다.
- 컬럼과 컬럼의 트랜스포메이션은 파싱된 표현식과 동일한 논리적 실행 계획으로 컴파일 된다.
(((colr("someCol") + 5) * 200) - 6) < col("otherCol")
expr("(((someCol + 5) + 200) - 6) < otherCol")
- 위 코드는 실행 시점에 동일한 논리 트리로 컴파일된다.
-
SQL의 SELECT 구문에 위 표현식을 마찬가지로 동일한 논리 트리로 컴파일된다.
- 아래와 같은 방식으로 컬럼에 접근할 수 있다.
val df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
.columns
3. 레코드와 로우
- DataFrame의 각 로우는 하나의 레코드이다.
- 스파크는 레코드를 Row 객체로 표현한다.
- Row 객체는 내부에 바이트배열을 가지며 이 바이트 배열 인터페이스는 오직 컬럼 표현식으로만 다룰 수 있다.
3.1 로우 생성하기
- Row 객체는 스키마 정보를 가지고 있지 않고 DataFrame만 유일하게 스키마를 갖는다.
- Row 객체를 직접 생성하려면 DataFrame의 스키마와 같은 순서로 값을 명시해야 한다.
val myRow = Row("Hello", null, 1, false)
myRow(0).asInstanceOf[String] // String 타입
myRow.getString(0) // String 타입
myRow.getInt(2) // Int 타입
4. DataFrame의 트랜스포메이션
- DataFrame을 다루는 방법은 몇가지 주요 작업을 나눌 수 있다.
- 로우나 컬럼 추가
- 로우나 컬럼 제거
- 로우를 컬럼으로 변환하고나, 그 반대로 변환
- 컬럼값을 기준으로 로우 순서 변경
4.1 DataFrame 생성하기
// 데이터소스에서 DataFrame 생성하기
val df = spark.read.format("json")
.load("/data/flight-data/json/2015-summary.json")
// 임시뷰로 등록
df.createOrReplaceTempView("dfTable")
// Row 객체를 가진 Seq 타입을 직접 변환해 DataFrame을 생성
val manualSchema = new StructType(Array(
new StructField("some", StringType, true),
new StructField("col", StringType, true),
new StructField("names", LongType, false),
))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRdd, manualSchema)
4.2 select와 selectExpr
- select와 selectExpr 메서드를 사용하면 데이터 테이블에 SQL을 실행하는것처럼 DataFrame에서도 SQL을 사용할 수 있다.
SELCT * FROM dataFrameTable
// 컬럼명을 인수로 받는 select 메서드
df.select("DESC_COUNTRY_NAME").show(2)
--- SQL
SELECT DESC_COUNTRY_NAME FROM dfTable LIMIT 2
// 여러 컬럼을 선택 가능
df.select("DESC_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
--- SQL
SELECT DESC_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM dfTable LIMIT 2
- 컬럼을 참조하는 다양한 방법을 섞어가면서 사용할 수 있다.
df.select(
df.col("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"),
'DEST_COUNTRY_NAME',
$"DEST_COUNTRY_NAME",
expr("DEST_COUNTRY_NAME")
).show(2)
- Column 객체와 문자열을 함께 섞어 쓰는 경우엔 컴파일 오류가 발생한다.
// compile error
df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME")
- expr 함수는 가장 유연한 참조 방법이다. 아래 코드는 컬러명을 destination으로 변경한다.
// 스칼라 코드
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
- selectExpr 메서드는 select 메서드에 expr 함수를 사용하는것과 같은 기능을 한다.
df.selectExpr("DEST_COUNTRY_NAME AS destination").show(2)
- selectExpr 메서드는 새로운 DataFrame을 생성하는 복잡한 표현식을 간단하게 만드는 도구이다. 사실 모든 유효현 비집계형 SQL 구문을 지정할 수 있다.
df.selectExpr(
"*",
"(DESC_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry"
).show(2)
4.3 스파크 데이터 타입으로 변환하기
- 명시적인 값을 스파크에 전달해야 할 경우엔 리터럴을 사용한다.
df.select(expr(*), lit(1).as("One")).show(2)
- SQL에서 리터럴은 상수값을 의미한다.
SELECT *, 1 as One FROM dfTable LIMIT 2
4.4 컬럼 추가하기
- withColumn 메서드를 사용하면 컬럼을 추가 할 수 있다.
df.withColumn("numberOne", lit(1)).show(2)
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
4.5 컬럼명 변경하기
- withColumnRenamed 메서드로 컬럼명을 변경할 수 있다.
df.withColumnRenamed("DEST_COUNTRY_NAME", "desc").columns
4.6 예약 문자와 키워드
- 공백이나 하이픈(-) 같은 예약 문자는 컬럼명에 사용할 수 없다.
- 예약 문자를 컬럼명에 사용하려면 백틱(`) 문자를 이용해 이스케이핑 해야한다.
4.7 대소문자 구분
- 대소문자를 구분하지 않는다.
set spark.sql.caseSensitive true
4.8 컬럼 제거하기
- drop 메서드를 사용
df.drop("ORIGIN_COUNTRY_NAME")
4.9 컬럼 데이터 타입 변경
- cast 메서드를 사용
4.10 로우 필터링
- where 메서드나 filter 메서드로 필터링 할 수 었음
df.filter(col("count") < 2).show(2)
df.where(col("count") < 2).show(2)
--- SQL
SELECT * FROM dfTable WHERE count < 2 LIMIT 2
- 같은 표현식에 여러 필터를 적용해야할 때도 있다. 하지만 스파크는 자동으로 필터의 순서와 상관없이 동시에 모든 필터링 작업을 수행하기 때문에 항상 유용한 것은 아니다.
4.11 고유한 로우 얻기
- distinct 메서드를 사용해서 고윳값을 찾을 수 있다.
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
4.12 무작위 샘플 만들기
- sample 메서드를 사용하면 DataFrame에서 무작위 데이터를 얻을 수 있다.
- 표본 데이터 추출 비율을 지정할 수 있다.
- 복원 추출이나 비복원 추출의 사용 여부를 지정할 수도 있다.
4.12 임의 분할하기
- randomSplit은 DataFrame을 임의 크기로 분할할 때 유용하게 사용된다.
- 머신러닝 알고리즘에서 사용할 학습셋, 검증셋, 그리고 테스트셋을 만들 때 주로 사용한다.
val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)
dataFrames(0).count() > dataFrames(1).count()
- 시드값을 반드시 설정해야한다.
4.13 로우 합치기와 추가하기
- DataFrame은 불변성을 가지므로 레코드를 추가하는 작업은 불가능하다.
- DataFrame에 레코드를 추가하려면 원본 DataFrame을 새로운 DataFrame과 통합해야 한다.
- 통합하려는 두 개의 DataFrame은 반드시 동일한 스키마와 컬럼 수를 가져야한다.
val schema = df.schema
val newRows = Seq(
Row("New Country", "Other Country", 5L),
Row("New Country 2", "Other Country 3", 1L),
)
val parallelizedRows = spark.sparkContext.parallelize(newRows)
val newDf = spark.createDataFrame(parallelizedRows, schema)
df.union(newDf)
.where("count = 1")
.show()
- 스칼라에서는 반드시
=!=
연산자를 사용해야한다. 문자열을 비교할 때=!=
연산자를 사용하면 컬럼의 실제값을 비교 대상 문자열과 비교한다. - 로우가 추가된 DataFrame을 참조하려면 새롭게 만들어진 DataFrame 객체를 사용해야 한다.
4.14 로우 정렬하기
- sortBy, orderBy 메서드를 사용해 정렬할 수 있다. 두 메서드는 같은 방식으로 동작한다.
- 기본 동작은 오름차순 정렬이다.
df.sort("count").show(5)
df.orderBy("count").show(5)
df.sort(desc("count")).show(5)
df.orderBy(asc("count")).show(5)
- 정렬 기준을 명확히하려면 asc나 desc 함수를 사용한다.
- acs_nulls_first, desc_nulls_first, acs_nulls_last, desc_nulls_last 메서드를 사용하면 널값이 표시되는 기준을 지정할 수 있다.
- 트랜스포메이션을 처리하기 전에 성능을 최적화하기 위해 파티션별 정렬을 수행하기도 한다. 파티션별 정렬은 sortWithinPartitions 메서드로 할 수 있다.
4.15 로우 수 제한하기
- limit 메서드를 사용해 추출할 로우 수를 제한할 수 있다.
4.16 repartition과 coalesce
- 최적화 기법으로 자주 필터링하는 컬럼을 기준으로 데이터를 분할하는 방법이 있다. 이를 통해 파티셔닝 스키마와 파티션 수를 포함해 클러스터 전반의 물리적 데이터 구성을 제어할 수 있다.
- repartition 메서드를 호출하면 무조건 전체 데이터를 셔플한다. 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야 한다.
- 특정 컬럼을 기준으로 자주 필터링한다면 자주 필터링되는 컬럼을 기준으로 파티션을 재분배하는 것이 좋다.
df.repartition(col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME"))
- coalesce 메서드는 전체 데이터를 셔플하지 않고 파티션을 병합하려는 경우에 사용한다.
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
- 위에 예제는 셔플을 수행해 5개의 파티션으로 나누고, 전체 데이터를 셔플 없이 병합하려는 에제이다.
4.17 드라이버로 로우 데이터 수집하기
- 스파크는 드라이버에서 클러스터 상태 정보를 유지한다. 로컬 환경에서 데이터를 다루려면 드라이버로 데이터를 수집해야 한다.
- collect() 메서드는 모든 데이터를 수집하며, take 메서드는 상위 N개의 로우를 반환한다.
- toLocalIterator 메서드를 사용하면 데이터셋의 파티션을 차례로 반복 처리할 수 있다.
- toLocalIterator 메서드를 사용할 때 매우 큰 파티션이 있다면 드라이브와 애플리케이션이 비정상적으로 종료될 수 있다.
- 연산을 병렬로 수행하지 않기 때문에 매우 큰 처리 비용이 든다.
val collectDf = df.limit(10)
collectDf.take(5)
collectDf.collect()
collectDf.toLocalIterator()
- 드라이버로 모든 데이터 컬렉션을 수집하는 작업은 매우 큰 비용이 발생한다.