Spark - Filter Pushdown
by Gunju Ko
Spark - Predicate Pushdown
스파크는 쿼리를 태스크로 쪼갠뒤에 익스큐터에게 태스크를 전송한다. 쿼리의 성능을 향상시키기 위해서는 익스큐터로 전송되는 데이터의 양을 줄이는 것이다.
실제로 필요하지 않은 데이터 로딩을 방지하는 한 가지 방법은 filter pushdown(predicate pushdown)이다. predicate pushdown을 사용하면 데이터소스에서 필터링을 수행할 수 있다. 만약에 익스큐터가 데이터와 동일한 물리 머신에 있지 않은 경우엔 더 중요하다.
많은 경우에 filter push은 자동으로 적용된다. 그러나 특별한 경우엔 사용자가 특정 정보를 제공하거나 특정 기능을 직접 구현해야 한다.
How to get filters to the data source
- 아래는 예제는 csv 파일에서 데이터를 읽고, position이 tester인 데이터를 필터링합니다.
val dataFramePosition = session
.read.option("header", value = true)
.csv("Filter/src/main/resources/data.csv")
.filter(col("position") === "tester")
dataFramePosition.show()
- 위의 예를 보면 대부분의 행은 필터에 의해 제거되고, 2명만 반환이된다.
- DataFrame을 사용할 때 필터는 데이터소스로 푸쉬다운된다. explain 메서드를 사용해서 DataFrame에 대한 실행계획을 확인할 수 있는데 이를 통해 필터가 push down된 것을 확인할 수 있다.
dataFramePosition.explain()
== Physical Plan ==
*(1) Project [id#16, name#17, age#18, position#19]
+- *(1) Filter (isnotnull(position#19) AND (position#19 = tester))
+- FileScan csv (…) PushedFilters: [IsNotNull(position), EqualTo(position,tester)], ReadSchema: struct<id:string,name:string,age:string,position:string>
- 위 실행 계획을 보면 position 필터가 push down된 것을 볼 수 있다.
- Spark CSV reader는 필터를 만족하는 데이터만 익스큐터로 로드한다.
- 모든 데이터소스가 filter pushdown을 지원하는건 아니다.
Filters containing casts
- 모든 필터가 push down 되는건 아니다.
- 필드를 캐스팅해야 하는 경우 push down 되지 않는다.
Conclusing
- Filter pushdown을 사용하면 쿼리 성능을 크게 향상시킬 수 있다.
- 대부분의 경우 자동으로 적용이 된다.
- 모든 필터 작업이 데이터 소스로 pushdown되는것은 아니다.
출처
- https://engineering.dynatrace.com/blog/optimizing-spark-queries-with-filter-pushdown/