이 글은 “카프카, 데이터 플랫폼의 최강자” 책 내용을 정리한 글입니다.

만약 저작권 관련 문제가 있다면 “gunjuko92@gmail.com”로 메일을 보내주시면, 바로 삭제하도록 하겠습니다.

4장 카프카 프로듀서

프로듀서

비동기 전송

프로듀서에서 서버로 메시지를 보내고 난 후에 성공적으로 도착했는지까지 확인하지 않는다. 카프카가 항상 살아있는 상태이고 프로듀서가 자동으로 재전송하기 때문에 대부분의 경우 성공적으로 전송되지만, 일부 메시지는 손실될 수도 있다.

  • send() : 메소드를 이용해서 ProducerRecord를 전송한다. 메시지는 버퍼에 저장되고 별도의 스레드를 통해 브로커로 전송하나. send()의 리턴값은 Future<RecordMetadata> 객체이다.
  • send() 메소드를 호출할 때 파라미터로 Callback 타입의 객체를 넘겨줄 수 있다. Callback의 onCompletion 메소드는 레코드 전송 후에 호출된다.

동기 전송

  • send() 메소드의 리턴값인 Future 객체의 get() 메소드를 호출하면 된다.
  • get() 메소드를 이용해 카프카의 응답을 기다린다. 메시지가 성공적으로 전송되지 않으면 예외가 발생하고, 에러가 없다면 메시지가 기록된 오프셋을 알 수 있는 RecordMetadata를 얻게 된다. RecordMetadata를 이용해 파티션과 오프셋 정보를 출력한다.

운영 중이 환경에 맞게 동기, 비동기 방식을 선택해 사용하면 된다.

프로듀서 주요옵션

  • bootstrap.servers : 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보. 카프카 전체 리스트가 아닌 호스트 하나만 입력해 사용할 수 있다. 하지만 리스트 전체을 입력하는 것을 권장한다. 만약 주어진 리스트의 서버 중 하나에서 장애가 발생할 경우 클라이언트는 자동으로 다른 서버로 재접속을 시도하기 때문에 사용자 프로그램에서는 문제 없이 사용할 수 있게 된다.
  • acks (default : 1) : 프로듀서가 카프카 토픽의 리더에게 메시지를 보낸 후 요청을 완료하기 전 ack의 수.
    • acks = 0 : 프로듀서는 서버로부터 어떠한 ack도 기다리지 않는다. 이 경우 서버가 데이터를 받았는지 보장하지 않고, 클라이언트는 전송 실패에 대한 결과를 알지 못하기 때문에 재시도 설정도 적용되지 않는다. 메시지 손실 가능성은 높지만 성능이 매우 좋다. 각 레코드에 대해 다시 주어진 오프셋은 항상 -1로 설정된다.
    • acks = 1 : 리더로부터 ack를 받지만, 팔로워는 확인하지 않는다. 이 경우 일부 데이터의 손실이 발생할 수도 있다.
    • acks = all : 만약 all 또는 -1로 설정하는 경우 리더는 ISR의 팔로워로부터 데이터에 대한 ack를 기다린다. 데이터 무손실에 대해 가장 강력하게 보장한다.
  • max.block.ms (default : 60000) : KafkaProducer.send()와 KafkaProducer.partitionsFor()가 최대로 block될 수 있는 시간을 결정한다. 이 메소드들은 버퍼가 가득찼거나 메타데이터를 사용할 수 없는 경우에 block될 수 있다.
  • buffer.memory (default : 33554432) : producer가 전송 대기중인 레코드를 저장하는 버퍼의 메모리 바이트 수
  • compression.type (default : none) : 프로듀서가 데이터를 압축해서 보낼수 있는데 어떤 타입으로 압축할지를 정할 수 있다.
    • none, gzip, snappy, lz4 같은 다양한 포맷 중 하나를 선택
  • retries (default : 0) : 재시도 횟수
    • max.inflight.requests.per.connection 설정을 1로 하지 않으면 재시도로 인해 레코드의 순서가 뒤바뀔수 있다.
  • batch.size (default : 16384) : 프로듀서는 같은 파티션으로 보내는 여러 데이터를 함께 배치로 보내려고 시도한다. 이 설정으로 배치 크기 바이트 단위를 조정할 수 있다. 정의된 크기보다 큰 데이터는 배치를 시도하지 않게 된다. 배치를 보내기 전 클라이언트 장애가 발생하면 배치 내에 있던 메시지는 전달되지 않는다.
  • linger.ms (default : 0) : 배치형태의 메시지를 보내기 전에 추가적인 메시지들을 위해 기다리는 시간을 조정한다. 카프카 프로듀서는 지정된 배치 사이즈에 도달하면 이 옵션과 관계없이 즉시 메시지를 전송하고, 배치사이즈에 도달하지 못한 상황에서 linger.ms 제한시간에 도달했을때 메시지들을 전송한다. 0이 기본값이며, 0보다 큰 값을 설정하면 지연 시간은 조금 발생하지만 처리량은 좋아진다.
  • max.request.size (default : 1048576) : 프로듀서가 보낼수 있는 최대 요청 바이트 사이즈. 이 값은 batch.size의 상한이다. 참고로 여러개의 배치가 한번의 요청으로 전송될 수 있다.
  • request.timeout.ms (default : 30000) : 클라이언트가 요청 응답을 기다리는 최대 시간.
  • max.inflight.requests.per.connection (default : 5) : 이 값을 1보다 크게 설정하는 경우 재시도로 인한 메시지 순서가 변경될 위험이 있다. 이 설정은 응답을 받지 못한 요청의 최대 개수를 제한한다.

