Chapter2 - 맵리듀스

이 글은 “하둡 완벽 가이드” 책 내용을 정리한 글입니다.

저작권에 문제가 있는 경우 “gunjuko92@gmail.com”으로 연락주시면 감사하겠습니다.

1. 맵리듀스란?

  • 데이터 처리를 위한 프로그래밍 모델
  • 하둡은 다양한 언어로 작성된 맵리듀스 프로그램을 구동시킬 수 있음
  • 병행성을 고려해서 설계되었음
  • 하둡의 특성상 소수의 큰 파일이 처리하기 쉽고 효율적이다.

2. 병렬처리의 어려움

  • 일을 동일한 크기로 나눈다는 것이 언제나 쉽고 명확하지 않음
  • 독립적인 프로세스의 결과를 모두 합치는데 많은 처리가 필요할 수도 있다.
  • 단일 머신의 처리 능력은 한계가 있다. 여러 대의 머신을 사용할 때는 코디네이션과 신뢰성의 범주에 속하는 요쇼를 추가로 고려해야 한다. 잡의 전체 과정을 누가 조율하고 프로세스의 실패를 어떻게 처리할지 고민해야 한다.

하둡과 같은 프레임워크의 도움을 받으면 병렬처리를 쉽게 할 수 있다.

3. 하둡으로 데이터 분석하기

맵과 리듀스

  • 맵리듀스 작업은 크게 맵 단계와 리듀스 단계로 구분된다.
  • 각 단계는 입력과 출력으로 키-값의 상을 가진다. 그 타입은 개발자가 선택한다.
  • 맵 단계의 입력은 원본 데이터이다.
  • 맵 함수의 출력이 리듀스 함수의 입력으로 보내진다. 이 과정은 맵리듀스 프레임워크에 의해 처리된다. 이 과정에서 키-값 쌍은 키를 기준으로 정렬되고 그룹화된다.
  • 맵 함수를 구현하고 싶으면, Mapper 인터페이스를 구현하면 돈다. Mapper 클래스는 제네릭 타입으로, 네 개의 정규 타입 매개변수(입력키, 입력값, 출력키, 출력값)를 가진다.
    • 하둡은 자체적으로 기본 타입 셋을 제공한다. LongWritable, Text, IntWritable 등을 제공한다.
  • Mapper#map 메소드는 출력을 위해 Context의 인스턴스를 제공한다.
  • 리듀스 함수를 구현하고 싶으면 Reduce 인터페이스를 구현하면 된다. 리듀스 함수 역시 입력과 출력 타입을 규정하기 위해 네 개의 정규 타입 매개변수를 사용한다. 리듀스 함수의 입력 타입은 맵 함수의 출력 타입과 짝을 이룬다.
  • Job 객체는 잡 명세서를 작성한다. 하둡 클러스터에서 잡을 실행할 떄는 먼저 코드를 JAR 파일로 묶어야한다. 하둡 클러스터의 해당 머신에 JAR 파일을 배포한다. JAR 파일의 이름을 명시적으로 지정하는 방법도 있지만, Job의 setJarByClass() 메소드를 이용하여 클래스를 하나 지정하면 하둡은 해당 클래스를 포함한 관련 JAR 파일을 찾아서 클러스터에 배치해준다.
  • Job 객체를 생성할 때 입력과 출력 경로를 지정한다.
  • Job#SetMapperClass, Job#SetReducerClass 메소드를 통해 매퍼와 리듀서를 지정할 수 있다.
  • Job#setOutputKeyClass(), Job#setOutputValueClass(), Job#setMapOutputKeyClass(), Job#setMapOutputValueClass() 메서드를 통해서 매퍼와 리듀서의 입력, 출력 타입을 지정할 수 있다.

테스트 수행

  • hadoop 명령을 쉘에서 호출하면 첫번째 인수로 지정한 클래스를 실행하기 위해 먼저 JVM이 구동된다. hadoop 명령은 의존성이 있는 모든 하둡 라이브러리를 클래스경로에 추가하고 하둡 환경 설정을 불러온다.

4. 분산형으로 확장하기

  • 분상형으로 확장하고 싶으면 전체 데이터를 HDFS라는 분산 파일 시스템에 저장할 필요가 있다.
  • 하둡은 데이터의 일부분이 저장된 클러스터의 각 머신에서 맵리듀스 프로그램을 실행한다. 이를 위해 하둡은 YARN이라 불리는 하둡 자원 관리 시스템을 이용한다.

