RDD로 프로그래밍 하기

이 글은 “러닝 스파크” 책 내용을 정리한 글입니다.

저작권에 문제가 있는 경우 “gunjuko92@gmail.com”으로 연락주시면 감사하겠습니다.

RDD 기초

  • RDD : 분산되어 존재하는 변경 불가능한 데이터 요소들의 모임
  • 스파크에서의 모든 작업은 새로운 RDD를 만들거나, 존재하는 RDD를 변형하거나, 결과 계산을 위해 RDD에서 연산을 호출하는 것 중의 하나로 표현된다.
  • 내부적으로 스파크는 자동으로 RDD에 있는 데이터를 클러스터에 분배하며 클러스터 위에서 수행하는 연산들을 병렬화한다.
  • 각 RDD는 클러스터의 서로 다른 노드들에서 연산 가능하도록 여러 개의 파티션으로 나뉜다.
  • RDD는 어떤 타입의 객체든 가질 수 있다.

RDD 생성

  • 외부 데이터세트를 로드
  • 프로그램에서 객체 컬렉션(collection)을 분산시킨다.
>>> lines = sc.textFile("REAME.md")

RDD 연산

  • 트랜스포메이션 : RDD에서 새로운 RDD를 만들어 낸다.
  • 액션 : RDD를 기초로 결과 값을 계산하며 그 값을 드라이버 프로그램에 되돌려 주거나 외부 스토리지(HDFS 등)에 저장한다.
>>> pythonLines = lines.filter(lambda line: "Python" in line)

>>> pythonLines.first()

Lazy evaluation

  • RDD는 처음 액션을 사용하는 시점에 처리된다.
  • 스파크는 한번에 모든 트랜스포메이션끼리의 연계를 파악한다면 결과 도출에 필요한 데이터만 연산하는게 가능하다.
  • 위의 예제에서 first() 액션에서는 스파크는 처음 일치하는 라인이 나올 때까지만 파일을 읽을 뿐 전체 파일을 읽거나 하지 않는다.

영속화

  • 스파크의 RDD들은 기본적으로 액션이 실행될 때마다 매번 새로 연산을 한다.
  • 여러 액션에서 RDD 하나를 재사용하고 싶은 경우 RDD.persist()를 사용하여 계속 결과를 유지하도록 요청할 수 있다.
    • 첫 연산이 이루어진 후 스파크는 RDD의 내용을 메모리에 저장하게 되며(클러스터의 여러 머신들에 나뉘어서), 이 후의 액션들에서 재사용한다.
  • RDD를 재사용하지 않기로 한다면 스파크가 일회성 데이터를 가져와 결과만 계산하고 데이터를 저장할 필요가 없는 경우에 굳이 스토리지 공간을 낭비할 필요가 없다.
>>> pythonLines.persist()

>>> pythonLines.count()

>>> pythonLines.first()

위 예제는 재사용을 위한 중간 단계의 RDD들을 보존하기 위해 스파크에 persist()를 요청한다.

RDD 생성하기

객체 컬렉션을 병렬화

  • SparkContext#parallelize() 메서드 사용
  • 주로 프로토타이핑이나 테스트 용도로 사용
val lines = sc.parallelize(List("Pandas", "I like pandas"))

외부 스토리지에서 데이터를 불러옴

  • 일반적인 방법
val lines = sc.textFile("/path/to/REAME.md")

RDD의 연산

  • 스파크는 트랜스포메이션과 액션을 매우 다르게 취급하므로 자신이 실행하는 연산의 타입에 대한 이해는 매우 중요하다.
  • 트랜스포메이션은 RDD를 반환하지만, 액션은 그 외의 다른 데이터 타입을 반환한다.

트랜스포메이션

  • 새로운 RDD를 만들어 돌려주는 RDD의 연산 방식이다.
  • 트랜스포메이션된 RDD는 실제로 액션을 사용하는 다소 늦은 시점에 계산된다.
  • 대부분의 트랜스포메이션들은 한 번에 하나의 요소에서만 작업이 이루어진다.
  • 트랜스포메이션은 기존 RDD를 변경하지 않는다. 대신 완전히 새로운 RDD를 리턴한다.
errosRDD = inputRDD.filter(lambda: x: "error" in x)
warningsRDD = inputRDD.filter(lambda: x: "warning" in x)

