스파크 완벽 가이드 - 구조적 API 기본 연산

이 글은 “스파크 완벽 가이드” 책 내용을 정리한 글입니다.

저작권에 문제가 있는 경우 “gunjuko92@gmail.com”으로 연락주시면 감사하겠습니다.

구조적 API 기본 연산

  • DataFrame은 ROW 타입이 레코드와 각 레코드에 수행할 연산 표현식을 나타내는 여러 컬럼으로 구성된다.
  • 스키마는 각 컬럼명과 데이터 타입을 정의한다.
  • DataFrame의 파티셔닝은 DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의한다.
  • 파티셔닝 스키마는 파티션을 배치하는 방법을 정의한다.
    • 파티셔닝의 분할 기준은 특정 컬럼이나 비결정론적 값을 기반으로 설정할 수 있다.
// 스칼라 코드
val df = spark.read.format("json")
	.load("/data/flight-data/json/2015-summary.json")

df.printSchema()
  • DataFrame은 컬럼을 가지며 스키마로 컬럼을 정의한다.

스키마

  • 스키마는 컬럼명과 데이터 타입을 정의한다.
  • 데이터를 읽기 전에 스키마를 정의해야 하는지 여부는 상황에 따라 달라진다.
    • 비정형 분석에서는 스키마 온 리드가 대부분 잘 동작한다. 단 정밀도는 떨어질 수 있다.
    • 운영환경에서 ETL 작업을 수행한다면 직접 스키마를 정의하는게 좋다.
    • CSV나 JSON 등의 데이터소스를 사용하는 경우 스키마 추론 과정에서 읽어 들인 샘플 데이터의 타입에 따라 스키마를 결정한다.
spark.read.format("json").load("/data/flight-data/json/2015-summary.json").schema

# 결과
StructType = 
	StructType(
    StructField(DEST_COUNTRY_NAME, StringType, true),
    StructField(ORIGIN_COUNTRY_NAME, StringType, true),
    StructField(COUNT, LongType, true)
  )
  • 스키마는 여러 개의 StructField 타입 필드로 구성된 StructType 객체이다.
  • StructField는 이름, 데이터 타입, 컬럼이 값이 없거나 널일 수 있는지 지정하는 불리언값을 가진다.
  • 컬럼에 관련된 메타데이터를 지정할 수도 있다.
  • 스키마는 복합 데이터 타입인 StructType을 가질 수 있다.
  • 스파크는 런타임에 데이터 타입이 스키마의 데이터 타입과 일치하지 않으면 오류를 발생시킨다.
val myManualSchema = StructType(Array(
	StructField("DEST_COUNTRY_NAME", StringType, true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  StructField("count", LongType, false, Metadata.fromJson("..."))
))

val df = spark.read.format("json").schema(myManualSchema)
  • 스키마는 자체 데이터 타입 정보를 사용한다.

컬럼과 표현식

  • 컬럼은 표현식이다. (컬럼은 표현식의 일부 기능을 제공한다)
  • 표현식으로 컬럼을 선택, 조작, 제거할 수 있다.
  • 컬럼에 접근하려면 DataFrame이 필요하다.
  • 컬럼의 내용을 수정하려면 DataFrame의 트랜스포메이션을 사용해야 한다.

컬럼

  • col, column 함수 : 컬럼을 생성하거나 참조할 때 사용
col("someColumnName")
column("someColumnName")

// 스칼라에서는 $, '를 사용해서 컬럼을 참조할 수 있다.
$"myColumn"
'myColum

명시적 컬럼 참조

df.col("count")
  • col 메서드를 사용해 명시적으로 컬럼을 정의하면 스파크는 분석기 실행 단계에서 컬럼 확인 절차를 생략한다.

표현식

  • 표현식은 DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미한다. 여러 컬럼명을 입력으로 받아 식별하고, “단일 값”을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수라고 생각할 수 있다.
  • 표현식은 expr 함수로 사용할 수 있다.
// 아래 구문은 동일하다.
expr("someCol")
col("someCol")
  • 컬럼은 단지 표현식일 뿐이다.
  • 컬럼과 컬럼의 트랜스포메이션은 파싱된 표현식과 동일한 논리적 실행 계획으로 컴파일된다.
// 아래 코드는 동일한 논리적 실행 계획으로 컴파일 된다.
(((col("someCol") + 5) * 200) - 6) < col("otherCol")

expr("(((someCol + 5) * 200) - 6) < otherCol")

참고) 논리적 트리

