Kafka Streams Test Lab 1

Updated 03/10/2022

Overview

  • In this lab scenario we use the Apache Kafka Streams TopologyTestDriver to test a Topology, a Stream and a Table.
  • With the TopologyTestDriver we will exercise operations such as groupBy and join with another Stream or Kafka Table.

The code for this lab is in the eda-kstreams-labs repository, in the LoadKtableFromTopic folder.

Scenario Prerequisites

Java

  • For the purposes of this lab we suggest Java 11+

Quarkus CLI

Maven

  • Maven will be needed for bootstrapping our application from the command-line and running our application.

An IDE of your choice

  • Ideally an IDE that supports Quarkus (such as Visual Studio Code)

Setting up the Quarkus Application

  • We will bootstrap the Quarkus application with the following Quarkus CLI command:
quarkus create LoadKtableFromTopic

You can replace the project name (LoadKtableFromTopic) as you like.

  • Since we will be using the Kafka Streams testing functionality we will need to edit the pom.xml to add the dependency to our project. Open pom.xml and add the following.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>3.1.0</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest</artifactId>
    <version>2.2</version>
</dependency>

Creating your Test Class

  • Open the TestLoadKtableFromTopic.java file and paste the following content.
package ibm.eda.kstreams.lab1;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;

/**
 * This is a simple example of loading some reference data from stream into a Ktable for
 * lookup. It uses a persistent state store.
 */
@QuarkusTest
public class TestLoadKtableFromTopic {
    private static TopologyTestDriver testDriver;
    private static String companySectorsTopic = "sector-types";
    private static String storeName = "sector-types-store";

    private static TestInputTopic<String, String> inTopic;
    private static TestOutputTopic<String, Long> outTopic;
    private static TestOutputTopic<String, String> errorTopic;

    public static Properties getStreamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-lab1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummmy:1234");
        return props;
    }

    @BeforeAll
    public static void buildTopology(){
        final StreamsBuilder builder = new StreamsBuilder();
        // Adding a state store is a simple matter of creating a StoreSupplier
        // instance with one of the static factory methods on the Stores class.
        // all persistent StateStore instances provide local storage using RocksDB
        KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(storeName);

        // A KTable is created from the companySectorsTopic, with String serdes for key and value.
        // Materialized.as(storeSupplier) forces the table to be materialized in that state store.
        KTable<String, String> sectorTypeTable = builder.table(companySectorsTopic,
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.as(storeSupplier));

        testDriver = new TopologyTestDriver(builder.build(), getStreamsConfig());
        inTopic = testDriver.createInputTopic(companySectorsTopic, new StringSerializer(), new StringSerializer());

    }

    @AfterAll
    public static void close(){
        testDriver.close();
    }

    @Test
    public void shouldHaveSixSectorTypes(){
        inTopic.pipeInput("C01","Health Care");
        inTopic.pipeInput("C02","Finance");
        inTopic.pipeInput("C03","Consumer Services");
        inTopic.pipeInput("C04","Transportation");
        inTopic.pipeInput("C05","Capital Goods");
        inTopic.pipeInput("C06","Public Utilities");

        KeyValueStore<String,ValueAndTimestamp<String>> store = testDriver.getTimestampedKeyValueStore(storeName);
        Assertions.assertNotNull(store);

        ValueAndTimestamp<String> sector = store.get("C02");
        Assertions.assertNotNull(sector);
        Assertions.assertEquals("Finance", sector.value());
        Assertions.assertEquals(6, store.approximateNumEntries());


        // demonstrate how to get all the values from the table:
        KeyValueIterator<String, ValueAndTimestamp<String>> sectors = store.all();
        while (sectors.hasNext()) {
            KeyValue<String,ValueAndTimestamp<String>> s = sectors.next();
            System.out.println(s.key + ":" + s.value.value());
        }
        for ( StateStore s: testDriver.getAllStateStores().values()) {
            System.out.println(s.name());
        }
    }
}
  • The above code uses TopologyTestDriver to run a Topology without a Kafka broker. A Topology is a graph of stream processors (nodes) whose edges are the streams between them. The first part of the class declares the TopologyTestDriver (testDriver), along with the topic name and the store name.

  • Test the application by running the following:

./mvnw clean verify
  • You should see the tests pass with the following output:
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.ibm.GreetingResourceTest
2021-01-16 14:20:26,488 INFO  [io.quarkus] (main) Quarkus 1.10.5.Final on JVM started in 2.089s. Listening on: http://localhost:8081
2021-01-16 14:20:26,490 INFO  [io.quarkus] (main) Profile test activated. 
2021-01-16 14:20:26,490 INFO  [io.quarkus] (main) Installed features: [cdi, kafka-streams, resteasy, resteasy-jsonb]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.096 s - in com.ibm.GreetingResourceTest
[INFO] Running com.ibm.garage.cpat.lab.TestLoadKtableFromTopic
C01:Health Care
C02:Finance
C03:Consumer Services
C04:Transportation
C05:Capital Goods
C06:Public Utilities
sector-types-store
2021-01-16 14:20:28,253 WARN  [org.apa.kaf.str.sta.int.RocksDBStore] (main) Closing 1 open iterators for store sector-types-store
2021-01-16 14:20:28,256 INFO  [org.apa.kaf.str.pro.int.StateDirectory] (main) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.222 s - in com.ibm.garage.cpat.lab.TestLoadKtableFromTopic
2021-01-16 14:20:28,292 INFO  [io.quarkus] (main) Quarkus stopped in 0.028s
[INFO] 
[INFO] Results:
[INFO] 
[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
  • How this test topology creation flow works:

    • A StreamsBuilder object (builder) from the Kafka Streams DSL API is created.
    • A KeyValueBytesStoreSupplier (storeSupplier) is configured with the String variable storeName.
    • A KTable is created that reads from the topic (companySectorsTopic), deserializes the records, and is materialized in the previously created storeSupplier.
    • A TopologyTestDriver (testDriver) is built from the provided config properties and the topology produced by the builder, which contains the KTable.
    • Lastly, a test input topic (inTopic) is created from the testDriver.
    • When inTopic.pipeInput("C01","Health Care"); is invoked, it populates the topic, which in turn populates the KTable, which is ultimately persisted in a key-value state store.
  • You should see the test pass. It makes a few simple checks: the store and the value fetched for key C02 are not null, the value retrieved for key C02 is equal to Finance, and the state store (which was populated by way of the Kafka topic) holds six key-value pairs.
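
Why does the store hold exactly six entries? A KTable keeps only the latest value for each key, so piping a record whose key already exists updates that entry instead of adding a new one. The following is a minimal plain-Java sketch of that upsert behaviour. It uses a LinkedHashMap as a stand-in for the state store and involves no Kafka classes; the class name TableSketch and the method pipe are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a KTable backed by a key-value store (not a Kafka API).
public class TableSketch {
    private final Map<String, String> store = new LinkedHashMap<>();

    // Upsert: a new key adds an entry, an existing key overwrites its value.
    // A null value removes the key, mirroring how a KTable treats tombstones.
    public void pipe(String key, String value) {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
    }

    public String get(String key) {
        return store.get(key);
    }

    public int size() {
        return store.size();
    }

    public static void main(String[] args) {
        TableSketch table = new TableSketch();
        table.pipe("C01", "Health Care");
        table.pipe("C02", "Finance");
        table.pipe("C02", "Financial Services"); // same key: update, not insert
        System.out.println(table.size() + " entries, C02=" + table.get("C02"));
    }
}
```

In the lab test every key from C01 to C06 is distinct, which is why the entry count matches the number of piped records.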

More Robust Kafka Streams Testing

  • Add the JSON-B Serdes using the Quarkus Kafka client library:
quarkus ext add kafka-client jsonb
  • Now that we have tested some simple functionality by using the Kafka Streams API let's check out some other operators that we can use.

  • Let's create a new class for our Plain Old Java Object (POJO) named FinancialMessage and copy and paste the following content into the newly created file.

package ibm.eda.kstreams.lab.domain;


import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
public class FinancialMessage implements JSONSerdeCompatible {

    public String userId;
    public String stockSymbol;
    public String exchangeId;
    public int quantity;
    public double stockPrice;
    public double totalCost;
    public int institutionId;
    public int countryId;
    public boolean technicalValidation;

    public FinancialMessage() {

    }

    public FinancialMessage(String userId, String stockSymbol, String exchangeId,
                            int quantity, double stockPrice, double totalCost,
                            int institutionId, int countryId, boolean technicalValidation) {

        this.userId = userId;
        this.stockSymbol = stockSymbol;
        this.exchangeId = exchangeId;
        this.quantity = quantity;
        this.stockPrice = stockPrice;
        this.totalCost = totalCost;
        this.institutionId = institutionId;
        this.countryId = countryId;
        this.technicalValidation = technicalValidation;
    }
}

Note: We have not provided any accessors (getters) or mutators (setters) for simplicity. You can set those at your own discretion.

Add the following interface for the JSON Serde:

package ibm.eda.kstreams.lab.domain;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

/**
 * An interface for registering types that can be de/serialized with {@link JSONSerde}.
 */
@SuppressWarnings("DefaultAnnotationParam") // being explicit for the example
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
@JsonSubTypes({
    @JsonSubTypes.Type(value = FinancialMessage.class, name = "fm")
})
public interface JSONSerdeCompatible {

}
  • And the Generic JSONSerde from this class

  • Now that we have our new Java class, let's create a new and separate Java Test class: src/test/java/ibm/eda/kstreams/lab1/TestFinancialMessage.java.

Copy the contents below:

package ibm.eda.kstreams.lab1;

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import io.quarkus.kafka.client.serialization.JsonbSerde;
import io.quarkus.kafka.client.serialization.JsonbSerializer;
import io.quarkus.test.junit.QuarkusTest;

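// FinancialMessage is declared in this package (see the POJO above).
// Assumption: the JSONSerde class was added to the same package.
import ibm.eda.kstreams.lab.domain.*;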


@QuarkusTest
public class TestFinancialMessage {

    private static TopologyTestDriver testDriver;
    private static String inTopicName = "transactions";
    private static String outTopicName = "output";
    private static String errorTopicName = "errors";
    private static String storeName = "transactionCount";
    private static TestInputTopic<String, FinancialMessage> inTopic;
    private static TestOutputTopic<String, Long> outTopic;
    private static TestOutputTopic<String, String> errorTopic;

    public static Properties getStreamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-lab2");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummmy:2345");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
        return props;
    }

    @BeforeAll
    public static void buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
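        KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(storeName);
        // Serde used to read the FinancialMessage values from the input topic.
        JSONSerde<FinancialMessage> financialMessageSerde = new JSONSerde<>();
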
        KStream<String, FinancialMessage> transactionStream =
            builder.stream(
                inTopicName,
                Consumed.with(Serdes.String(), financialMessageSerde)
            );

        // First verify user id is present, if not route to error
        Map<String, KStream<String,FinancialMessage>> branches = transactionStream
            .split(Named.as("tx-"))
            .branch((key, value) -> value.userId == null, Branched.as("no-userid"))
            .defaultBranch(Branched.as("non-null"));

        // Handle error by sending to the errors topic.
        branches.get("tx-no-userid").map(
                (key, value) -> {
                    return KeyValue.pair(key, "No customer id provided");
                })
                .to(
                        errorTopicName, Produced.with(Serdes.String(), Serdes.String()));

        // use groupBy to swap the key, then count by customer id,
        branches.get("tx-non-null").groupBy(
                    (key, value) -> value.userId
                )
                .count(
                    Materialized.as(storeSupplier)
                )
                .toStream()
                .to(
                    outTopicName,
                    Produced.with(Serdes.String(), Serdes.Long())
            );

        testDriver = new TopologyTestDriver(builder.build(), getStreamsConfig());
        inTopic = testDriver.createInputTopic(inTopicName, new StringSerializer(), new JSONSerde<FinancialMessage>());
        outTopic = testDriver.createOutputTopic(outTopicName, new StringDeserializer(), new LongDeserializer());
        errorTopic = testDriver.createOutputTopic(errorTopicName, new StringDeserializer(), new StringDeserializer());
    }

    @AfterAll
    public static void close(){
        testDriver.close();
    }
}
  • The setup for the test topology is now in place. Next, add a test that inserts two events into the input topic. Add the following code to your test class:
    @Test
    public void shouldHaveOneTransaction() {
        // Two FinancialMessage objects are mocked and sent to the input topic. Within the Topology,
        // they are routed to the outTopic because a userId exists for each incoming message.

        FinancialMessage mock = new FinancialMessage(
            "1", "MET", "SWISS", 12, 1822.38, 21868.55, 94, 7, true
        );
        FinancialMessage mock2 = new FinancialMessage(
            "2", "ASDF", "HELLO", 5, 1000.22, 4444.12, 38, 6, true
        );

        inTopic.pipeInput("T01", mock);
        inTopic.pipeInput("T02", mock2);

        Assertions.assertFalse(outTopic.isEmpty());
        Assertions.assertEquals(1, outTopic.readKeyValue().value);

        // The store is materialized by count(), so its values are Long counts keyed by userId.
        KeyValueStore<String,ValueAndTimestamp<Long>> store = testDriver.getTimestampedKeyValueStore(storeName);
        Assertions.assertEquals(1, store.approximateNumEntries());
    }
  • Test the application by running the following:
./mvnw clean verify
  • You should see the following output:
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.ibm.GreetingResourceTest
2021-01-16 17:21:37,836 INFO  [io.quarkus] (main) Quarkus 1.10.5.Final on JVM started in 1.996s. Listening on: http://localhost:8081
2021-01-16 17:21:37,837 INFO  [io.quarkus] (main) Profile test activated. 
2021-01-16 17:21:37,838 INFO  [io.quarkus] (main) Installed features: [cdi, kafka-streams, resteasy, resteasy-jsonb]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.234 s - in com.ibm.GreetingResourceTest
[INFO] Running com.ibm.garage.cpat.lab.TestFinancialMessage
2021-01-16 17:21:39,460 INFO  [org.apa.kaf.str.pro.int.StateDirectory] (main) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.273 s <<< FAILURE! - in com.ibm.garage.cpat.lab.TestFinancialMessage
[ERROR] shouldHaveOneTransaction  Time elapsed: 0.09 s  <<< FAILURE!
org.opentest4j.AssertionFailedError: expected: <1> but was: <2>
    at com.ibm.garage.cpat.lab.TestFinancialMessage.shouldHaveOneTransaction(TestFinancialMessage.java:132)

[INFO] Running com.ibm.garage.cpat.lab.TestLoadKtableFromTopic
C01:Health Care
C02:Finance
C03:Consumer Services
C04:Transportation
C05:Capital Goods
C06:Public Utilities
sector-types-store
2021-01-16 17:21:39,505 WARN  [org.apa.kaf.str.sta.int.RocksDBStore] (main) Closing 1 open iterators for store sector-types-store
2021-01-16 17:21:39,507 INFO  [org.apa.kaf.str.pro.int.StateDirectory] (main) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.039 s - in com.ibm.garage.cpat.lab.TestLoadKtableFromTopic
2021-01-16 17:21:39,541 INFO  [io.quarkus] (main) Quarkus stopped in 0.028s
[INFO] 
[INFO] Results:
[INFO] 
[ERROR] Failures: 
[ERROR]   TestFinancialMessage.shouldHaveOneTransaction:132 expected: <1> but was: <2>
[INFO] 
[ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0
  • The newly added test fails. This is expected: we inserted two records with different userId values, so the store holds two entries, but the test expects one. To fix the test, change the expected value in Assertions.assertEquals(1, store.approximateNumEntries()); from 1 to 2.
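
To see why the store ends up with two entries, it can help to model groupBy followed by count in plain Java. The sketch below is an analogy only: it uses a TreeMap instead of a state store and no Kafka classes, and the names CountByUserSketch and countByUser are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java analogy of groupBy(userId).count(); this is not the Kafka Streams API.
public class CountByUserSketch {

    // Returns one entry per distinct userId, holding the number of records seen for it.
    public static Map<String, Long> countByUser(List<String> userIds) {
        Map<String, Long> counts = new TreeMap<>();
        for (String userId : userIds) {
            counts.merge(userId, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The two messages piped in the test carry userId "1" and "2".
        Map<String, Long> counts = countByUser(List.of("1", "2"));
        System.out.println(counts.size() + " entries: " + counts);
    }
}
```

Two distinct user ids give two entries with a count of 1 each, whereas two records for the same user would give a single entry with a count of 2. The number of store entries therefore tracks distinct keys, not the number of records.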

  • Next let's add another very simple test. Copy the following code to your Java test class:

    @Test
    public void testErrorTopicIsNotEmpty() {
        FinancialMessage mock = new FinancialMessage(
            null, "MET", "SWISS", 12, 1822.38, 21868.55, 94, 7, true
        );

        inTopic.pipeInput("T03", mock);

        Assertions.assertFalse(errorTopic.isEmpty());
    }
  • Test the application by running the following:
./mvnw clean verify
  • You should see the following output:
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.ibm.GreetingResourceTest
2021-01-16 17:29:34,258 INFO  [io.quarkus] (main) Quarkus 1.10.5.Final on JVM started in 2.470s. Listening on: http://localhost:8081
2021-01-16 17:29:34,260 INFO  [io.quarkus] (main) Profile test activated. 
2021-01-16 17:29:34,260 INFO  [io.quarkus] (main) Installed features: [cdi, kafka-streams, resteasy, resteasy-jsonb]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.694 s - in com.ibm.GreetingResourceTest
[INFO] Running com.ibm.garage.cpat.lab.TestFinancialMessage
2021-01-16 17:29:36,001 INFO  [org.apa.kaf.str.pro.int.StateDirectory] (main) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.309 s - in com.ibm.garage.cpat.lab.TestFinancialMessage
[INFO] Running com.ibm.garage.cpat.lab.TestLoadKtableFromTopic
C01:Health Care
C02:Finance
C03:Consumer Services
C04:Transportation
C05:Capital Goods
C06:Public Utilities
sector-types-store
2021-01-16 17:29:36,057 WARN  [org.apa.kaf.str.sta.int.RocksDBStore] (main) Closing 1 open iterators for store sector-types-store
2021-01-16 17:29:36,059 INFO  [org.apa.kaf.str.pro.int.StateDirectory] (main) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.049 s - in com.ibm.garage.cpat.lab.TestLoadKtableFromTopic
2021-01-16 17:29:36,099 INFO  [io.quarkus] (main) Quarkus stopped in 0.031s
[INFO] 
[INFO] Results:
[INFO] 
[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0

Here the message payload is created with null for the userId field, so the message is routed to the errors topic. The test checks that errorTopic is not empty: errorTopic.isEmpty() returns false, which is what assertFalse expects, so the test passes.
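
The routing rule can be pictured with a small plain-Java sketch. It is only an analogy of the split on userId, under the assumption that each record is reduced to its key and userId; it reuses the branch names from the topology, but BranchSketch and its split method are hypothetical and unrelated to the Kafka Streams split() operator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy of routing records into two branches; not the Kafka Streams API.
public class BranchSketch {

    // Input: record key -> userId (may be null). Output: branch name -> record keys.
    public static Map<String, List<String>> split(Map<String, String> userIdByKey) {
        Map<String, List<String>> branches = new HashMap<>();
        branches.put("tx-no-userid", new ArrayList<>());
        branches.put("tx-non-null", new ArrayList<>());
        for (Map.Entry<String, String> entry : userIdByKey.entrySet()) {
            // The first matching predicate wins; everything else goes to the default branch.
            String branch = (entry.getValue() == null) ? "tx-no-userid" : "tx-non-null";
            branches.get(branch).add(entry.getKey());
        }
        return branches;
    }

    public static void main(String[] args) {
        Map<String, String> records = new LinkedHashMap<>();
        records.put("T01", "1");
        records.put("T03", null); // no userId: ends up in the error branch
        System.out.println(split(records));
    }
}
```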

  • Now that we have two simple tests, let's update the non-null branch to filter the stream on a condition of our choosing. Edit the branches.get("tx-non-null") statement so that it retains only the records whose totalCost is greater than 5000.
branches.get("tx-non-null").filter(
            (key, value) -> (value.totalCost > 5000)
        )
        .groupBy(
            (key, value) -> value.userId
        )
        .count(
            Materialized.as(storeSupplier)
        )
        .toStream()
        .to(
            outTopicName,
            Produced.with(Serdes.String(), Serdes.Long())
        );
  • Test the application by running the following:
./mvnw clean verify
  • You should see the following output:
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.ibm.GreetingResourceTest
2021-01-16 17:40:50,765 INFO  [io.quarkus] (main) Quarkus 1.10.5.Final on JVM started in 2.102s. Listening on: http://localhost:8081
2021-01-16 17:40:50,766 INFO  [io.quarkus] (main) Profile test activated. 
2021-01-16 17:40:50,766 INFO  [io.quarkus] (main) Installed features: [cdi, kafka-streams, resteasy, resteasy-jsonb]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.474 s - in com.ibm.GreetingResourceTest
[INFO] Running com.ibm.garage.cpat.lab.TestFinancialMessage
2021-01-16 17:40:52,393 INFO  [org.apa.kaf.str.pro.int.StateDirectory] (main) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
[ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.307 s <<< FAILURE! - in com.ibm.garage.cpat.lab.TestFinancialMessage
[ERROR] shouldHaveOneTransaction  Time elapsed: 0.022 s  <<< FAILURE!
org.opentest4j.AssertionFailedError: expected: <2> but was: <1>
    at com.ibm.garage.cpat.lab.TestFinancialMessage.shouldHaveOneTransaction(TestFinancialMessage.java:135)

[INFO] Running com.ibm.garage.cpat.lab.TestLoadKtableFromTopic
C01:Health Care
C02:Finance
C03:Consumer Services
C04:Transportation
C05:Capital Goods
C06:Public Utilities
sector-types-store
2021-01-16 17:40:52,445 WARN  [org.apa.kaf.str.sta.int.RocksDBStore] (main) Closing 1 open iterators for store sector-types-store
2021-01-16 17:40:52,447 INFO  [org.apa.kaf.str.pro.int.StateDirectory] (main) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.046 s - in com.ibm.garage.cpat.lab.TestLoadKtableFromTopic
2021-01-16 17:40:52,487 INFO  [io.quarkus] (main) Quarkus stopped in 0.031s
[INFO] 
[INFO] Results:
[INFO] 
[ERROR] Failures: 
[ERROR]   TestFinancialMessage.shouldHaveOneTransaction:135 expected: <2> but was: <1>
[INFO] 
[ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0

The first test is failing again. This is expected, because the branches.get("tx-non-null") logic now drops transactions whose totalCost is 5000 or less, so the second record we send (totalCost 4444.12) is filtered out. To fix the test, either lower the expected number of store entries back to 1 or raise the totalCost of the second transaction above 5000. After that change, all tests should pass when you run the tests again.
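
Both fixes can be checked with a few lines of plain Java. The sketch below is an analogy of filter followed by groupBy and count, with no Kafka classes involved; FilterCountSketch and storeEntries are hypothetical names, and the value 6000.00 is an arbitrary amount above the threshold chosen for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java analogy of filter(totalCost > 5000).groupBy(userId).count(); not the Kafka Streams API.
public class FilterCountSketch {

    // Returns how many distinct userIds remain after dropping records at or below 5000.
    public static int storeEntries(String[] userIds, double[] totalCosts) {
        Set<String> keys = new HashSet<>();
        for (int i = 0; i < userIds.length; i++) {
            if (totalCosts[i] > 5000) {
                keys.add(userIds[i]);
            }
        }
        return keys.size();
    }

    public static void main(String[] args) {
        String[] users = {"1", "2"};
        // Costs used in the test: the second record is filtered out.
        System.out.println(storeEntries(users, new double[] {21868.55, 4444.12}));
        // Raising the second cost above the threshold keeps both records.
        System.out.println(storeEntries(users, new double[] {21868.55, 6000.00}));
    }
}
```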

Next Steps

  • Now that you have finished this initial part of Lab 1, you can optionally proceed to Lab 2.