이 글은 “카프카, 데이터 플랫폼의 최강자” 책 내용을 정리한 글입니다.

만약 저작권 관련 문제가 있다면 “gunjuko92@gmail.com”로 메일을 보내주시면, 바로 삭제하도록 하겠습니다.

카프카를 활용한 데이터 파이프라인 구축

1. 카프카를 활용한 데이터 흐름도

  • 프로듀서 : 메시지를 생성해낸 다음 그 메시지를 전송하는 역할
  • 카프카 : 프로듀서가 생성한 메시지를 저장하는 중간 큐 역할
  • 컨슈머 : 카프카에 저장된 메시지를 가져오는 역할. 그 외에도 데이터 처리, 로그 분석을 위해 다른 애플리케이션으로 메시지를 존송하는 역할을 하기도 함

데이터 처리에 대한 기업 내 요구 사항이 많아지면서 데이터 수집, 저장, 분석에 이르는 일련의 과정을 관리하는 솔루션이 출시되고 있다. 대표적인 솔루션으로는 아파치 나이파이를 꼽을 수 있다.

  • Nifi : 데이터 흐름을 정의하고, 정의된 흐름대로 자동으로 실행해주는 유용한 애플리케이션이다.

2. 파일 비트를 이용한 메시지 전송

  • 카프카에서는 설치 경로의 하위 디렉토리인 logs 디렉토리에 server.log라는 파일에 카프카와 관련된 로그들이 저장된다. 카프카에서 생성되는 로그를 카프카의 토픽으로 전송할 수도 있다.
  • 카프카의 로그를 카프카의 토픽으로 전송하려면 프로듀서가 필요하다. 프로듀서의 경우 카프카 클라이언트 라이브러리를 이용하여 프로그램을 직접 구현할 수도 있고, 오픈소스 애플리케이션을 이용할 수 있다. 예를 들어 파일비트를 이용하면, 특정 파일을 토픽으로 전송할 수 있다.
  • 파일피트는 앨라스틱에서 제공하고 있는 경량 데이터 수집기이다. 파일비트의 설정 파일을 수정해서 실행시키면, server.log 파일의 데이터를 특정 토픽으로 전송할 수 있다.

실습에 관련된 자세한 내용은 책을 참고하시길 바랍니다.

3. 나이파이를 이용해 메시지 가져오기

  • 데이터 처리를 위해서는 컨슈머를 이용해 카프카 토픽으로부터 메시지를 가져와야한다.
  • 컨슈머는 프로그래밍 언어등을 이용해 직접 만들 수도 있고, 오픈소스를 활용할 수도 있다. 대표적으로는 나이파이가 있다.
  • 나이파이 애플리케이션은 카프카와 동일하게 클러스터 구성이 가능한 분산 애플리케이션이다.
  • 프로세스는 나이파이에서 데이터 처리를 위한 각각의 컴포넌트를 말하며, AMQP 컨슘하기, 파일로 저장하기, 전송하기 등 여러 가지 작업을 할 수 있다.
  • 카프카 컨슈머를 만들고 싶다면, ConsumeKafka 프로세스를 추가하면 된다. 그리고 브로커 정보, 토픽 이름, 그룹 ID 등 컨슈머와 관련된 설정을 변경하면 된다.

4. 실시간 분석을 위해 엘라스틱서치에 메시지 저장

  • 엘라스틱서치는 엘라스틱사의 분산형 RESTful 검색 및 분석 엔진으로서, 전문 검색 질의를 이용해 원하는 데이터 분석을 빠르게 할 수 있는 애플리케이션이다.
  • 나아파이의 프로세스를 추가해 엘라스틱서치로 데이터를 전송할 수 있다. PutElasticsearchHttp 프로세서는 데이터를 엘라스틱서치로 넣어주는 역할을 한다. 프로세서를 추가한 뒤에 설정을 알맞게 변경해주면 된다.
  • 카프카의 토픽을 엘라스틱서치로 전송하기 위해서 데이터 처리의 순서는 카프카 컨슈머 > 엘라스틱서치 순으로 되어야한다. 이를 위해 나이파이에서 2개의 프로세스를 연결한다.

5. 키바나를 이용해 엘라스틱서치에 저장된 데이터 확인

  • 엘라스틱서치에 저장된 데이터를 확인하려면 엘라스틱서치 API를 사용하는 방법도 있겠지만 키바나를 이용할 수도 있다. 키바나는 엘라스틱에서 제공하는 애플리케이션 중 하나로서 엘라스틱서치에 저장된 데이터 확인과 분석을 쉽게할 수 있는 애플리케이션이다.
  • 각 브로커에서 수집되는 로그가 특정 토픽에 있기 때문에 엘라스틱서치에 전송하는 것 외에 추가로 하둡이나 로컬에 저장하길 원한다면, 나이파이의 PutHDFS 프로세서 등을 추가해 쉽게 하둡 등의 저장소에 저장하거나 다른 용도로 활용할 수 있다.

데이터가 카프카에 저장되어 있다면 용도에 따라 다른 저장소에 저장하여 분석할 수 있다.

6. 현재의 토픽을 이용해 새로운 토픽으로 메시지 재생산

  • 메시지 양의 굉장히 많은 토픽의 내용 중 필요한 메시지만 꺼내서 다시 새로운 토픽으로 메시지를 보내는 경우가 많다. 예를 들어 로그 데이터를 저장한 토픽에서 에러 메시지와 관련된 메시지만 에러 토픽으로 보낼 수 있다.
  • 나이파이에서 프로세스를 통해 이러한 작업을 하고 싶다면, EvaluateJsonPath, RouteOnAttribute를 사용해보길 바란다.
    • EvaluateJsonPath : 하나 또는 여러 JsonPath에서 플로 파일 속성에 추가할 수 있거나 플로 파일 자체로 기록할 수 있따.
    • RouteOnAttribute : 속성을 이용해 라우팅하는 역할을 한다.
  • 카프카의 토픽으로 메시지들이 저장되면 저장된 토픽을 이용해 확장 시스템과 연동한다든지, 토픽의 내용을 분리하는 등의 많은 작업이 가능하다. 또 이러한 작업을 진행함에 있어 로그를 생산하는 쪽에서는 별도의 추가 작업 없이 가능하다는 부분도 매우 큰 장점이라고 할 수 있다.