스파크 완벽 가이드 - 구조적 API 기본 연산

이 글은 “스파크 완벽 가이드” 책 내용을 정리한 글입니다.

저작권에 문제가 있는 경우 “gunjuko92@gmail.com”으로 연락주시면 감사하겠습니다.

  • DataFrame은 Row 타입의 레코드와 각 레코드에 수행할 연산 표현식을 나타내는 여러 컬럼으로 구성된다.
  • 스키마는 각 컬럼명과 데이터 타입을 정의한다.
  • DataFrame의 파티셔닝은 DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의한다.
  • 파티셔닝 스키마는 파티션을 배치하는 방법을 정의한다.
  • 파티셔닝 분할 기준은 특정 컬럼이나 비결정론전(nondeterministically) 값을 기반으로 설정할 수 있다.
val df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
df.printSchema()

1. 스키마

  • 스키마는 DataFrame의 컬럼명과 데이터 타입을 정의한다.

스카마를 정의해야 하는가?

  • 상황에 따라 다르다. 비정형 분석에서는 스키마-온-리드가 대부분 잘 동작한다.
  • Long 데이터 타입을 Integer 데이터 타입으로 잘못 인식하는 등 정밀도 문제가 발생할 수 있다.
  • 운영환경에서 ETL 작업에 스파크를 사용한다면 직접 스키마를 정의하는게 좋다.
  • 스키마는 여러 개의 StructField 타입 필드로 구성된 StructType 객체이다.
    • 스키마는 복합 데이터 타입인 StructType을 가질 수 있다.
  • StructField는 이름, 데이터 타입, 컬럼이 값이 없거나 null일 수 있는지 지정하는 불리언값을 가진다.
    • 필요한 경우 컬럼과 관련된 메타데이터를 지정할 수도 있다.
  • 스키마는 런타임에 데이터 타입이 스카마의 데이터 타입과 일치하지 않으면 오류를 발생시킨다.
val manualSchema = StructType(Array(
  StructField("DEST_CONTRY_NAME", StringType, true),
  StructField("ORIGIN_CONTRY_NAME", StringType, true),
  StructField("count", LongType, false, Metadata.fromJson("{\"hello\":\"world\"}")),  
))

val df = spark.read.format("json").schema(manualSchema)
	.load("/data/flight-data/json/2015-summary.json")
  • 스파크는 자체 데이터 타입 정보를 사용하므로 프로그래밍 언어의 데이터 타입을 스파크의 데이터 타입으로 설정할 수 없다.

2. 컬럼과 표현식

  • 사용자는 표현식으로 DataFrame의 컬럼을 선택, 조작, 제거할 수 있다.
  • 컬럼은 DataFrame을 통해서 접근할 수 있다.
  • 컬럼의 내용을 수정하려면 DataFrame의 트랜스포메이션을 사용해야 한다.

2.1 컬럼

  • 컬럼을 생성하고 참조할 수 있는 가장 간단한 방법은 col 함수나 column 함수를 사용하는 것이다.
col("someColumnName")
column("someColumnName")
  • 컬럼이 DataFrame에 없을지 있을지는 알 수 없으며 카탈로그에 저장된 정보와 비교하기 전까지 미확인 상태로 남는다.

스칼라는 고유 기능을 사용해 다음과 같은 방법으로 컬럼을 참조할 수 있다.

  • $”myColumn”
  • `myColumn`
  • DataFrame의 컬럼은 col 메서드로 참조한다.

2.2 표현식

  • 표현식은 DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미한다.
    • 여러 컬럼명을 입력으로 받아 식별하고, ‘단일 값’을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수라고 생각할 수 있다.
  • 표현식은 expr 함수로 가장 간단히 사용할 수 있다.
    • DataFrame의 컬럼을 참조할 수도 있다.
    • 컬럼은 단지 표현식일 뿐이다.
  • 컬럼과 컬럼의 트랜스포메이션은 파싱된 표현식과 동일한 논리적 실행 계획으로 컴파일 된다.
(((colr("someCol") + 5) * 200) - 6) < col("otherCol")

expr("(((someCol + 5) + 200) - 6) < otherCol")
  • 위 코드는 실행 시점에 동일한 논리 트리로 컴파일된다.
  • SQL의 SELECT 구문에 위 표현식을 마찬가지로 동일한 논리 트리로 컴파일된다.

  • 아래와 같은 방식으로 컬럼에 접근할 수 있다.
val df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
	.columns

3. 레코드와 로우

  • DataFrame의 각 로우는 하나의 레코드이다.
  • 스파크는 레코드를 Row 객체로 표현한다.
  • Row 객체는 내부에 바이트배열을 가지며 이 바이트 배열 인터페이스는 오직 컬럼 표현식으로만 다룰 수 있다.

