If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/aggregate/StreamsAggregateTest.java.
In this exercise, you will write a unit test for the aggregation Kafka Streams application. You've already added the KStream and the required pieces, so you’ll concentrate on what you need for a unit test using the TopologyTestDriver (no broker needed).
Add your test code to the following starter code from previous exercises:
public class StreamsAggregateTest {

    @Test
    public void shouldAggregateRecords() {
        final Properties streamsProps = new Properties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-test");

        final String inputTopicName = "input";
        final String outputTopicName = "output";
        final Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);

        final SpecificAvroSerde<ElectronicOrder> electronicSerde =
            StreamsUtils.getSpecificAvroSerde(configMap);
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Double> doubleSerde = Serdes.Double();

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, ElectronicOrder> electronicStream =
            builder.stream(inputTopicName, Consumed.with(Serdes.String(), electronicSerde));

        // Sum the order prices per key and write every updated total to the output topic.
        electronicStream.groupByKey().aggregate(() -> 0.0,
                (key, order, total) -> total + order.getPrice(),
                Materialized.with(stringSerde, doubleSerde))
            .toStream().to(outputTopicName, Produced.with(Serdes.String(), Serdes.Double()));
    }
}
Begin by adding a configuration to use a mock Schema Registry under your existing streamsProps configuration:
streamsProps.put("schema.registry.url", "mock://aggregation-test");
This is an in-memory version of your Schema Registry, suitable for unit testing.
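Placement matters here: the starter code copies streamsProps into configMap before it builds the Avro serde, so the mock URL presumably has to be set before that copy for the serde to pick it up. The following JDK-only sketch illustrates the ordering issue. The toMap helper is a hypothetical stand-in for StreamsUtils.propertiesToMap (whose implementation is not shown in this exercise), and it assumes the helper takes a one-time copy of the properties.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class ConfigOrderSketch {

    // Hypothetical stand-in for StreamsUtils.propertiesToMap: copies the
    // current entries into a new map (assumed behavior, not the course code).
    static Map<String, Object> toMap(final Properties props) {
        final Map<String, Object> map = new HashMap<>();
        props.forEach((key, value) -> map.put(String.valueOf(key), value));
        return map;
    }

    public static void main(final String[] args) {
        final Properties streamsProps = new Properties();
        streamsProps.put("application.id", "aggregate-test");

        // Copy taken before the mock URL is set: the URL is missing.
        final Map<String, Object> tooEarly = toMap(streamsProps);

        streamsProps.put("schema.registry.url", "mock://aggregation-test");

        // Copy taken after the mock URL is set: the URL is present.
        final Map<String, Object> inTime = toMap(streamsProps);

        System.out.println(tooEarly.containsKey("schema.registry.url")); // prints false
        System.out.println(inTime.get("schema.registry.url")); // prints mock://aggregation-test
    }
}
```

If the real helper behaves like this sketch, adding the schema.registry.url line directly below the APPLICATION_ID_CONFIG line keeps it ahead of the configMap copy.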
Next, under your groupByKey statement, create a TopologyTestDriver with the application topology and configuration. (Note that we use a try-with-resources block to ensure that the test driver is closed at the end of the test, which is important for cleaning up state.) Create a test input topic with the TopologyTestDriver factory method createInputTopic; you’ll use this in the test to drive input records into the Kafka Streams application. Provide the topic name along with a key serializer and a value serializer, since a Kafka Streams application expects to receive records in byte array format.
try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsProps)) {
    final TestInputTopic<String, ElectronicOrder> inputTopic =
        testDriver.createInputTopic(inputTopicName,
            stringSerde.serializer(),
            electronicSerde.serializer());
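To see why the try-with-resources block matters, here is a minimal JDK-only sketch of the guarantee it provides: close() runs whether the body finishes normally or throws. FakeDriver is a hypothetical stand-in for a resource that holds state, not the TopologyTestDriver itself.

```java
import java.util.ArrayList;
import java.util.List;

final class TryWithResourcesSketch {

    // Hypothetical stand-in for a test driver that holds state until closed.
    static final class FakeDriver implements AutoCloseable {
        private final List<String> log;

        FakeDriver(final List<String> log) {
            this.log = log;
            log.add("opened");
        }

        void process(final boolean fail) {
            log.add("processing");
            if (fail) {
                throw new IllegalStateException("simulated test failure");
            }
        }

        @Override
        public void close() {
            log.add("closed");
        }
    }

    // Runs one simulated test and returns the lifecycle events in order.
    static List<String> run(final boolean fail) {
        final List<String> log = new ArrayList<>();
        try (FakeDriver driver = new FakeDriver(log)) {
            driver.process(fail);
        } catch (final IllegalStateException e) {
            // The resource has already been closed by the time we get here.
            log.add("caught");
        }
        return log;
    }

    public static void main(final String[] args) {
        System.out.println(run(false)); // prints [opened, processing, closed]
        System.out.println(run(true));  // prints [opened, processing, closed, caught]
    }
}
```

In both runs "closed" appears in the log, which is the behavior the exercise relies on to clean up the driver's state even when an assertion fails.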
Now create a TestOutputTopic with another TopologyTestDriver factory method, createOutputTopic. The TestOutputTopic captures results from the Kafka Streams application under test. Pass the outputTopicName along with a key deserializer and a value deserializer. (Note that the Kafka Streams application output is also in byte array format.)
    final TestOutputTopic<String, Double> outputTopic =
        testDriver.createOutputTopic(outputTopicName,
            stringSerde.deserializer(),
            doubleSerde.deserializer());
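The reason both test topics need serializers or deserializers is that records cross the topology boundary as byte arrays. The sketch below shows that round trip using only the JDK. The four helper methods are illustrative assumptions (UTF-8 for strings, an 8-byte big-endian encoding for doubles); they are not the Kafka serializer classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ByteRoundTripSketch {

    // Illustrative string encoding: UTF-8 bytes.
    static byte[] serializeString(final String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeString(final byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Illustrative double encoding: 8 bytes, big-endian (ByteBuffer's default order).
    static byte[] serializeDouble(final double value) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    static double deserializeDouble(final byte[] bytes) {
        return ByteBuffer.wrap(bytes).getDouble();
    }

    public static void main(final String[] args) {
        // Input side: the test serializes typed values into bytes.
        final byte[] keyBytes = serializeString("one");
        final byte[] valueBytes = serializeDouble(45.0);

        // Output side: the test deserializes bytes back into typed values.
        System.out.println(deserializeString(keyBytes));   // prints one
        System.out.println(deserializeDouble(valueBytes)); // prints 45.0
        System.out.println(valueBytes.length);             // prints 8
    }
}
```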
Next, you need some sample events for the test. Create a list of ElectronicOrder objects for input into the topology. Here, each object is created using the builder provided by the generated Avro object code.
final List<ElectronicOrder> orders = new ArrayList<>();
orders.add(ElectronicOrder.newBuilder().setElectronicId("one").setOrderId("1").setUserId("vandeley").setTime(5L).setPrice(5.0).build());
orders.add(ElectronicOrder.newBuilder().setElectronicId("one").setOrderId("2").setUserId("penny-packer").setTime(5L).setPrice(15.0).build());
orders.add(ElectronicOrder.newBuilder().setElectronicId("one").setOrderId("3").setUserId("romanov").setTime(5L).setPrice(25.0).build());
Create the list of expected aggregation results. Note that the TopologyTestDriver does not buffer anything, so each input record emits a new update (similar to the behavior you see when you set the cache to zero bytes in the Kafka Streams application). Because all three orders share the same key, the expected values are the running totals after each order. Then run the events into your test topology using inputTopic.pipeInput, with each order's electronic ID as the record key.
List<Double> expectedValues = List.of(5.0, 20.0, 45.0);
orders.forEach(order -> inputTopic.pipeInput(order.getElectronicId(), order));
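The expected values 5.0, 20.0, and 45.0 follow from applying the aggregator total + price once per record and emitting every intermediate result. Here is a JDK-only sketch of that arithmetic; the HashMap stands in for the state store, and the whole class is an illustration of unbuffered aggregation rather than anything Kafka Streams runs internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RunningTotalSketch {

    // Applies "total + price" per key and records every updated total,
    // mimicking an aggregation that emits one update per input record.
    static List<Double> aggregate(final List<Map.Entry<String, Double>> records) {
        final Map<String, Double> store = new HashMap<>(); // stand-in for the state store
        final List<Double> emitted = new ArrayList<>();
        for (final Map.Entry<String, Double> record : records) {
            // 0.0 plays the role of the initializer () -> 0.0.
            final double updated = store.getOrDefault(record.getKey(), 0.0) + record.getValue();
            store.put(record.getKey(), updated);
            emitted.add(updated);
        }
        return emitted;
    }

    public static void main(final String[] args) {
        // Same key and prices as the three sample orders in the exercise.
        final List<Map.Entry<String, Double>> records = List.of(
            Map.entry("one", 5.0),
            Map.entry("one", 15.0),
            Map.entry("one", 25.0));

        System.out.println(aggregate(records)); // prints [5.0, 20.0, 45.0]
    }
}
```

With caching enabled in a real application, some of these intermediate totals could be collapsed before they reach the output topic, which is why the expected list here contains all three.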
Read the output from the Kafka Streams application with the outputTopic.readValuesToList method, which returns only the record values as a list (we only care about the aggregation results, not the keys). Finally, confirm the results of the test by asserting that the actualValues list equals expectedValues.
List<Double> actualValues = outputTopic.readValuesToList();
assertEquals(expectedValues, actualValues);
In this exercise, we're going to write a unit test for the aggregation Kafka Streams application. We've already added the KStream and the required pieces that you already know about. You're going to concentrate on what you need for creating a unit test with a TopologyTestDriver. No broker needed.

First, you'll add a config to use a mock Schema Registry. It's an in-memory version of Schema Registry suitable for unit testing. Now let's create the TopologyTestDriver with the application topology and configuration. Note that it's done with a try-with-resources block to ensure that the test driver gets closed at the end of the test. It's important to always close the TopologyTestDriver at the end to clean up any state.

Next, create a test input topic with the TopologyTestDriver factory method, createInputTopic. You'll use this in the test to drive input records into the Kafka Streams application. Now provide the topic name, key serializer, and value serializer. A Kafka Streams application expects to receive records in byte array format. Next you'll create a test output topic with another TopologyTestDriver factory method, createOutputTopic. The test output topic captures results from the streams application under test. To complete the test output topic, add the output topic name, key deserializer, and value deserializer. The streams application output is also in byte array format.

Now, we need some sample events for the test. Create a list of ElectronicOrder objects for input into the topology. Here, each object gets created using the builder provided by the generated Avro object code. Next, create the list of expected aggregation results. Note that the TTD does not buffer anything, so each input record emits a new update, similar to the behavior you'd be seeing by setting the cache to zero bytes in the Kafka Streams application.

Okay. It's time to run some events into the test topology. For each electronic order in the list, we'll pipe it into the Kafka Streams topology using the inputTopic.pipeInput method. Then you'll get the output from Kafka Streams. In this case, you'll read only the values of the output into a list with the outputTopic.readValuesToList method, since we only care about the aggregation results. Finally, let's confirm the results of the test by asserting that the actual values are equal to the expected ones. And here's the test running in IntelliJ. Notice how fast it completes.