데이터의 흐름

  • 잡 : 클라이언트가 수행하는 작업의 기본 단위
    • 입력 데이터, 맵리듀스 프로그램, 설정 정보로 구성된다.
  • 하둡은 잡을 맵 태스크와 리듀스 태스크로 나누어 실행한다.각 태스크는 YARN을 이용하여 스케줄링되고 클러스터의 여러 노드에서 실행된다. 특정 노드의 태스크 하나가 실패하면 자동으로 다른 노드를 재할당하여 다시 실행된다.
  • 하둡은 맵리듀스의 잡의 입력을 입력 스플릿 또는 단순히 스플릿이라고 부르는 고정 크기 조각으로 분리한다. 하둡은 각 스플릿마다 하나의 맵 태스크를 생성한다. 스플릿의 크기가 작을수록 부하 분산에 더 좋은 효과를 볼 수 있다. 반면 스플릿 크기가 너무 작으면 스플릿 관리와 맵 태스크 생성을 위한 오버헤드 때문에 잡의 실행 시간이 증가하는 단점이 있다. 일반적인 잡의 적절한 스플릿 크기는 HDFS 블록의 기본 크기인 128MB가 적당하다고 알려져 있다.
  • 하둡은 HDFS 내의 입력 데이터가 있는 노드에서 맵 태스크를 실행할 때 가장 빠르게 작동한다. 이를 데이터 지역성 최적화라고 하는데, 클러스터의 중요한 공유 자원인 네트워크 대역폭을 사용하지 않는 방법이다. 그러나 맵 태스크의 입력 스플릿에 해당하는 HDFS 블록 복제본이 저장된 노드 모두 다른 맵 태스크를 실행하여 여부가 없는 상황에는 다른 노드에서 가용한 맵 슬롯을 찾는다.
  • 의 입력 스플릿에 해당하는 HDFS 블록 복제본이 저장된 노드 모두 다른 맵 태스크를 실행하여 여유가 없는 상황에서는 다른 노드에서 가용한 맵 슬롯을 찾는다.
  • 최적의 스플릿 크기가 HDFS 블록 크기와 같아야하는 이유는 그 블록 크기가 단일 노드에 저장된다고 확신할 수 있는 가장 큰 입력 크기이기 때문이다. 하나의 스플릿이 두 블록에 걸쳐 있을때 두 블록 모두 저장하는 HDFS 노드는 존재할 가능성이 낮기 때문에 스플릿의 일부 데이터를 네트워크를 통해 맵 태스크가 실행되는 다른 노드로 전송해야 한다.
  • 맵 태스크의 결과는 중간 결과물이고, HDFS가 아닌 로컬 디스크에 저장된다.
  • 리듀스 태스크는 일반적으로 모든 매퍼의 출력 결과를 입력으로 받기 때문에 데이터 지역성의 장점이 없다.
  • 맵의 결과는 네트워크를 통해 일단 리듀서 태스크가 실행 중인 노드로 전송되고, 맵의 결과를 병합한 후 사용자 정의 리듀서 함수로 전달된다. 일반적으로 리듀서 결과는 HDFS에 저장된다. 리듀서 출력에 대한 HDFS 블록의 첫번째 복제본은 로컬 노드에 저장되고, 나머지 복제본은 외부 랙에 저장된다.
  • 리듀스 태스크 수는 입력 크기와 상관없이 독립적으로 지정할 수 있다.
    • 리듀스 수를 선택하는 것은 잡의 실행 시간에 미치는 영향이 매우 크다.
  • 리듀스가 여럿이면 맵 태스크는 리듀스 수만큼 파티션을 생성하고 맵의 결과를 각 파티션에 분배한다. 각 파티션에는 여러 키가 존재하지만 개별 키에 속한 모든 레코드는 여러 파티션 중 한곳에만 배치된다.
    • 파티셔닝 알고리즘은 사용자가 직접 정의할수도 있지만 해시 함수로 키를 분배하는 기본 파티셔너를 주로 사용하면 매우 잘 동작한다.
  • 맵과 리듀스 태스크 사이의 데이터 흐름을 셔플이라고 부른다.

컴바이너 함수

  • 네트워크 대역폭은 한계가 있기 때문에 맵리듀스 태스크 사이의 데이터 전송을 최조화할 필요가 있다.
  • 하둡은 맵의 결과를 처리하는 컴바이너 함수를 허용한다. 컴바이너 함수는 최적화와 관련이 있다.
  • 컴바이너 함수에 적용할 수 있는 함수의 형태에는 제약이 있다. 리듀서 함수가 commutative 및 associative한 속성을 지녀야한다. 컴바이너 함수를 적용할 수 있는 대표적인 예는 최대값을 구하는 것이고, 적용할 수 없는 대표적인 예는 평균을 구하는 것이다.
  • 컴바이너를 사용하면 매퍼와 리듀서 사이의 셔플 단계에서 전송되는 데이터양을 줄이는데 큰 도움이 된다.
  • 컴바이너 함수는 Reducer 클래스를 사용해서 정의한다.
    • Job#setCombinerClass 메서드를 사용해서 컴바이너 함수를 지정한다.