badLinesRDD = errosRDD.union(warningsRDD)
  • 트랜스포메이션은 입력할 수 있는 RDD 개수에 대한 제한이 없다.
  • 새로운 RDD를 만들면 스파크는 각 RDD에 대해 가계도 (Lineage graph)라 불리는 관계 그래프를 갖고 있게 된다. 스파크는 이 정보를 필요 시 각 RDD를 재연산하거나 저장된 RDD가 유실될 경우 복구를 하는 등의 경우에 활용한다.

액션

  • 액션은 드라이버 프로그램에 최종 결과값을 되돌려 주거나 외부 저장소에 기록하는 연산 작업이다.
  • 액션은 실제로 결과 값을 내어야 하므로 트랜스포메이션이 계산을 수행하도록 강제한다.
  • collect()를 사용하면 전체 RDD 데이터를 가져올 수 있다. 다만 데이터세트가 너무 크면 collect()를 사용할 수 없다. 이런 경우에는 HDFS나 S3 같은 분산 파일 시스템에 데이터를 써 버리는 것이 일반적이다.
  • 새로운 액션을 호출할 때마다 전체 RDD가 처음부터 계산된다. 이런 비효율성을 피하려면 중간 결과를 영속화 하는 방법을 쓸 수 있다.

여유로운 수행 방식

  • RDD에 대한 트랜스포메이션을 호출할 때 그연산이 즉시 수행되는 것이 아니다. 대신 내부적으로 스파크는 메타데이터에 이런 연산이 요청되었다는 사실만 기억한다.
  • RDD에 데이터를 로드(sc.textFile())하는 것도 여유롭게 수행된다.
  • 액션이 호출되면 실제 연산이 수행된다.
  • 스파크는 연산들을 그룹지어서 데이터를 전달해야 하는 횟수를 줄이기 위해 여유로운 수행방식을 사용한다.
  • 스파크는 단순한 연산들을 많이 연결해서 사용하는 것이나 하나의 복잡한 매핑 코드를 쓰는 것이나 큰 차이가 없다. 그러므로 스파크 사용자들은 프로그램을 더 작게 만들고, 효율적인 연산의 코드를 만들어 내야 한다는 부담이 적다.

스파크에 함수 전달하기 - 스칼라

  • 인라인으로 정의된 함수나 메서드에 대한 참조, 정적 함수를 전달할 수 있다.
  • 전달하는 함수나 참조하는 데이터들이 직렬화 가능해야 한다. (Serializable 인터페이스 구현)
  • 객체의 메서드나 필드를 전달하면 전체 객체에 대한 참조 또한 포함된다.
class SearchFunctions(val query: String) {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  
  def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
    // 문제 : "isMatch"는 "this.isMatch"이므로 this의 모든 것이 전달된다.
    rdd.map(isMatch)
  }
  
  def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
    // 문제 : "query"는 "this.query"이므로 this의 모든 것이 전달된다.
    rdd.map(x => x.split(query))
  }
  
  def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
    // 안전함 : 필요한 필드만 추출하여 지역 변수에 저장해 전달한다.
    val query_  = this.query
    rdd.map(x => x.split(query_))
  }
}
  • 스칼라에서 NotSerializableException이 발생한다면 직렬화 불가능한 클래스의 메서드나 필드를 참조하는 문제일 가능성이 많다.

많이 쓰이는 트랜스포메이션과 액션

기본 RDD

  • 데이터에 상관없이 모든 RDD에 적용할 수는 트랜스포메이션과 액션
데이터 요소 위주 트랜스포메이션
  • map() : RDD의 각 데이터에 함수를 적용하고 결과 RDD에 각 데이터의 새 결과 값을 담는다.
  • flatMap() : 각 입력 데이터에 대해 여러개의 아웃풋 데이터를 생성할 때 사용
    • 함수에서 반복자를 리턴해야 한다.
  • filter() : filter() 함수를 통과한 데이터만 담는다.
