스파크 완벽 가이드 - Chapter 8. 조인

이 글은 “스파크 완벽 가이드” 책 내용을 정리한 글입니다.

저작권에 문제가 있는 경우 “gunjuko92@gmail.com”으로 연락주시면 감사하겠습니다.

1. 조인 표현식

  • 동등 조인 : 왼쪽과 오른쪽 데이터 셋에 지정된 키가 동일하면 데이터를 결합
  • 복합 데이터 타입을 조인에 사용할 수도 있음

2. 조인 타입

  • 내부 조인 : 왼쪽과 오른쪽 데이터셋에 키가 있는 로우를 유지
  • 외부 조인 : 왼쪽이나 오른쪽 데이터셋에 키가 있는 로우를 유지
  • 왼쪽 외부 조인 : 왼쪽 데이터셋에 키가 있는 로우를 유지
  • 오른쪽 외부 조인 : 오른쪽 데이터셋에 키가 있는 로우를 유지
  • 왼쪽 세미 조인 : 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 있는 경우에는 키가 일치하는 왼쪽 데이터셋만 유지
  • 왼쪽 안티 조인 : 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 없는 경우에는 키가 일치하지 않는 왼쪽 데이터셋만 유지
  • 자연 조인 : 두 데이터셋에서 동일한 이름을 가진 컬럼을 암시적으로 결합하는 조인
  • 교차 조인, 카테시안 조인 : 왼쪽 데이터셋의 모든 로우와 오른쪽 데이터 셋의 모든 로우를 조합

3. 내부조인

  • 내부 조인은 DataFrame이나 테이블에 존재하는 키를 평가한다. 그리고 true인 로우만 결합한다.
val joinExpression = person.col("graduate_program") === graduateProgram.col("id")
person.join(graduateProgram, joinExpression)
  • join 메서드에 세 번째 파라미터(joinType)로 조인 타입을 명확하게 지정할수도 있다.

4. 외부 조인

  • 외부 조인은 DataFrame이나 테이블에 존재하는 키를 평가하여 true 혹은 false인 로우를 포함한다.
  • 왼쪽이나 오른쪽 DataFrame에 일치하는 로우가 없다면 해당 위치에 null을 삽인한다.
person.join(graduateProgram, joinExpression, "outer")

5. 왼쪽 외부 조인

person.join(graduateProgram, joinExpression, "left_outer")

6. 오른쪽 외부 조인

person.join(graduateProgram, joinExpression, "right_outer")

7. 왼쪽 세미 조인

  • 세미 조인은 오른쪽 DataFrame은 값이 존재하는지 확인하기 위해 값만 비교하는 용도로 사용
  • 왼쪽 세미 조인은 기존 조인과 달리 DataFrame의 필터 정도로 볼 수 있다.
graduateProgram.join(person, joinExpression, "left_semi")

8. 왼쪽 안티 조인

  • 왼쪽 세미 조인의 반대 개념
  • SQL의 NOT IN과 같은 스타일의 필터로 보면 된다.
graduateProgram.join(person, joinExpression, "left_anti")

9. 자연 조인

  • 자연 조인은 조인하려는 칼럼을 암시적으로 추정 (일치하는 컬럼을 찾는다.)
  • 왼쪽, 오른쪽, 외부 자연 조인 사용 가능
  • 암시적인 처리는 매우 위험하기 때문에 사용하지 않는게 좋음

10. 교차 조인 (카테시안 조인)

  • 조건절을 기술하지 않는 내부 조인
  • 왼쪽 DataFrame의 모든 로우를 오른쪽 DataFrame의 모든 로우와 결합
  • 엄청난 수의 로우를 가진 DataFrame이 생성될 수 있음
graduateProgram.join(person, joinExpression, "cross")

person.crossJoin(graduateProgram)
  • 정말로 필요한 경우에만 교차 조인을 자용해야 한다. (교차 조인은 매우 위험함)
  • spark.sql.crossJoin.enable 속성값을 true로 설정해 교차 조인 시 발생하는 경고 로그를 제거하거나 스파크가 교차 조인을 다른 조인 방식으로 처리하지 않도록 만들 수 있다.

11. 조인 사용 시 문제점

11.1 복합 데이터 타입의 조인

  • 불리언을 반환하는 모든 표현식은 조인 표현식으로 간주할 수 있음