그림

// 데이터프레임 컬럼에 접근하기
spark.read.format("/json").load("/data/flight-data/json/2015-summary.json").columns

레코드와 로우

  • DataFrame에서 각 로우는 하나의 레코드이다.
  • 레코드를 Row 객체로 표현한다.
    • Row 객체는 내부에 바이트배열일 가진다.
    • 이 바이트 배열 인터페이스는 오직 컬럼 표현식으로만 다룰 수 있다.
  • DataFrame을 사용해 드라이버에 개별 로우를 반환하는 명령은 항상 하나 이상의 Row 타입을 반환한다.

로우 생성하기

  • Row 객체는 스키마정보를 가지고 있지 않는다. (DataFrame만 유일하게 스키마를 갖는다.)
  • Row 객체를 직접 생성하려면 DataFrame의 스키마와 같은 순서로 값을 명시해야 한다.
val myRow = Row("Hello", null, 1, false)

myRow(0)
myRow.getString(0)
myRow.getInt(2)

DataFrame의 트랜스포메이션

  • DataFrame을 다루는 방법은 몇가지 주요 작업으로 나눌 수 있다.
    • 로우나 컬럼 추가
    • 로우나 컬럼 제거
    • 로우를 컬럼으로 변환하거나, 그 반대로 변환
    • 컬럼값을 기준으로 로우 순서 변경

데이터프레임 생성하기

val df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable")

val myManualSchema = new StructType(Array(
	new StructField("some", StringType, true),
	new StructField("col", StringType, true),
	new StructField("names", LongType, false)
))

val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)

_implicit는 기본 스칼라 객체를 DataFrame으로 변환하는데 사용된다. _implicit를 임포트해서 Seq 데이터 타입에 toDf 함수를 활용하여 DataFrame으로 변환할 수 있다.

select와 selectExpr

  • 데이터 테이블에 SQL을 실행하는 것처럼 데이터프레임에서도 SQL을 사용할 수 있다.
// 스칼라 코드
df.select("DEST_COUNTRY_NAME").show(2)
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
df.select(
  df.col("DEST_COUNTRY_NAME"),
  col("DEST_COUNTRY_NAME"),
  column("DEST_COUNTRY_NAME"),
  'DEST_COUNTRY_NAME,
  $"DEST_COUNTRY_NAME",
  expr("DEST_COUNTRY_NAME")
).show(2)
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

df.select(expr("DEST_COUNTRY_NAME AS destination")).alias("DEST_COUNTRY_NAME").show(2)

df.selectExpr("DEST_COUNTRY_NAME AS destination").show(2)
  • selectExpr 메서드는 새로운 DataFrame을 생성하는 복잡한 표현식을 간단하게 만드는 도구이다.
df.selectExpr("*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")
	.show(2)
  • select 표현식에는 DataFrame의 컬럼에 대한 집계 함수를 지정할 수 있다.
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

컬럼 추가하기

  • withColmn 메서드를 사용해서 컬럼을 추가할 수 있다.
  • withColumn 메서드는 2개의 인수를 사용한다. 하나는 컬럼명이고 다른 하나는 값을 생성하는 표현식이다.
df.withColumn("numberOne", lit(1))

df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))

// 컬럼명 변경
df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest")

