이 글은 “스파크 완벽 가이드” 책 내용을 정리한 글입니다.

저작권에 문제가 있는 경우 “gunjuko92@gmail.com”으로 연락주시면 감사하겠습니다.

Chapter3. 스파크 기능 둘러보기

그림

  • 스파크는 기본 요소인 저수준 API와 구조적 API 그리고 추가 기능을 제공하는 일련의 표준 라이브러리로 구성되어 있다.

1. 운영용 애플리케이션 실행하기

  • spark-submit 명령은 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할을 한다.
  • 스파크 애플리케이션은 스탠드얼론, 메소스 그리고 YARN 클러스터 매니저를 이용해 실행할 수 있다.
### 스파크 로컬 실행
./bin/spark-submit \
	--class org.apache.spark.examples.SparkPi \
	--master local \
	./examples/jars/spark-examples_2.11-2.2.0.jar 10
  • master 옵션의 인숫값을 변경하면 스파크가 지원하는 스파크 스탠드얼론, 메소스 그리고 YARN 클러스터 매니저에서 동일한 애플리케이션을 실행할 수 있다.

2. Dataset

  • 타입 안정성을 제공하는 구조적 API
  • 정적 타입 코드를 지원하기 위해 고안된 스파크의 구조적 API
  • 동적 언어인 파이썬과 R에선 사용 불가능
  • DataFrame은 다양한 데이터 타입의 테이블형 데이터를 보관할 수 있는 Row 타입의 객체로 구성된 분산 컬렉션이다.
  • Dataset API는 DataFrame의 레코드를 사용자가 자바나 스칼라로 정의한 클래스에 할당하고 자바의 ArrayList 또는 스칼라의 Seq 객체 등의 고정 타입형 컬렉션으로 다룰 수 있는 기능을 제공한다.
  • Dataset 클래스 (자바에서 Dataset<T> 스칼라에서는 Dataset[T]로 표기)는 내부 객체의 데이터 타입을 매개변수로 사용한다.
  • 스파크 2.0 버전에서는 자바의 JavaBean 패턴, 스칼라의 케이스 클래스 유형으로 정의된 클래스를 지원
  • 스파크는 자동으로 타입 T를 분석한 다음 Dataset의 표 형식 데이터에 적합한 스키마를 생성
case class Flight(DEST_COUNTRY_NAME: String,
                 ORIGIN_COUNTRY_NAME: String,
                 count: BitInt)
val flightDF = spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightDF.as[Flight]

flights
	.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
	.map(flight_row => flight_row)
	.take(5)
  • Dataset은 필요한 경우에 선택적으로 사용할 수 있다.
  • collect 메서드나 take 메서드를 호출하면 DataFrame을 구성하는 Row 타입의 객체가 아닌 Dataset에 매개변수로 지정한 타입의 객체를 반환한다.

3. 구조적 스트리밍

  • 구조적 스트리밍을 사용하면 구조적 API로 개발된 배치 모드의 연산을 스트리밍 방식으로 실행할 수 있다.
  • 프로토타입을 배치 잡으로 개발한 다음 스트리밍 잡으로 변환할 수 있다.
// 배치 코드
val statisDataFrame = spark.read.format("csv")
	.option("header", "true")
	.option("interSchema", "true")
	.option("/data/retail-data/by-day*.csv")

statisDataFrame.createOrReplaceTempView("retail_view")
val staticSchema = statisDataFrame.schema

statisDataFrame
	.selectExpr(
  	"CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate"
  )
	.groupBy(
  	col("CustomerId"), window(col("InvoiceDate"), "1 day")
  ).sum("total_cost").show(5)
  • 위의 코드는 배치 잡으로 하루동안 고객이 구매한 금액을 구하는 코드이다.
  • 윈도우 함수는 집계 시에 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우를 구성한다.
  • 위의 코드를 스트리밍 코드로 변환하려면 read 메서드 대신에 readStream 메서드를 사용하면 된다.
// 스트리밍 코드
val streamingDataFrame = spark.readStream("csv")
	.schema(staticSchema)
	.option("maxFilesPerTrigger", 1)
	.option("header", "true")
	.foramt("csv")
	.load("/data/retail-data/by-day*.csv")

streamingDataFrame.isStreaming // true를 반환

val pulchaseByCustomerPerHour = streamingDataFrame
	.selectExpr(
  	"CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")
	.groupBy(
  	col("CustomerId"), window(col("InvoiceDate"), "1 day"))
	.sum("total_cost")
  • maxFilesPerTrigger 옵션은 한번에 읽을 파일 수를 설정한다.
  • 위의 작업 역시 지연 연산이므로 데이터 플로를 실행하기 위해 스트리밍 액션을 호출해야 한다.
pulchaseByCustomerPerHour.writeStream
	.format("memory")	// memory = 인메모리 테이블에 저장
	.queryName("customer_purchases")	// 인메모리에 저장될 테이블명
	.outputMode("complete")	// complete = 모든 카운트 수행 결과를 테이블에 저장
	.start()
  • 스트리밍 액션은 어딘가에 데이터를 채워 넣어야 하므로 count 메서드와 같은 일반적인 정적 액션과는 조금 다른 특성을 가진다. 스트리밍 액션은 트리거가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장한다.

3.4 머신러닝과 고급 분석

  • 스파크는 분류(classification), 회귀(regression), 군집화(clustering), 딥러닝(deep learning)에 이르기까지 머신러닝과 관련된 정교한 API를 제공한다.

3.5 저수준 API

  • 스파크는 RDD를 통해 자바와 파이썬 객체를 다루는 데 필요한 다양한 기본 기능을 제공한다.
  • 스파크의 거의 모든 기능은 RDD를 기반으로 만들어졌다.
  • 대부분은 구조적 API를 사용하는것이 좋다. 하지만 RDD를 이용하면 파티션과 같은 물리적 실행 특성을 결정할 수 있으므로 DataFrame보다 더 세밀한 제어를 할 수 있다.
  • RDD는 구조적 API와는 다르게 언어마다 세부 구현 방식에서 차이가 있다.
  • 최신 버전에서 RDD는 잘 사용되지 않지만 비정형 데이터나 정제되지 않은 원시 데이터를 처리해야 한다면 RDD를 사용해야 한다.

3.6 SparkR

  • 스파크를 R 언어로 사용하기 위한 기능

3.7 스파크의 에코시스템과 패키지

  • 스파크의 장점은 커뮤니티가 만들어낸 패키지 에코시스템과 다양한 기능이다.
  • 스파크 패키지 목록은 https://spark-packages.org/에서 확인할 수 있다.