person.withColumnRenamed("id", "personId")
	.join(sparkStatus, expr("array_contains(spark_status, id)"))
SELECT * FROM
 (select id as personId, name, graduate_program, spark_status FROM person)
INNER JOIN sparkStatus ON array_contains(spark_status, id)

11.2 중복 컬럼명 처리

  • 중복이 발생하는 경우
    • 조인에 사용할 DataFrame의 특정 키가 동일한 이름을 가지며, 키가 제거되지 않도록 조인 표현식에 명시하는 경우
    • 조인 대상이 아닌 2개의 컬럼이 동일한 이름을 가진 경우
  • 중복된 컬럼 중 하나를 참조하면 에러가 발생함
해결 방법 1 : 다른 조인 표현식 사용
  • 동일한 이름을 가진 두 개의 키를 사용한다면 불리언 형태의 조인 표현식을 문자열이나 시퀀스 형태로 바꾼다.
  • 조인을 할 때 두 컬럼 중 하나가 자동으로 제거된다.
val gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")
person.jon(gradProgramDupe, "graduate_program").select("graduate_program")
해결 방법 2 : 조인 후 컬럼 제거
  • 조인 후 문제가 되는 컬럼을 제거
val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person.join(graduateProgram, joinExpr).drop(graduateProgram.col("id"))
해결 방법 3 : 조인 전 컬럼명 변경
  • 조인 전에 컬럼명을 변경
val gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")
val joinExpr = person.col("graduate_program") === graduateProgram.col("grad_id")
person.join(graduateProgram, joinExpr)

12 스파크의 조인 수행 방식

  • 조인 실행에 필요한 두 가지 핵심 전략은 “노드간 네트워크 통신 전략”, “노드별 연산 전략” 이다.

12.1 네트워크 통신 전략

  • 스파크는 조인 시 두 가지 클러스터 통신 방식을 활용한다. 셔플 조인과 브로드캐스트 조인이다.
  • 셔플 조인
    • 전체 노드간 통신이 발생
    • 조인에 사용한 특정 키나 키 집합을 어떤 노드가 가졌는지에 따라 해당 노드와 데이터를 공유
    • 네트워크는 복잡해지고 많은 자원을 사용
    • 조인 프로세스가 진행되는 동안 모든 워커 노드 (그리고 모든 파티션)에서 통신이 발생함
  • 브로드캐스트 조인
    • 작은 DataFrame을 클러스터의 전체 워커 노드에 복제하는것을 의미
    • 조인 프로세스 내내 전체 노드가 통신하는 현상을 방지할 수 있음
    • 시작 시 단 한 번만 복제가 수행되며 그 이후로는 개별 워커가 다른 워커 노드를 기다리거나 통신할 필요 없이 작업을 수행할 수 있음
    • 모든 단일 노드에서 개별적으로 조인이 수행되므로 CPU가 가장 큰 병목 구간이 됨
큰 테이블과 큰 테이블 조인
  • 큰 테이블간의 조인은 셔플 조인이 발생
큰 테이블과 작은 테이블 조인
  • 테이블이 단일 워커 노드의 메모리 크기에 적합할 정도로 충분히 작은 경우 조인 연산을 최적화 할 수 있음
  • 브로드 캐스트 조인이 훨씬 효율적
  • DataFrame API를 사용하면 옵티마이저에게 브로드캐스트 조인을 사용하도록 힌트를 줄 수 있음
val joinExpression = person.col("graduate_program") === graduateProgram.col("id")
person.join(broadcast(graduateProgram), joinExpression)
  • SQL 역시 조인 수행에 필요한 힌트를 줄 수 있으나 강제성이 없으므로 옵티마이저가 이를 무시할 수도 있음
SELECT /*+ MAPJOIN(graduateProgram) */ * FROM person JOIN graduateProgram
ON person.graduate_program = graduateProgram.id
아주 작은 테이블 사이의 조인
  • 스파크가 조인 방식을 결정하도록 내버려두는 것이 가장 좋음
  • 필요한 경우 브로드캐스트 조인을 강제할 수 있음

조인 전에 데이터를 적절하게 분할하면 셔플이 계획되어 있더라도 동일한 머신에 두 DataFrame의 데이터가 있을 수 있다. 따라서 셔플을 피할 수 있고 훨씬 더 효율적으로 실행할 수 있다.