예약 문자와 키워드

  • 공백이다 하이픈(-) 같은 예약 문자는 컬럼명에 사용할 수 없다. 사용하려면 백틱(`)문자를 이용해 이스케이핑해야 한다.

대소문자 구분

  • 기본적으로 대소문자를 가리지 않는다.
set spark.sql.caseSensitive true
  • 위 설정을 사용해 대소문자를 구분하게 만들 수 있다.

컬럼 제거하기

df.drop("ORIGIN_COUNTRY_NAME")

데이터 타입 변경하기

df.withColumn("count", col("count").cast("string"))

로우 필터링하기

  • DataFrame의 가장 일반적인 필터링 방법은 문자열 표현식이나 컬럼을 다루는 기능을 이용해 표현식을 만드는 거다.
  • DataFrame의 where 메서드나 filter 메서드로 필터링할 수 있다.
    • 두 메서드 모두 같은 연산을 수행하며 같은 파라미터 타입을 사용한다.
df.filter(col("count") < 2)
df.where("count < 2")
  • 같은 표현식에 여러 필터를 적용해야 할 때도 있다. 하지만 스파크는 자동으로 필터의 순서와 상관없이 동시에 모든 필터링 작업을 수행하기 때문에 항상 유용한 것은 아니다. 그러므로 여러 개의 AND 필터를 지정하려면 차례대로 필터를 연결하고 판단은 스파크에 맡겨야한다.
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") =!= "Croatia")
  • 스칼라에서는 반드시 =!= === 연사자를 사용해야 한다. (사실 스파크의 Column 클래스에 정의된 함수이다.)
    • 컬럼 표현식과 문자열을 비교할 때 =!= 연산자를 사용하면, 컬럼의 실제값을 비교 대상 문자열과 비교한다.

고유한 로우 얻기

  • DataFrame의 모든 로우에서 중복 데이터를 제거할 수 있는 distinct 메서드를 사용해 고윳값을 찾을 수 있다.
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

무작위 샘플 만들기

  • 무작위 샘플 데이터를 얻으려면 sample 메서드를 사용한다.

임의 분할하기

val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)

로우 합치기와 추가하기

  • DataFrame은 불변성을 가지므로 레코드 추가는 불가능하다.
  • DataFrame에 레코드를 추가하려면 원본 DataFrame을 새로운 DataFrame과 통합해야 한다.
  • 통합하려는 두 개의 DataFrame은 반드시 동일한 스키마와 컬럼 수를 가져야한다.
  • union 메서드는 현재 스키마가 아닌 컬럼 위치를 기반으로 동작한다. 따라서 사용자가 생각한 대로 자동 정렬되지 않을 수 있다.
df.union(newDf)

로우 정렬하기

  • sortBy 또는 orderBy 사용
  • 기본 동작은 오름차순 정렬이다.
  • 정렬 기준을 명확히 하려면 asc나 desc 함수를 사용한다.
df.sort("count")
df.orderBy("count", "DEST_COUNTRY_NAME")
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME"))
  • asc_nulls_first, desc_nulls_first, asc_nulls_last, desc_nulls_last 메서드를 사용하여 정렬된 DataFrame에서 널 값이 표시되는 기준을 지정할 수 있다.
  • 트랜스포메이션을 처리하기 전에 성능을 최적화하기 위해 파티션별 정렬을 수행하기도 한다. 파티션별 정렬은 sortWithinPartitions 메서드로 할 수 있다.
spark.read.format("json").load("/data/flight-data/json/*-summary.json")
	.sortWithPartition("count")

로우 수 제한하기

  • limit 메서드 사용
df.limit(5)

repartition과 coalesce

  • 최적화 기법으로 자주 필터링하는 컬럼을 기준으로 데이터를 분할할 수 있다. 이를 통해 파티셔닝 스키마와 파티션 수를 포함해 클러스터 전반의 물리적 데이터 구성을 제어할 수 있다.
  • repartition 메서드를 호출하면 무조건 전체 데이터를 셔플한다. 향후에 사용할 파티션 수가 현재 파티션수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우메나 사용해야 한다.
df.rdd.getNumPartitions

df.repartition(5)
df.repartition(col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME"))
  • 특정 컬럼을 기준으로 자주 필터링한다면 자주 필터링하는 컬럼을 기준으로 파티션을 재분배하는것이 좋다.

  • coalesce는 전체 데이터를 셔플하지 않고 파티션을 병합하려는 경우에 사용한다.

    • 파티션수를 줄이려면 repartition 대신에 coalesce를 사용해야 한다.
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

드라이버로 로우 데이터 수집하기

  • 로컬에서 데이터를 다루려면 드라이버로 데이터를 수집해야 한다.
  • collect 메서드는 전체 DataFrame의 모든 데이터를 수집하며, take 메서드는 상위 N개의 로우를 반환한다.
  • toLocalIterator 메서드는 반복자로 모든 파티션의 데이터를 드라이버에 전달한다.
val collectDF = df.limit(10)
collectDF.take(5)
collectDF.show()

collectDF.toLocalIterator()

드라이버로 모든 컬렉션을 수집하는 작업은 매우 큰 비용이 발생한다.