3.1 로우 생성하기

  • Row 객체는 스키마 정보를 가지고 있지 않고 DataFrame만 유일하게 스키마를 갖는다.
  • Row 객체를 직접 생성하려면 DataFrame의 스키마와 같은 순서로 값을 명시해야 한다.
val myRow = Row("Hello", null, 1, false)

myRow(0).asInstanceOf[String] // String 타입
myRow.getString(0) // String 타입
myRow.getInt(2) // Int 타입

4. DataFrame의 트랜스포메이션

  • DataFrame을 다루는 방법은 몇가지 주요 작업을 나눌 수 있다.
    • 로우나 컬럼 추가
    • 로우나 컬럼 제거
    • 로우를 컬럼으로 변환하고나, 그 반대로 변환
    • 컬럼값을 기준으로 로우 순서 변경

4.1 DataFrame 생성하기

// 데이터소스에서 DataFrame 생성하기
val df = spark.read.format("json")
	.load("/data/flight-data/json/2015-summary.json")

// 임시뷰로 등록
df.createOrReplaceTempView("dfTable")
// Row 객체를 가진 Seq 타입을 직접 변환해 DataFrame을 생성
val manualSchema = new StructType(Array(
	new StructField("some", StringType, true),
  new StructField("col", StringType, true),
  new StructField("names", LongType, false),
))

val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRdd, manualSchema)

4.2 select와 selectExpr

  • select와 selectExpr 메서드를 사용하면 데이터 테이블에 SQL을 실행하는것처럼 DataFrame에서도 SQL을 사용할 수 있다.
SELCT * FROM dataFrameTable
// 컬럼명을 인수로 받는 select 메서드
df.select("DESC_COUNTRY_NAME").show(2)
--- SQL
SELECT DESC_COUNTRY_NAME FROM dfTable LIMIT 2

// 여러 컬럼을 선택 가능
df.select("DESC_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
--- SQL
SELECT DESC_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM dfTable LIMIT 2
  • 컬럼을 참조하는 다양한 방법을 섞어가면서 사용할 수 있다.
df.select(
	df.col("DEST_COUNTRY_NAME"),
  col("DEST_COUNTRY_NAME"),
  column("DEST_COUNTRY_NAME"),
  'DEST_COUNTRY_NAME',
  $"DEST_COUNTRY_NAME",
  expr("DEST_COUNTRY_NAME")
).show(2)
  • Column 객체와 문자열을 함께 섞어 쓰는 경우엔 컴파일 오류가 발생한다.
// compile error
df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME")
  • expr 함수는 가장 유연한 참조 방법이다. 아래 코드는 컬러명을 destination으로 변경한다.
// 스칼라 코드
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
  • selectExpr 메서드는 select 메서드에 expr 함수를 사용하는것과 같은 기능을 한다.
df.selectExpr("DEST_COUNTRY_NAME AS destination").show(2)
  • selectExpr 메서드는 새로운 DataFrame을 생성하는 복잡한 표현식을 간단하게 만드는 도구이다. 사실 모든 유효현 비집계형 SQL 구문을 지정할 수 있다.
df.selectExpr(
  "*",
  "(DESC_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry"
).show(2)

4.3 스파크 데이터 타입으로 변환하기

  • 명시적인 값을 스파크에 전달해야 할 경우엔 리터럴을 사용한다.

df.select(expr(*), lit(1).as("One")).show(2)
  • SQL에서 리터럴은 상수값을 의미한다.
SELECT *, 1 as One FROM dfTable LIMIT 2

4.4 컬럼 추가하기

  • withColumn 메서드를 사용하면 컬럼을 추가 할 수 있다.
df.withColumn("numberOne", lit(1)).show(2)

df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))

4.5 컬럼명 변경하기

  • withColumnRenamed 메서드로 컬럼명을 변경할 수 있다.
df.withColumnRenamed("DEST_COUNTRY_NAME", "desc").columns

