카프카, 데이터 플랫폼의 최강자 - 카프카 컨슈머
by Gunju Ko
이 글은 “카프카, 데이터 플랫폼의 최강자” 책 내용을 정리한 글입니다.
만약 저작권 관련 문제가 있다면 “gunjuko92@gmail.com”로 메일을 보내주시면, 바로 삭제하도록 하겠습니다.
카프카 컨슈머
컨슈머의 주요 기능은 특정 파티션을 관리하고 있는 파티션 리더에게 메시지 가져오기 요청을 하는 것이다. 각 요청은 로그의 오프셋을 명시하고 그 위치로부터 로그 메시지를 수신한다. 그래서 컨슈머는 가져올 메시지의 위치를 조정할 수 있고, 필요하다면 이미 가져온 데이터도 다시 가져올 수 있다.
1. 주요 옵션
올드 컨슈머는 컨슈머의 오프셋을 주키퍼의 지노드에 저장하는 방식을 지원하다가 카프카 버전 0.9부터 컨슈머의 오프셋 저장을 주키퍼가 아닌 카프카의 토픽에 저장하는 방식으로 변경했다. (참고로 주키퍼의 지노드에 저장하는 방식은 사라졌다)
- bootstrap.servers : 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보를 나타낸다. 클러스터 중 하나의 호스트만 입력해도 되지만 이 방식은 권장되지 않는다. 카프카 클러스터 전체의 서버 목록을 입력하는게 좋다.
- fetch.min.bytes : 한번에 가져올 수 있는 최소 데이터 사이즈이다. 만약 지정한 사이즈보다 작은 경우, 요청에 대해 응답을 하지 않고 데이터가 누적될 때까지 기다린다.
- group.id : 컨슈머가 속한 그룹을 식별하는 식별자
- enable.auto.commit : 백그라운드로 주기적으로 오프셋을 커밋할지 여부
- auto.offset.reset : 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더이상 존재하지 않은 경우에 다음 옵션으로 리셋한다.
- earliest : 가장 초기의 오프셋값으로 설정한다.
- latest : 가장 마지막의 오프셋값으로 설정한다.
- none : 이전 오프셋값을 찾지 못하면 에러를 나타낸다.
- fetch.max.bytes : 한번에 가져올 수 있는 최대 데이터 사이즈
- 브로커 설정의 “message.max.bytes”와 토픽 설정의 “max.message.bytes” 설정과 관련이 있다.
- max.partition.fetch.bytes : 파티션당 한번의 fetch로 가져올 수 있는 최대 데이터 사이즈이다. 컨슈머는 레코드를 배치로 가져오게 된다. 브로커 설정의 message.max.bytes와 토픽 설정의 max.message.bytes 설정과 관련이 있다.
- request.timeout.ms : 요청에 대해 응답을 기다리는 최대 시간
- connections.max.idle.ms : 이 설정보다 더 오랜 기간동안 idle 상태인 커넥션을 종료시킨다.
- session.timeout.ms : 브로커가 컨슈머가 살아있는 것으로 판단하는 시간. 만약 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않고 session.timeout.ms이 지나면 해당 컨슈머는 종료되거나 장애가 발생한 것으로 판단하고 컨슈머 리밸런스를 시도한다. session.timeout.ms는 하트비트 없이 얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어하며, 이 속성은 heartbeat.interval.ms와 밀접한 관련이 있다.
- 브로커 설정의 “group.min.session.timeout.ms”, “group.max.session.timeout.ms” 사이값으로 해야한다.
- heartbeat.interval.ms : 그룹 코디네이터에게 얼마나 자주 하트비트를 보낼 것인지 조정한다. 이 값은 “session.timeout.ms” 보다 작게 설정해야한다. 또한 일반적으로 “session.timeout.ms”의 1/3보다 작게 설정한다.
- max.poll.records : poll()에 대한 최대 레코드 수를 조정한다.
- max.poll.interval.ms : 컨슈머가 주기적으로 poll을 호출하지 않으면 장애라고 판단하고 컨슈머 그룹에서 제외한 후 다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있게 한다.
- auto.commit.interval.ms : 주기적으로 오프셋을 커밋하는 시간
- fetch.max.wait.ms : fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간
- isolation.level : 트랜잭션과 관련된 레코드를 어떻게 읽을지를 결정한다.
- read_committed : 커밋된 레코드만 읽는다.
- read_uncommitted : 아직 커밋되지 않은 레코드도 읽는다. 심지어 트랜잭션이 abort된 레코드도 읽는다. (즉 모든 레코드를 읽는다)
- 트랜잭션과 무관한 레코드는 isolation.level과 상관없이 모든 경우에 읽을수 있다.
컨슈머는 레코드를 오프셋 순서대로 읽는다.
read_commited 모드에서는 LSO까지의 레코드만 읽을 수 있다. LSO는 트랜잭션이 진행중인 레코드 중 맨 앞에 있는 레코드의 오프셋에서 1을 뺀 값이다. 따라서 트랜잭션이 진행중인 레코드 뒤에 있는 레코드들은 관련 트랜잭션이 완료될 때까지는 읽을 수 없다. 또한 read_commited 컨슈머에서 seekToEnd 메소드를 호출하면 오프셋을 LSO로 옮긴다.
- partition.assignment.strategy : 파티션 리밸런싱이 일어나면, 컨슈머 그룹 리더가 각 컨슈머에게 파티션을 할당한다. 그 때 파티션 할당 전략과 관련된 클래스를 지정한다. 기본값은 RangeAssignor 클래스이다.
2. 파티션과 메시지 순서
- 컨슈머는 오직 파티션의 오프셋 기준으로만 메시지를 가져온다.
- 카프카 컨슈머에서의 메시지 순서는 동일한 파티션 내에서는 프로듀서가 생성한 순서와 동일하게 처리하지만, 파티션과 파티션 사이에서는 순서를 보장하지 않는다.
순서를 보장하는 방법
- 파티션 1개로 토픽을 구성한다. 단 파티션이 한개인 경우에는 처리량이 떨어지는 부분은 감안해야 한다.
- 순서를 보장해야하는 메시지는 동일한 키를 가지도록 한다. 메시지의 키가 같은 경우 같은 파티션으로 전송이 되기 때문에, 같은 키를 가지는 메시지의 경우에는 순서가 보장된다.
3. 컨슈머 그룹
- 컨슈머 그룹은 하나의 토픽에 여러 컨슈머 그룹이 동시에 접속해 메시지를 가져올 수 있다. 최근에는 하나의 데이터를 다양한 용도로 사용하는 요구가 많아졌기 때문에 이러한 특징은 큰 장점이다.
- 하나 이상의 컨슈머를 같은 그룹에 속하게 하려면 동일한 group.id로 설정하면 된다.
- 파티션의 소유권이 이동하는 것을 리밸런스라고 한다. 컨슈머 그룹의 리밸런스를 통해 컨슈머 그룹에 컨슈머를 쉽고 안전하게 추가하고 제거할 수 있다. 또한 높은 가용성과 확장성을 확보할 수 있다.
- 리밸런스를 하는 동안 일시적으로 컨슈머는 메시지를 가져올 수 없다. 그래서 리밸런스가 발생하면 컨슈머 그룹 전체가 일시적으로 사용할 수 없는 단점이 있다.
- 토픽의 파티션에는 하나의 컨슈머만 연결할 수 있다. 서로다른 컨슈머가 하나의 파티션으로부터 메시지를 가져오는것은 불가능하다.
- 컨슈머 그룹마다 각자의 오프셋을 별로도 관리한다.
- group.id는 중복이 되지 않도록 주의해야 한다.
4. 커밋과 오프셋
- 컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치 정보를 기록하고 있다. 각 파티션에 대해 현재 위치를 업데이트하는 동작을 커밋한다고 한다.
- 카프카 내에 별도로 내부에서 사용하는 토픽을 만들고 그 토픽에 오프셋 정보를 저장하고 있다. (__consumer_offset)
- 리밸런스가 일어난 후 각각의 컨슈머는 이전에 처리했던 토픽의 파티션이 아닌 다른 새로운 파티션에 할당될 수 있다. 컨슈머는 새로운 파티션에 대해 가장 최근 커밋한 오프셋을 읽고 그 이후부터 메시지들을 가져오기 시작한다.
- 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋보다 작으면 메시지 처리는 중복된다.
- 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋보다 크면 메세지 처리가 누락된다.
4.1 자동 커밋
- enable.auto.commit을 true로 설정하면 5초마다 컨슈머는 poll()를 호출할 때 가장 마지막 오프셋을 커밋한다.
- 5초는 기본값이며 auto.commit.interval.ms 옵션을 통해 조정이 가능하다.
- 컨슈머는 poll을 요청할 때마다 커밋할 시간인지 아닌지 체크하게 되고, poll 요청으로 가져온 마지막 오프셋을 커밋한다.
- 자동 커밋을 사용하는 경우, 리밸런싱이 일어나면서 메시지가 중복되어 처리될 수도 있다. 중복을 줄이기 위해 auto.commit.interval.ms 값을 작게 할 수도 있지만 완벽하게 중복을 제거할 순 없다.
4.2 수동 커밋
- 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안 되는 경우에 사용한다.
- 커밋을 수동으로 해야하는 경우 enable.auto.commit을 false로 지정한다.
- commitSync 혹은 commitAsync 메소드를 호출해 수동으로 커밋한다.
4.3 특정 파티션 할당
- assign 메소드를 통해 특정 파티션의 메시지를 가져올 수 있다.
수동으로 파티션을 할당하는 경우, 컨슈머 인스턴스마다 컨슈머 그룹 아이디를 서로 다르게 설정해야 한다.
4.4 특정 오프셋부터 메시지 가져오기
- seek() 메소드를 통해 특정 오프셋부터 메시지를 가져올 수 있다.