메시지 전송 방법

프로듀서의 옵션 중 acks 옵션을 어떻게 설정하는지에 따라 카프카로 메시지를 전송할 때 메시지 손실 여부와 메시지 전송 속도 및 처리량 등이 달라지게 된다.

메시지 손실 가능성이 높지만 빠른 전송이 필요한 경우

  • acks = 0 : 프로듀서는 카프카 서버에서 응답을 기다리지 않고, 메시지를 보낼 준비가 되는 즉시 다음 요청을 보낸다. 매우 빠르게 메시지를 보낼 수 있지만 메시지가 손실될 수 있다.
  • 일반적인 운영환경의 경우 메시지 손실 없이 빠르게 보내지만, 브로커가 다운되는 장애 등의 경우에 메시지 손실 가능성이 높은 편이라고 이해하면 된다.

메시지 손실 가능성이 적고 적당한 전송이 필요한 경우

  • acks=1 : 프로듀서가 카프카로 메시지를 보낸 후 메시지에 대해 카프카가 잘 받았는지 확인한다. 메시지를 보내는 속도는 약간 떨어지게 된다.
  • 리더에 장애가 발생하는 경우, 메시지 손실이 발생할 수 있다. 아직 리더에서 팔로워로 복제되지 않은 메시지가 존재하는 경우, 해당 메시지들은 손실된다. 즉 팔로워들은 리더로부터 장애 발생 직전의 메시지는 가져오지 못했지만 그 상태 그대로 리더가 되는 경우 일부 메시지는 손실된다.
  • 운영환경에서도 프로듀서 옵션으로 acks=1를 사용하고 리더 선출 작업 등이 일어났다고 해서 무조건 메시지 손실이 발생하는 것이 아니다. 아주 예외적이 경우에 메시지 손실이 발생할 가능성이 있다라고 이해하면 된다.

특별한 경우가 아니라면 속도와 안전성을 확보할 수 있는 acks=1로 사용하는 방법을 추천한다.

전송 속도는 느리지만 메시지 손실이 없어야 하는 경우

  • acks=all : 프로듀서가 메시지를 전송하고 난 후 리더가 메시지를 받았는지 확인하고 추가적으로 팔로워까지 메시지를 받았는지 확인하는 것이다.
  • 프로듀서 설정뿐만 아니라 브로커 설정도 같이 수정해줘야 한다.
    • min.insync.replica : min.insync.replica를 1로 설정한다면 acks=all로 설정하더라도 acks=1 처럼 동작한다. 카프카에서는 프로듀서만 acks=all로 메시지를 보낸다고 해서 손실 없는 메시지를 보장해주는 것이 아니기 때문에 옵션을 잘 이해하고 설정해야 한다.
    • min.insync.replica를 2로 설정하면 acks를 보내기 전에 최소 2개의 리플리케이션을 유지하는지 확인한다. 결과적으로 리더와 팔로워중 하나에 메시지가 저장되고 난 후에 ack를 보낸다.

아파치 카프카 문서에는 손실 없는 메시지 전송을 위한 조건으로 프로듀서는 acks=all, 브로커의 min.insync.replica의 옵션은 2, 토픽의 리플리케이션 팩터는 3으로 권장하고 있다.

  • 여기서 min.insync.replica을 3으로 설정하게 되면 브로커 하나만 다운되더라도 메시지를 보낼 수 없는 클러스터 전체 장애와 비슷한 상황이 발생하게 된다. (acks=all인 경우)