Kafka Streams Test Lab 0¶
An introduction to using the Kafka Streams test utilities to test Kafka Streams topologies.
Info
Updated 03/10/2022
Overview¶
- We are testing a Kafka Streams topology using the Apache Kafka Streams TopologyTestDriver.
- While using the test driver we will perform basic stateless operations on the topology and get familiar with the testing infrastructure.
The code for this lab is in the eda-kstreams-labs repository, in the kstream-lab0 folder.
Scenario Prerequisites¶
Java
- For the purposes of this lab we suggest Java 8+
Maven
- Maven will be needed for bootstrapping our application from the command-line and running our application.
An IDE of your choice
- Ideally an IDE that supports Quarkus (such as Visual Studio Code)
Setting up the Quarkus Application¶
- We will bootstrap the Quarkus application with the following command
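A typical Quarkus bootstrap looks like the sketch below. The group ID, artifact ID, and plugin version here are illustrative assumptions rather than values taken from the lab repository, so adjust them to your own setup.

```shell
# Hypothetical scaffold command (adjust the coordinates to your project);
# the kafka-streams extension pulls in Quarkus Kafka Streams support
mvn io.quarkus.platform:quarkus-maven-plugin:2.7.5.Final:create \
    -DprojectGroupId=eda.kafka.streams \
    -DprojectArtifactId=kstream-lab0 \
    -Dextensions="kafka-streams"
cd kstream-lab0
```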
- Since we will be using the Kafka Streams testing functionality, we need to add the dependencies to our project. Open pom.xml and add the following:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>3.1.0</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest</artifactId>
    <version>2.2</version>
</dependency>
The last dependency pulls in Hamcrest, a matcher library that provides a domain-specific language for test assertions.
Creating your first Test Class¶
- Now let's create our first test class.
- Create the directory structure you will need for your Java file. (NOTE: If you are working in an IDE, this may be done for you when you create your package and classes.)
- Create a new file named src/test/java/eda/kafka/streams/FirstKafkaStreamsTest.java (the path matches the eda.kafka.streams package used below).
- Paste the following content into the FirstKafkaStreamsTest class:
package eda.kafka.streams;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class FirstKafkaStreamsTest {

    private static TopologyTestDriver testDriver;
    private static String inTopicName = "my-input-topic";
    private static String outTopicName = "my-output-topic";
    private static TestInputTopic<String, String> inTopic;
    private static TestOutputTopic<String, String> outTopic;

    @BeforeEach
    public void buildTopology() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-lab0");
        // The test driver never connects to a broker, so any placeholder value works here
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:2345");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> basicColors = builder.stream(inTopicName, Consumed.with(Serdes.String(), Serdes.String()));
        basicColors.peek((key, value) -> System.out.println("PRE-FILTER: key=" + key + ", value=" + value))
                .filter((key, value) -> ("BLUE".equalsIgnoreCase(value)))
                .peek((key, value) -> System.out.println("POST-FILTER: key=" + key + ", value=" + value))
                .to(outTopicName);

        Topology topology = builder.build();
        testDriver = new TopologyTestDriver(topology, props);
        inTopic = testDriver.createInputTopic(inTopicName, new StringSerializer(), new StringSerializer());
        outTopic = testDriver.createOutputTopic(outTopicName, new StringDeserializer(), new StringDeserializer());
    }

    @AfterEach
    public void teardown() {
        testDriver.close();
    }
}
- The above code does a lot in a few lines, so we'll walk through some of it here.
- The @BeforeEach annotation on the buildTopology method means it runs before each test is executed, while the @AfterEach annotation on the teardown method ensures it runs after each test execution. This allows us to spin up and tear down all the necessary components to test in isolation with each test case.
- The buildTopology method uses the StreamsBuilder class to construct a simple topology, reading from the input Kafka topic defined by the inTopicName String.
- The topology we build here uses three of the stateless processors in the Kafka Streams API:
  - peek lets us look at the key and value of each record passing through the stream and continue processing it unaffected, so we use it before and after the filter to see what is making its way through the topology.
  - filter drops records that do not meet the specified criteria (on either the key or the value). In this test class, we drop any value that does not match the word "BLUE", using a case-insensitive comparison.
  - to is the final processor, writing the contents of the topology at that point to an output Kafka topic.
- The Kafka Streams test infrastructure lets us use driver classes that act as their own input and output topics, removing the need to connect to a live Kafka instance. The inTopic and outTopic instantiations at the bottom of the buildTopology method hook into this infrastructure so that our test methods can use them to write to and read from the topology.
- The teardown method cleans up the topology and all the data that has been sent through it for any given test run, allowing us to reset and rerun test cases as needed.
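The peek-filter-to chain above has a close analogue in plain java.util.stream, which can help build intuition for what each processor does. To be clear, this is only an analogy under the author's filter criteria: java.util.stream operates on an in-memory list and is unrelated to Kafka Streams, and the class and method names below are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// A plain-JDK analogy (NOT Kafka Streams): the same peek -> filter -> collect
// shape as the topology above, applied to an in-memory list of values.
public class FilterAnalogy {

    // Keep only values that case-insensitively match "BLUE",
    // mirroring the filter predicate in the topology
    static List<String> filterBlues(List<String> values) {
        return values.stream()
                .peek(v -> System.out.println("PRE-FILTER: value=" + v))
                .filter(v -> "BLUE".equalsIgnoreCase(v))
                .peek(v -> System.out.println("POST-FILTER: value=" + v))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same sample values the lab's tests pipe in later
        System.out.println(filterBlues(List.of("blue", "red", "green", "Blue")));
        // prints [blue, Blue]
    }
}
```

Just as in the topology, the two peek calls only observe records; the filter is the sole step that changes what flows downstream.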
- Build the application by running the following:
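Assuming the project was generated with the standard Quarkus Maven wrapper layout (an assumption, since the exact command is not shown in this lab), the build and test run can be kicked off with:

```shell
# Run the compile and test phases; substitute plain `mvn` if your
# project was generated without the Maven wrapper
./mvnw clean test
```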
- You should see output similar to the following:
...
[INFO]
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 0, Failures: 0, Errors: 0, Skipped: 0
[INFO]
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 29.470 s
[INFO] Finished at: 2020-09-17T09:34:26-05:00
[INFO] ------------------------------------------------------------------------
- The build compiled and the test topology was successfully created. But no tests were run, because no tests were written!
Add your first Tests¶
- Open src/test/java/eda/kafka/streams/FirstKafkaStreamsTest.java and add the following tests to the bottom of the FirstKafkaStreamsTest class:
@Test
public void isEmpty() {
    assertThat(outTopic.isEmpty(), is(true));
}

@Test
public void isNotEmpty() {
    assertThat(outTopic.isEmpty(), is(true));
    inTopic.pipeInput("C01", "blue");
    assertThat(outTopic.getQueueSize(), equalTo(1L));
    assertThat(outTopic.readValue(), equalTo("blue"));
    assertThat(outTopic.getQueueSize(), equalTo(0L));
}

@Test
public void selectBlues() {
    assertThat(outTopic.isEmpty(), is(true));
    inTopic.pipeInput("C01", "blue");
    inTopic.pipeInput("C02", "red");
    inTopic.pipeInput("C03", "green");
    inTopic.pipeInput("C04", "Blue");
    assertThat(outTopic.getQueueSize(), equalTo(2L));
    assertThat(outTopic.isEmpty(), is(false));
    assertThat(outTopic.readValue(), equalTo("blue"));
    assertThat(outTopic.readValue(), equalTo("Blue"));
    assertThat(outTopic.getQueueSize(), equalTo(0L));
}
- These are three simple tests:
  - The isEmpty test method checks that the output topic is empty when nothing is sent through the topology.
  - The isNotEmpty test method checks that the output topic is not empty when an item matching our filter is sent through the topology.
  - The selectBlues test method checks that our topology filters correctly when we send multiple items through it, and that the output topic empties correctly as the testing infrastructure reads from it.
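The getQueueSize/readValue behavior the tests rely on works like a FIFO queue that drains as you read. A pure-JDK sketch of that contract (an analogy with hypothetical names, not the Kafka Streams test API):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Analogy (NOT the Kafka Streams test API): the output topic behaves like a
// FIFO queue -- readValue() removes and returns the oldest record, so the
// queue size shrinks with every read.
public class OutputQueueAnalogy {
    private final Deque<String> queue = new ArrayDeque<>();

    void pipe(String value) { queue.addLast(value); }       // like a record arriving
    String readValue() { return queue.pollFirst(); }        // like TestOutputTopic.readValue()
    long getQueueSize() { return queue.size(); }
    boolean isEmpty() { return queue.isEmpty(); }

    public static void main(String[] args) {
        OutputQueueAnalogy out = new OutputQueueAnalogy();
        out.pipe("blue");
        out.pipe("Blue");
        System.out.println(out.getQueueSize()); // prints 2
        System.out.println(out.readValue());    // prints blue
        System.out.println(out.getQueueSize()); // prints 1
    }
}
```

This is why selectBlues can assert a queue size of 2, read both values in order, and then assert a queue size of 0.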
Rerun the tests; you should see them pass with output similar to the following:
[INFO]
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running eda.kafka.streams.FirstKafkaStreamsTest
2020-09-17 09:44:33,247 INFO [io.sma.rea.mes.provider] (main) SRMSG00208: Deployment done... start processing
2020-09-17 09:44:33,250 INFO [io.sma.rea.mes.provider] (main) SRMSG00226: Found incoming connectors: [smallrye-kafka]
2020-09-17 09:44:33,251 INFO [io.sma.rea.mes.provider] (main) SRMSG00227: Found outgoing connectors: [smallrye-kafka]
2020-09-17 09:44:33,252 INFO [io.sma.rea.mes.provider] (main) SRMSG00229: Channel manager initializing...
2020-09-17 09:44:33,254 INFO [io.sma.rea.mes.provider] (main) SRMSG00209: Initializing mediators
2020-09-17 09:44:33,255 INFO [io.sma.rea.mes.provider] (main) SRMSG00215: Connecting mediators
2020-09-17 09:44:33,382 INFO [io.quarkus] (main) Quarkus 1.8.0.Final on JVM started in 2.029s. Listening on: http://0.0.0.0:8081
2020-09-17 09:44:33,382 INFO [io.quarkus] (main) Profile test activated.
2020-09-17 09:44:33,382 INFO [io.quarkus] (main) Installed features: [cdi, kafka-streams, mutiny, resteasy-jsonb, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
PRE-FILTER: key=C01, value=blue
POST-FILTER: key=C01, value=blue
PRE-FILTER: key=C02, value=red
PRE-FILTER: key=C03, value=green
PRE-FILTER: key=C04, value=Blue
POST-FILTER: key=C04, value=Blue
PRE-FILTER: key=C01, value=blue
POST-FILTER: key=C01, value=blue
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4.722 s - in eda.kafka.streams.FirstKafkaStreamsTest
2020-09-17 09:44:34,026 INFO [io.sma.rea.mes.provider] (main) SRMSG00207: Cancel subscriptions
2020-09-17 09:44:34,038 INFO [io.quarkus] (main) Quarkus stopped in 0.024s
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO]
Next Steps¶
- Now that you have finished the foundational Kafka Streams testing lab, you can proceed to Lab 1 for a deeper dive into more robust, real-world Kafka Streams testing use cases!