Spark - Predicate Pushdown

스파크는 쿼리를 태스크로 쪼갠뒤에 익스큐터에게 태스크를 전송한다. 쿼리의 성능을 향상시키기 위해서는 익스큐터로 전송되는 데이터의 양을 줄이는 것이다.

실제로 필요하지 않은 데이터 로딩을 방지하는 한 가지 방법은 filter pushdown(predicate pushdown)이다. predicate pushdown을 사용하면 데이터소스에서 필터링을 수행할 수 있다. 만약에 익스큐터가 데이터와 동일한 물리 머신에 있지 않은 경우엔 더 중요하다.

많은 경우에 filter push은 자동으로 적용된다. 그러나 특별한 경우엔 사용자가 특정 정보를 제공하거나 특정 기능을 직접 구현해야 한다.

How to get filters to the data source

  • 아래는 예제는 csv 파일에서 데이터를 읽고, position이 tester인 데이터를 필터링합니다.
val dataFramePosition = session
.read.option("header", value = true)
  .csv("Filter/src/main/resources/data.csv")
  .filter(col("position") === "tester")
dataFramePosition.show()
  • 위의 예를 보면 대부분의 행은 필터에 의해 제거되고, 2명만 반환이된다.
  • DataFrame을 사용할 때 필터는 데이터소스로 푸쉬다운된다. explain 메서드를 사용해서 DataFrame에 대한 실행계획을 확인할 수 있는데 이를 통해 필터가 push down된 것을 확인할 수 있다.
dataFramePosition.explain()
== Physical Plan ==
*(1) Project [id#16, name#17, age#18, position#19]
+- *(1) Filter (isnotnull(position#19) AND (position#19 = tester))
+- FileScan csv (…) PushedFilters: [IsNotNull(position), EqualTo(position,tester)], ReadSchema: struct<id:string,name:string,age:string,position:string>
  • 위 실행 계획을 보면 position 필터가 push down된 것을 볼 수 있다.
  • Spark CSV reader는 필터를 만족하는 데이터만 익스큐터로 로드한다.
  • 모든 데이터소스가 filter pushdown을 지원하는건 아니다.

Filters containing casts

  • 모든 필터가 push down 되는건 아니다.
  • 필드를 캐스팅해야 하는 경우 push down 되지 않는다.

Conclusing

  • Filter pushdown을 사용하면 쿼리 성능을 크게 향상시킬 수 있다.
  • 대부분의 경우 자동으로 적용이 된다.
  • 모든 필터 작업이 데이터 소스로 pushdown되는것은 아니다.

출처

  • https://engineering.dynatrace.com/blog/optimizing-spark-queries-with-filter-pushdown/