4.6 예약 문자와 키워드

  • 공백이나 하이픈(-) 같은 예약 문자는 컬럼명에 사용할 수 없다.
  • 예약 문자를 컬럼명에 사용하려면 백틱(`) 문자를 이용해 이스케이핑 해야한다.

4.7 대소문자 구분

  • 대소문자를 구분하지 않는다.
set spark.sql.caseSensitive true

4.8 컬럼 제거하기

  • drop 메서드를 사용
df.drop("ORIGIN_COUNTRY_NAME")

4.9 컬럼 데이터 타입 변경

  • cast 메서드를 사용

4.10 로우 필터링

  • where 메서드나 filter 메서드로 필터링 할 수 었음
df.filter(col("count") < 2).show(2)
df.where(col("count") < 2).show(2)

--- SQL
SELECT * FROM dfTable WHERE count < 2 LIMIT 2
  • 같은 표현식에 여러 필터를 적용해야할 때도 있다. 하지만 스파크는 자동으로 필터의 순서와 상관없이 동시에 모든 필터링 작업을 수행하기 때문에 항상 유용한 것은 아니다.

4.11 고유한 로우 얻기

  • distinct 메서드를 사용해서 고윳값을 찾을 수 있다.
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

4.12 무작위 샘플 만들기

  • sample 메서드를 사용하면 DataFrame에서 무작위 데이터를 얻을 수 있다.
    • 표본 데이터 추출 비율을 지정할 수 있다.
    • 복원 추출이나 비복원 추출의 사용 여부를 지정할 수도 있다.

4.12 임의 분할하기

  • randomSplit은 DataFrame을 임의 크기로 분할할 때 유용하게 사용된다.
  • 머신러닝 알고리즘에서 사용할 학습셋, 검증셋, 그리고 테스트셋을 만들 때 주로 사용한다.
val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)
dataFrames(0).count() > dataFrames(1).count()
  • 시드값을 반드시 설정해야한다.

4.13 로우 합치기와 추가하기

  • DataFrame은 불변성을 가지므로 레코드를 추가하는 작업은 불가능하다.
  • DataFrame에 레코드를 추가하려면 원본 DataFrame을 새로운 DataFrame과 통합해야 한다.
    • 통합하려는 두 개의 DataFrame은 반드시 동일한 스키마와 컬럼 수를 가져야한다.
val schema = df.schema

val newRows  = Seq(
	Row("New Country", "Other Country", 5L),
	Row("New Country 2", "Other Country 3", 1L),
)

val parallelizedRows = spark.sparkContext.parallelize(newRows)
val newDf = spark.createDataFrame(parallelizedRows, schema)

df.union(newDf)
	.where("count = 1")
	.show()
  • 스칼라에서는 반드시 =!= 연산자를 사용해야한다. 문자열을 비교할 때 =!= 연산자를 사용하면 컬럼의 실제값을 비교 대상 문자열과 비교한다.
  • 로우가 추가된 DataFrame을 참조하려면 새롭게 만들어진 DataFrame 객체를 사용해야 한다.

4.14 로우 정렬하기

  • sortBy, orderBy 메서드를 사용해 정렬할 수 있다. 두 메서드는 같은 방식으로 동작한다.
  • 기본 동작은 오름차순 정렬이다.
df.sort("count").show(5)
df.orderBy("count").show(5)

df.sort(desc("count")).show(5)
df.orderBy(asc("count")).show(5)
  • 정렬 기준을 명확히하려면 asc나 desc 함수를 사용한다.
  • acs_nulls_first, desc_nulls_first, acs_nulls_last, desc_nulls_last 메서드를 사용하면 널값이 표시되는 기준을 지정할 수 있다.
  • 트랜스포메이션을 처리하기 전에 성능을 최적화하기 위해 파티션별 정렬을 수행하기도 한다. 파티션별 정렬은 sortWithinPartitions 메서드로 할 수 있다.

4.15 로우 수 제한하기

  • limit 메서드를 사용해 추출할 로우 수를 제한할 수 있다.

4.16 repartition과 coalesce

  • 최적화 기법으로 자주 필터링하는 컬럼을 기준으로 데이터를 분할하는 방법이 있다. 이를 통해 파티셔닝 스키마와 파티션 수를 포함해 클러스터 전반의 물리적 데이터 구성을 제어할 수 있다.
  • repartition 메서드를 호출하면 무조건 전체 데이터를 셔플한다. 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야 한다.
  • 특정 컬럼을 기준으로 자주 필터링한다면 자주 필터링되는 컬럼을 기준으로 파티션을 재분배하는 것이 좋다.
df.repartition(col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME"))
  • coalesce 메서드는 전체 데이터를 셔플하지 않고 파티션을 병합하려는 경우에 사용한다.
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
  • 위에 예제는 셔플을 수행해 5개의 파티션으로 나누고, 전체 데이터를 셔플 없이 병합하려는 에제이다.

4.17 드라이버로 로우 데이터 수집하기

  • 스파크는 드라이버에서 클러스터 상태 정보를 유지한다. 로컬 환경에서 데이터를 다루려면 드라이버로 데이터를 수집해야 한다.
  • collect() 메서드는 모든 데이터를 수집하며, take 메서드는 상위 N개의 로우를 반환한다.
  • toLocalIterator 메서드를 사용하면 데이터셋의 파티션을 차례로 반복 처리할 수 있다.
    • toLocalIterator 메서드를 사용할 때 매우 큰 파티션이 있다면 드라이브와 애플리케이션이 비정상적으로 종료될 수 있다.
    • 연산을 병렬로 수행하지 않기 때문에 매우 큰 처리 비용이 든다.
val collectDf = df.limit(10)

collectDf.take(5)
collectDf.collect()
collectDf.toLocalIterator()
  • 드라이버로 모든 데이터 컬렉션을 수집하는 작업은 매우 큰 비용이 발생한다.