Kafka Streams Test Lab 0

An introduction to using the Kafka Streams Test Suite to test Kafka Streams topologies.

Info

Updated 03/10/2022

Overview

  • We will test a Kafka Streams topology using the TopologyTestDriver from Apache Kafka Streams.
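  • The topology reads records from an input topic, keeps only those whose value is "BLUE" (ignoring case), and writes them to an output topic.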
  • While using the TestDriver we will perform basic stateless operations and understand the testing infrastructure.

The code for this lab is in the eda-kstreams-labs repository, in the kstream-lab0 folder.

Scenario Prerequisites

Java

  • For the purposes of this lab we suggest Java 8+

Quarkus CLI

Maven

  • Maven will be needed for bootstrapping our application from the command-line and running our application.

An IDE of your choice

  • Ideally an IDE that supports Quarkus (such as Visual Studio Code)

Setting up the Quarkus Application

  • We will bootstrap the Quarkus application with the following command
quarkus create kstream-lab0
  • Since we will be using the Kafka Streams testing functionality, we will need to edit the pom.xml to add the dependency to our project. Open pom.xml and add the following:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>3.1.0</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest</artifactId>
    <version>2.2</version>
</dependency>

The last dependency provides the Hamcrest matcher DSL used for test assertions.

Creating your first Test Class

  • Now let's create our first Test Class.

  • Create the directory structure you will need for your Java file. (NOTE: If you are working in an IDE, this may be done for you when you create your package and classes.)

mkdir -p src/test/java/eda/kafka/streams
  • Create a new file named src/test/java/eda/kafka/streams/FirstKafkaStreamsTest.java.

  • Paste the following content into the FirstKafkaStreamsTest class:

package eda.kafka.streams;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class FirstKafkaStreamsTest {

    private static TopologyTestDriver testDriver;
    private static String inTopicName = "my-input-topic";
    private static String outTopicName = "my-output-topic";

    private static TestInputTopic<String, String> inTopic;
    private static TestOutputTopic<String, String> outTopic;

    @BeforeEach
    public void buildTopology() {

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-lab0");
        // Placeholder address: the TopologyTestDriver never connects to a broker.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:2345");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        // Read String keys and values from the input topic.
        KStream<String, String> basicColors = builder.stream(inTopicName, Consumed.with(Serdes.String(), Serdes.String()));
        // Log every record, keep only "blue" values (any case), log the survivors, then write them out.
        basicColors.peek((key, value) -> System.out.println("PRE-FILTER: key=" + key + ", value=" + value))
            .filter((key, value) -> ("BLUE".equalsIgnoreCase(value)))
            .peek((key, value) -> System.out.println("POST-FILTER: key=" + key + ", value=" + value))
            .to(outTopicName);

        Topology topology = builder.build();

        testDriver = new TopologyTestDriver(topology, props);
        inTopic = testDriver.createInputTopic(inTopicName, new StringSerializer(), new StringSerializer());
        outTopic = testDriver.createOutputTopic(outTopicName, new StringDeserializer(), new StringDeserializer());

    }

    @AfterEach
    public void teardown() {
        testDriver.close();
    }

}
  • The above code does a lot in a few lines, so we'll walk through some of that here.

    • The @BeforeEach annotation on the buildTopology method means that it runs before each test, while the @AfterEach annotation on the teardown method ensures that it runs after each test. This lets us spin up and tear down all the necessary components so that each test case runs in isolation.
    • The buildTopology method utilizes the StreamsBuilder class to construct a simple topology, reading from the input Kafka topic defined by the inTopicName String.
    • The topology we build here uses three of the stateless processors of the Kafka Streams API:

      • peek lets us look at the key and value of each record passing through the stream without affecting it. We use it before and after the filter to see what is making its way through the topology.
      • filter drops records that do not meet the specified criteria (on either the key or the value). In this test class, we keep only records whose value equals the word "BLUE", compared case-insensitively.
      • to is the final processor; it writes the records that reach that point in the topology to an output Kafka topic.
    • The Kafka Streams test infrastructure provides driver classes that act as input and output topics, removing the need to connect to a live Kafka instance. The inTopic and outTopic instantiation at the bottom of the buildTopology method hooks into this test infrastructure, so that our test methods can use them to write to and read from the topology.

    • The teardown method cleans up the topology and all the data that has been sent through it for any given test run, allowing us to reset and rerun test cases as needed.
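
To make the peek, filter and to steps described above concrete without any Kafka dependency, the sketch below models the same chain with plain Java collections. It is only an analogy: the class TopologySketch and its methods entry, isBlue and runTopology are hypothetical names invented for this illustration, and the real topology is still exercised through the TopologyTestDriver.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java model of the lab topology. This is NOT Kafka Streams code;
// TopologySketch, entry, isBlue and runTopology are hypothetical names.
class TopologySketch {

    // Builds a key/value pair standing in for a Kafka record.
    static Map.Entry<String, String> entry(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Same predicate as the filter step in buildTopology.
    static boolean isBlue(String value) {
        return "BLUE".equalsIgnoreCase(value);
    }

    // Mimics peek -> filter -> peek -> to and returns the values that
    // would end up on the output topic, in arrival order.
    static List<String> runTopology(List<Map.Entry<String, String>> records) {
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, String> r : records) {
            System.out.println("PRE-FILTER: key=" + r.getKey() + ", value=" + r.getValue());
            if (!isBlue(r.getValue())) {
                continue; // dropped by the filter, never reaches the output
            }
            System.out.println("POST-FILTER: key=" + r.getKey() + ", value=" + r.getValue());
            output.add(r.getValue());
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> output = runTopology(Arrays.asList(
            entry("C01", "blue"),
            entry("C02", "red"),
            entry("C03", "green"),
            entry("C04", "Blue")));
        System.out.println(output); // prints [blue, Blue]
    }
}
```

Records that fail the predicate produce a PRE-FILTER line but no POST-FILTER line, which is the pattern to look for in the console output when the real topology runs.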
  • Build the application by running the following:

./mvnw clean verify
  • You should see output similar to the following:

    ...
    [INFO]
    [INFO] -------------------------------------------------------
    [INFO]  T E S T S
    [INFO] -------------------------------------------------------
    [INFO]
    [INFO] Results:
    [INFO]
    [INFO] Tests run: 0, Failures: 0, Errors: 0, Skipped: 0
    [INFO]
    ...
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  29.470 s
    [INFO] Finished at: 2020-09-17T09:34:26-05:00
    [INFO] ------------------------------------------------------------------------
    

  • The build compiled and the test topology was successfully created. But no tests were run, because no tests were written!

Add your first Tests

  • Open src/test/java/eda/kafka/streams/FirstKafkaStreamsTest.java and add the following tests to the bottom of the FirstKafkaStreamsTest class:
    @Test
    public void isEmpty() {
        assertThat(outTopic.isEmpty(), is(true));
    }

    @Test
    public void isNotEmpty() {
        assertThat(outTopic.isEmpty(), is(true));
        inTopic.pipeInput("C01", "blue");
        assertThat(outTopic.getQueueSize(), equalTo(1L) );
        assertThat(outTopic.readValue(), equalTo("blue"));
        assertThat(outTopic.getQueueSize(), equalTo(0L) );
    }

    @Test
    public void selectBlues() {
        assertThat(outTopic.isEmpty(), is(true));

        inTopic.pipeInput("C01", "blue");
        inTopic.pipeInput("C02", "red");
        inTopic.pipeInput("C03", "green");
        inTopic.pipeInput("C04", "Blue");

        assertThat(outTopic.getQueueSize(), equalTo(2L) );

        assertThat(outTopic.isEmpty(), is(false));

        assertThat(outTopic.readValue(), equalTo("blue"));
        assertThat(outTopic.readValue(), equalTo("Blue"));

        assertThat(outTopic.getQueueSize(), equalTo(0L) );

    }
  • These are three simple tests:
  • The isEmpty test method checks to make sure the output topic is empty when nothing is sent through the topology
  • The isNotEmpty test method checks to make sure the output topic is not empty when an item matching our filters is sent through the topology
  • The selectBlues test method checks to make sure that our topology is filtering correctly when we send multiple items through the topology and the output topic empties correctly when the testing infrastructure reads from it.
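
The queue-size assertions in these tests depend on reads being destructive: each readValue call removes the record it returns, so getQueueSize drops back to 0 once everything has been read. As a rough mental model only, the sketch below imitates that behavior with a plain Java queue. OutputQueueSketch and its write method are hypothetical names, and the class makes no claim about how TestOutputTopic is actually implemented.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Rough mental model of the output topic's read semantics.
// NOT the TestOutputTopic implementation; OutputQueueSketch and
// write are hypothetical names used only for illustration.
class OutputQueueSketch {
    private final Queue<String> values = new ArrayDeque<>();

    // Stands in for the topology writing a record to the output topic.
    void write(String value) {
        values.add(value);
    }

    // Number of records written but not yet read.
    long getQueueSize() {
        return values.size();
    }

    boolean isEmpty() {
        return values.isEmpty();
    }

    // Returns the oldest unread value and removes it from the queue.
    String readValue() {
        return values.remove();
    }

    public static void main(String[] args) {
        OutputQueueSketch outTopic = new OutputQueueSketch();
        outTopic.write("blue");
        outTopic.write("Blue");
        System.out.println(outTopic.getQueueSize()); // prints 2
        System.out.println(outTopic.readValue());    // prints blue
        System.out.println(outTopic.readValue());    // prints Blue
        System.out.println(outTopic.isEmpty());      // prints true
    }
}
```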

  • Run ./mvnw clean verify again. You should see the tests pass with output similar to the following:

[INFO]
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running eda.kafka.streams.FirstKafkaStreamsTest
2020-09-17 09:44:33,247 INFO  [io.sma.rea.mes.provider] (main) SRMSG00208: Deployment done... start processing
2020-09-17 09:44:33,250 INFO  [io.sma.rea.mes.provider] (main) SRMSG00226: Found incoming connectors: [smallrye-kafka]
2020-09-17 09:44:33,251 INFO  [io.sma.rea.mes.provider] (main) SRMSG00227: Found outgoing connectors: [smallrye-kafka]
2020-09-17 09:44:33,252 INFO  [io.sma.rea.mes.provider] (main) SRMSG00229: Channel manager initializing...
2020-09-17 09:44:33,254 INFO  [io.sma.rea.mes.provider] (main) SRMSG00209: Initializing mediators
2020-09-17 09:44:33,255 INFO  [io.sma.rea.mes.provider] (main) SRMSG00215: Connecting mediators
2020-09-17 09:44:33,382 INFO  [io.quarkus] (main) Quarkus 1.8.0.Final on JVM started in 2.029s. Listening on: http://0.0.0.0:8081
2020-09-17 09:44:33,382 INFO  [io.quarkus] (main) Profile test activated.
2020-09-17 09:44:33,382 INFO  [io.quarkus] (main) Installed features: [cdi, kafka-streams, mutiny, resteasy-jsonb, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
PRE-FILTER: key=C01, value=blue
POST-FILTER: key=C01, value=blue
PRE-FILTER: key=C02, value=red
PRE-FILTER: key=C03, value=green
PRE-FILTER: key=C04, value=Blue
POST-FILTER: key=C04, value=Blue
PRE-FILTER: key=C01, value=blue
POST-FILTER: key=C01, value=blue
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4.722 s - in eda.kafka.streams.FirstKafkaStreamsTest
2020-09-17 09:44:34,026 INFO  [io.sma.rea.mes.provider] (main) SRMSG00207: Cancel subscriptions
2020-09-17 09:44:34,038 INFO  [io.quarkus] (main) Quarkus stopped in 0.024s
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO]

Next Steps

  • Now that you have finished the foundational Kafka Streams testing lab, you can proceed to Lab 1 for a deeper dive into more robust real-world Kafka Streams testing use cases!