가상 집합 연산
  • RDD는 합집합, 교집합 같은 다양한 수학적 집합 연산을 지원한다.
  • distinct() : 중복이 없는 데이터세트를 원할 경우에 사용한다.
    • 셔플링이 일어나기 때문에 비싼 작업이다.
  • union() : 양쪽 RDD의 데이터를 합해서 돌려준다. 원본 데이터들이 중복되더라도 그 중복을 유지한다.
  • intersection() : 양쪽 RDD에 동시에 존재하는 요소만 되돌려 준다.
    • 모든 중복을 제거한다.
    • 중복을 찾기 위해 셔플링이 수반되므로 union() 보다 성능이 떨어진다.
  • subtract : 다른 RDD를 인자로 받아 첫 번째 RDD의 항목 중 두 번째 RDD에 있는 항목을 제외한 항목들을 가진 RDD를 되돌려 준다.
    • 셔플링이 수행된다.
  • cartesian : 첫 번째 RDD에 있는 데이터 a와 두 번째 RDD에 있는 데이터 b에 대해 모든 가능한 쌍 (a, b)를 되돌려 준다.
    • 비용이 매우 큰 작업이다.

액션

  • reduce() : 인자로 두 개의 데이터를 합쳐 같은 데이터 하나를 반환하는 함수를 받는다.
    • 결과 값의 타입이 RDD 내에서 연산하는 데이터 요소들의 타입과 동일해야 한다.
val sum = rdd.reduce((x, y) => x + y)
  • aggregate : 동일한 타입을 되돌려 주어야 한다는 제한에서 자유로울 수 있다.
    • 리턴받는 타입에 맞는 제로 밸류가 필요하다.
    • RDD의 값들을 누적값에 연계해주는 함수가 필요하다.
    • 각 노드에서 자체적으로 값들을 합칠 수 있도록 두 개의 누적값을 합쳐 주는 두번째 함수가 필요하다.
val result = input.aggregate((0, 0))(
	(acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

val average = result._1 / result._2.toDouble
  • collect : RDD의 모든 데이터 요소 리턴
  • take(n) : RDD의 값들 중 num개 리턴
    • 순서대로 값을 되돌려 주지 않는다.
  • foreach(func) : RDD의 각 값에 func을 적용한다.
    • 리턴되는 값이 없다.

RDD 타입 간 변환하기 - 스칼라

  • 어떤 함수들은 특정한 타입의 RDD에서만 쓸 수 있다.
    • 수치형 RDD의 mean(), 키/값 페어 RDD의 join()
  • 스칼라에서 특정 함수를 갖고 있는 RDD로 변환하는 것은 묵시적 변환에 의해 자동으로 동작한다.
  • 묵시적 변환을 위해서는 import org.apache.spark.SparkContext._ 라인이 필요하다.
  • 묵시적 변환은 mean()이나 variance() 같은 것들을 쓸 수 있도록 RDD를 DoubleRDDFunections나 PairRDDFunctions 같은 포장 클래스로 변환한다.

RDD에서 mean() 같은 함수를 호출 하려면 RDD 클래스의 스칼라 문서를 보고 mean()이 없는지 확인하는 것이 좋다.

영속화 (캐싱)

  • RDD에 여러번 액션을 사용하면, 호출하는 액션들에 대한 모든 의존성을 재연산한다.
  • RDD 영속화에 대한 요청을 하면 RDD를 계산한 노드들은 그 파티션들을 저장하고 있게 된다.
  • 영속화된 데이터를 갖고 있는 노드에 장애가 생기면 스파크는 필요 시 유실된 데이터 파티션을 재연산한다.
  • 자바와 스칼라에서는 기본적으로 persist()가 데이터를 JVM 힙에 직렬화되지 않은 객체 형태로 저장한다.
  • persist()는 연산을 강제로 수행하지 않는다.
  • 메모리에 많은 데이터를 올리려고 시도하면 스파크는 LRU 캐시 정책에 따라 오래된 파티션들을 자동으로 버린다.
    • 디스크와 메모리를 같이 쓰는 영속화 수준에서는 이것들을 디스크에 쓴다. 메모리만 쓰는 정책에서는 다음에 파티션에 접근 할 때 스파크는 그 파티션들을 다시 계산한다.
    • 자주 필요하지 않는 데이터를 자꾸 캐싱한다면 막상 필요한 데이터의 파티션이 메모리에서 내려가 버리고 자주 재계산되는 경우가 발생할 수 있다.
  • unpersist() 메서드는 직접 캐시에서 데이터를 삭제할 수 있다.