DeserializationExceptionHandler

DeserializationExceptionHandler를 사용하면 레코드를 deserialize할 때 발생하는 예외를 처리할 수 있다. DeserializationExceptionHandler 인터페이스의 구현체는 레코드와 발생한 예외를 참고해서 FAIL 혹은 CONTINUE를 리턴해야 한다.

  • FAIL을 리턴하는 경우 : Streams이 shutdown 되어야하는 경우
  • CONTINUE을 리턴하는 경우 : Streams이 예외를 무시하고 처리를 계속해야하는 경우

기본 구현 클래스는 LogAndFailExceptionHandler이다. 그리고 Kafka Streams는 아래와 같은 ExceptionHandler를 제공한다.

  • LogAndContinueExaceptionHandler : deserialization 예외를 로깅한다. 그리고 CONTINUE를 리턴한다. 따라서 KafkaStream은 deserialize 할 때 예외가 발생한 레코드는 스킵하고, 다음 레코드를 계속해서 처리한다.
  • LogAndFailExceptionHandler : deserialization 예외를 로깅한다. 그리고 FAIL를 리턴한다. 따라서 KafkaStream은 shutdown되어 레코드 처리를 중단한다.

필요에 따라 DeserializationExceptionHandler 인터페이스 구현체를 직접 구현할 수도 있다. 커스터마이징 된 ExceptionHandler 구현 예제가 궁금한다면 Failure and exception handling에서 확인하길 바란다.

아래는 DeserializationExceptionHandler 인터페이스이다.

public interface DeserializationExceptionHandler extends Configurable {
    /**
     * Inspect a record and the exception received.
     * @param context processor context
     * @param record record that failed deserialization
     * @param exception the actual exception
     */
    DeserializationHandlerResponse handle(final ProcessorContext context,
                                          final ConsumerRecord<byte[], byte[]> record,
                                          final Exception exception);

    /**
     * Enumeration that describes the response from the exception handler.
     */
    enum DeserializationHandlerResponse {
        /* continue with processing */
        CONTINUE(0, "CONTINUE"),
        /* fail the processing and stop */
        FAIL(1, "FAIL");

        /** an english description of the api--this is for debugging and can change */
        public final String name;

        /** the permanent and immutable id of an API--this can't change ever */
        public final int id;

        DeserializationHandlerResponse(final int id, final String name) {
            this.id = id;
            this.name = name;
        }
    }

}

default.deserialization.exception.handler

KafkaStream에서 사용할 DeserializationExceptionHandler 구현체는 “default.deserialization.exception.handler” 설정을 통해 설정할 수 있다. 다음은 간단한 설정 예제이다. 아래의 코드는 LogAndContinueExaceptionHandler를 DeserializationExceptionHandler로 사용한다.

public StreamsConfig getStreamsConfig(EdaProperties edaProperties, StreamProperties streamProperties) {
    Map<String, Object> props = new HashMap<>();
    
    // configure
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
              LogAndContinueExceptionHandler.class);
    ...
        
    return new StreamsConfig(props);
}

RecordDeserializer

Task는 처리해야할 레코드를 RecordQueue에 넣는다. RecordQueue는 레코드를 큐에 넣기 전에 RecordDeserialzier를 사용해서 deserialize한다. 아래는 RecordDeserializer의 deserialize 메소드이다.

ConsumerRecord<Object, Object> deserialize(final ProcessorContext processorContext,
                                           final ConsumerRecord<byte[], byte[]> rawRecord) {

    try {
        return new ConsumerRecord<>(
            rawRecord.topic(),
            rawRecord.partition(),
            rawRecord.offset(),
            rawRecord.timestamp(),
            TimestampType.CREATE_TIME,
            rawRecord.checksum(),
            rawRecord.serializedKeySize(),
            rawRecord.serializedValueSize(),
            sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()),
            sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()));
    } catch (final Exception deserializationException) {
        final DeserializationExceptionHandler.DeserializationHandlerResponse response;
        try {
            response = deserializationExceptionHandler.handle(processorContext, rawRecord, deserializationException);
        } catch (final Exception fatalUserException) {
            log.error("Deserialization error callback failed after deserialization error for record {}",
                      rawRecord,
                      deserializationException);
            throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
        }

        if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) {
            throw new StreamsException("Deserialization exception handler is set to fail upon" +
                " a deserialization error. If you would rather have the streaming pipeline" +
                " continue after a deserialization error, please set the " +
                DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
                deserializationException);
        } else {
            sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
        }
    }
    return null;
 }

위의 코드를 보면 알 수 있듯이 deserialize시 예외가 발생하면 DeserializationExceptionHandler#handle 메소드를 호출한다. 그리고 DeserializationExceptionHandler#handle 메소드가 FAIL을 리턴하는 경우 StreamsException이 발생한다. 그리고 이 경우엔 Streams가 shutdown된다. DeserializationExceptionHandler#handle 메소드가 CONTINUE를 리턴하는 경우는 널을 리턴한다. RecordQueue는 RecordDeserializer#deserialize의 리턴값이 널인 경우에는 해당 레코드를 스킵한다.