스파크 완벽 가이드 - 스파크 기능 둘러보기
by Gunju Ko
이 글은 “스파크 완벽 가이드” 책 내용을 정리한 글입니다.
저작권에 문제가 있는 경우 “gunjuko92@gmail.com”으로 연락주시면 감사하겠습니다.
Chapter3. 스파크 기능 둘러보기
- 스파크는 기본 요소인 저수준 API와 구조적 API 그리고 추가 기능을 제공하는 일련의 표준 라이브러리로 구성되어 있다.
1. 운영용 애플리케이션 실행하기
spark-submit
명령은 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할을 한다.- 스파크 애플리케이션은 스탠드얼론, 메소스 그리고 YARN 클러스터 매니저를 이용해 실행할 수 있다.
### 스파크 로컬 실행
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local \
./examples/jars/spark-examples_2.11-2.2.0.jar 10
- master 옵션의 인숫값을 변경하면 스파크가 지원하는 스파크 스탠드얼론, 메소스 그리고 YARN 클러스터 매니저에서 동일한 애플리케이션을 실행할 수 있다.
2. Dataset
- 타입 안정성을 제공하는 구조적 API
- 정적 타입 코드를 지원하기 위해 고안된 스파크의 구조적 API
- 동적 언어인 파이썬과 R에선 사용 불가능
- DataFrame은 다양한 데이터 타입의 테이블형 데이터를 보관할 수 있는 Row 타입의 객체로 구성된 분산 컬렉션이다.
- Dataset API는 DataFrame의 레코드를 사용자가 자바나 스칼라로 정의한 클래스에 할당하고 자바의
ArrayList
또는 스칼라의Seq
객체 등의 고정 타입형 컬렉션으로 다룰 수 있는 기능을 제공한다. - Dataset 클래스 (자바에서 Dataset<T> 스칼라에서는 Dataset[T]로 표기)는 내부 객체의 데이터 타입을 매개변수로 사용한다.
- 스파크 2.0 버전에서는 자바의 JavaBean 패턴, 스칼라의 케이스 클래스 유형으로 정의된 클래스를 지원
- 스파크는 자동으로 타입 T를 분석한 다음 Dataset의 표 형식 데이터에 적합한 스키마를 생성
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String,
count: BitInt)
val flightDF = spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightDF.as[Flight]
flights
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
.map(flight_row => flight_row)
.take(5)
- Dataset은 필요한 경우에 선택적으로 사용할 수 있다.
- collect 메서드나 take 메서드를 호출하면 DataFrame을 구성하는 Row 타입의 객체가 아닌 Dataset에 매개변수로 지정한 타입의 객체를 반환한다.
3. 구조적 스트리밍
- 구조적 스트리밍을 사용하면 구조적 API로 개발된 배치 모드의 연산을 스트리밍 방식으로 실행할 수 있다.
- 프로토타입을 배치 잡으로 개발한 다음 스트리밍 잡으로 변환할 수 있다.
// 배치 코드
val statisDataFrame = spark.read.format("csv")
.option("header", "true")
.option("interSchema", "true")
.option("/data/retail-data/by-day*.csv")
statisDataFrame.createOrReplaceTempView("retail_view")
val staticSchema = statisDataFrame.schema
statisDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate"
)
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day")
).sum("total_cost").show(5)
- 위의 코드는 배치 잡으로 하루동안 고객이 구매한 금액을 구하는 코드이다.
- 윈도우 함수는 집계 시에 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우를 구성한다.
- 위의 코드를 스트리밍 코드로 변환하려면 read 메서드 대신에 readStream 메서드를 사용하면 된다.
// 스트리밍 코드
val streamingDataFrame = spark.readStream("csv")
.schema(staticSchema)
.option("maxFilesPerTrigger", 1)
.option("header", "true")
.foramt("csv")
.load("/data/retail-data/by-day*.csv")
streamingDataFrame.isStreaming // true를 반환
val pulchaseByCustomerPerHour = streamingDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))
.sum("total_cost")
- maxFilesPerTrigger 옵션은 한번에 읽을 파일 수를 설정한다.
- 위의 작업 역시 지연 연산이므로 데이터 플로를 실행하기 위해 스트리밍 액션을 호출해야 한다.
pulchaseByCustomerPerHour.writeStream
.format("memory") // memory = 인메모리 테이블에 저장
.queryName("customer_purchases") // 인메모리에 저장될 테이블명
.outputMode("complete") // complete = 모든 카운트 수행 결과를 테이블에 저장
.start()
- 스트리밍 액션은 어딘가에 데이터를 채워 넣어야 하므로 count 메서드와 같은 일반적인 정적 액션과는 조금 다른 특성을 가진다. 스트리밍 액션은 트리거가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장한다.
3.4 머신러닝과 고급 분석
- 스파크는 분류(classification), 회귀(regression), 군집화(clustering), 딥러닝(deep learning)에 이르기까지 머신러닝과 관련된 정교한 API를 제공한다.
3.5 저수준 API
- 스파크는 RDD를 통해 자바와 파이썬 객체를 다루는 데 필요한 다양한 기본 기능을 제공한다.
- 스파크의 거의 모든 기능은 RDD를 기반으로 만들어졌다.
- 대부분은 구조적 API를 사용하는것이 좋다. 하지만 RDD를 이용하면 파티션과 같은 물리적 실행 특성을 결정할 수 있으므로 DataFrame보다 더 세밀한 제어를 할 수 있다.
- RDD는 구조적 API와는 다르게 언어마다 세부 구현 방식에서 차이가 있다.
- 최신 버전에서 RDD는 잘 사용되지 않지만 비정형 데이터나 정제되지 않은 원시 데이터를 처리해야 한다면 RDD를 사용해야 한다.
3.6 SparkR
- 스파크를 R 언어로 사용하기 위한 기능
3.7 스파크의 에코시스템과 패키지
- 스파크의 장점은 커뮤니티가 만들어낸 패키지 에코시스템과 다양한 기능이다.
- 스파크 패키지 목록은 https://spark-packages.org/에서 확인할 수 있다.