EmbeddedKafkaCluster

KafkaStreams의 소스 코드를 보면 Integration Test 작성을 위해서 EmbeddedKafkaCluster를 많이 사용하는것을 볼 수 있다. EmbeddedKafkaCluster를 사용하면 마치 로컬에서 Kafka Broker를 실행시키는 것과 같은 효과를 얻을 수 있다.

public class StreamIntegrationTest {
    private static final int NUM_BROKERS = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER =
        new EmbeddedKafkaCluster(NUM_BROKERS);

    @Before
    public void before() throws InterruptedException {
        
    }

    @After
    public void whenShuttingDown() throws IOException {
       
    }

    @Test
    public void test() throws Exception {
        
    }
}

다음과 같이 EmbeddedKafkaCluster에 @ClassRule 어노테이션을 추가하면 EmbeddedKafkaCluster를 사용해서 Integraion Test를 작성할 수 있다. 생성자를 통해서 브로커의 개수를 설정할 수 있다.

보통 @Before에서 테스트에 사용할 토픽을 생성한다. EmbeddedKafkaCluster#createTopic 메소드를 사용하면 토픽을 생성할 수 있다. 아래는 createTopic 메소드 코드이다. 토픽을 생성할 때 파티션의 개수나 Replication Factor를 정할 수 있다.

/**
 * Create a Kafka topic with the given parameters.
 *
 * @param topic       The name of the topic.
 * @param partitions  The number of partitions for this topic.
 * @param replication The replication factor for (the partitions of) this topic.
 */
public void createTopic(final String topic, final int partitions, final int replication) throws InterruptedException {
    createTopic(topic, partitions, replication, new Properties());
}

또한 EmbeddedKafkaCluster#createTopics 메소드를 사용하면 한 개 이상의 토픽을 생성할 수 있다. 단 이 때는 파티션 개수나 Replication Factor를 정할 수 없고 둘 다 1로 설정이 된다.

public void createTopics(final String... topics) throws InterruptedException {
    for (final String topic : topics) {
        createTopic(topic, 1, 1, new Properties());
    }
}

주의할 점은 KafkaStreams 객체를 생성할 때 사용하는 Properties의 BOOTSTRAP_SERVERS_CONFIG를 EmbeddedKafkaCluster#bootstrapServers() 메소드의 리턴값으로 세팅해야한다는 것이다. 그래야만 KafkaStreams 객체가 EmbeddedKafkaCluster와 상호작용 할 것이다.

아래는 그 예이다.

Properties streamsConfiguration = new Properties();
streamsConfiguration
    .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

// 생략
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);

테스트를 위해서 특정 토픽에 메시지를 보내야할 경우가 있을 수 있다. 그 때는 IntegrationTestUtils#produceKeyValuesSynchronouslyXXXX 메소드를 사용하면 된다. IntegrationTestUtils에는 다양한 메소드가 존재하는데 이 메소드들을 잘 활용하면 Integration Test를 보다 쉽게 할 수 있다.

아래의 코드는 IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp 메소드를 사용해서 카프카 메시지를 생성하는 코드이다. IntegrationTestUtils에서는 카프카 메시지를 생성하기 위해서 몇가지 메소드를 제공하고 있다. 각 메소드는 설명만 보면 쉽게 이해가 되므로 자세한 설명은 생략하도록 하겠다.

private void produceMessages(final long timestamp) throws Exception {
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
        topic,
        Arrays.asList(
            new KeyValue<>(1, "A"),
            new KeyValue<>(2, "B"),
            new KeyValue<>(3, "C"),
            new KeyValue<>(4, "D"),
            new KeyValue<>(5, "E")),
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties()),
        timestamp);
}

만약에 KafkaStreams의 토폴로지안에서 메시지 처리 결과를 특정 토픽으로 보내고 있다면, 처리 결과를 확인하기 위해서 결과 토픽을 확인해야 하는 경우가 있을수 있다. 다행히도 IntegrationTestUtils는 특정 토픽의 메시지를 읽어 올 수 있는 메소드를 제공하고 있다. 많이 사용하는 메소드는 waitUntilMinValuesRecordsReceived 메소드이다.

/**
 * Wait until enough data (value records) has been consumed.
 *
 * @param consumerConfig     Kafka Consumer configuration
 * @param topic              Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime           Upper bound in waiting time in milliseconds
 * @return All the records consumed, or null if no records are consumed
 * @throws AssertionError       if the given wait time elapses
 */
public static <K, V> List<KeyValue<K, V>> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                            final String topic,
                                                            final int expectedNumRecords,
                                                            final long waitTime) throws InterruptedException 
  • consumerConfig : KafkaConsumer 설정
  • topic : 메시지를 가져올 토픽
  • expectedNumRecords : 예상 레코드의 최소 수 (최소한 이 값 이상의 레코드를 읽어와야 메소드가 리턴된다)
  • waitTime : 최대 대기 시간
private <K, V> List<KeyValue<K, V>> receiveMessages(final int numMessages) throws InterruptedException {
    final Properties consumerProperties = new Properties();
    consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
    consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // 생략
    
    return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            consumerProperties,
            outputTopic,
            numMessages,
            60 * 1000);
}

위의 코드는 waitUntilMinValuesRecordsReceived 메소드를 사용해서 특정 토픽의 레코드를 읽는 간단한 코드이다. 위의 코드는 numMessages 수만큼의 레코드를 읽을 때까지 최대 60초간 대기한다.

waitUntilMinValuesRecordsReceived 메소드 외에도 다양한 메소드를 통해서 카프카 메시지를 읽어올 수 있다. 더 자세한건 IntegrationTestUtils 클래스의 코드를 참고하길 바란다.

특정 조건이 일어날 때까지 테스트가 종료되지 않게 하거나 아니면 다음 작업을 해야하지 말아야 할 경우에는 TestUtils.waitForCondition 메소드를 사용하면 좋다. 예를 들어 KafkaStreams가 시작되고 난 뒤에 토픽에 메시지를 보내야한다면 아래와 같이 코딩하면 된다. (아래 코드만으로 이해가 되지 않으면 sample code를 참고하길 바란다)

    @Test
    public void embeddedKafkaTest() throws Exception {
        TestUtils.waitForCondition(() -> wordCountStream.isRunning(), 10 * 1000, "Steams never started");

        produceMessages();

        // do something
    }

사용 후기

Kafka에서는 테스트를 위한 클래스들을 많이 만들어놨다. 이 클래스들을 잘 활용하면 KafkaStream, Topology 등을 테스트 할 수 있다. 직접 써보니 너무나도 유용했고 사용법도 쉽기 때문에 금방 따라할 수 있을 것이다.

Sample Code

아래 링크에 가면 샘플 코드를 볼 수 있다.

  • https://github.com/Gunju-Ko/KafkaStream-ProcessorAPI