If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module's code can be found in the source file java/io/confluent/developer/aggregate/StreamsAggregateTest.java.
In this exercise, you will write a unit test for the aggregation Kafka Streams application. You've already added the KStream and the required pieces, so you'll concentrate on what you need for a unit test using the TopologyTestDriver (no broker needed).
Add your test code to the following starter code from previous exercises:
public class StreamsAggregateTest {

    @Test
    public void shouldAggregateRecords() {
        final Properties streamsProps = new Properties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-test");

        final String inputTopicName = "input";
        final String outputTopicName = "output";

        final Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
        final SpecificAvroSerde<ElectronicOrder> electronicSerde =
                StreamsUtils.getSpecificAvroSerde(configMap);
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Double> doubleSerde = Serdes.Double();

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, ElectronicOrder> electronicStream =
                builder.stream(inputTopicName, Consumed.with(Serdes.String(), electronicSerde));

        electronicStream.groupByKey()
                .aggregate(() -> 0.0,
                        (key, order, total) -> total + order.getPrice(),
                        Materialized.with(stringSerde, doubleSerde))
                .toStream()
                .to(outputTopicName, Produced.with(Serdes.String(), Serdes.Double()));
    }
}
Begin by adding a configuration to use a mock Schema Registry under your existing streamsProps configuration:
streamsProps.put("schema.registry.url", "mock://aggregation-test");
The mock:// prefix gives you an in-memory version of Schema Registry, suitable for unit testing; no running Schema Registry instance is required.
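For reference, here is a minimal sketch of what a helper like StreamsUtils.getSpecificAvroSerde could look like, assuming it simply creates a SpecificAvroSerde and passes it the config map (which includes the mock schema.registry.url once you've added it above); the actual helper in the course repository may differ:
static <T extends SpecificRecord> SpecificAvroSerde<T> getSpecificAvroSerde(final Map<String, Object> configMap) {
    final SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
    serde.configure(configMap, false); // false: configure the serde for record values, not keys
    return serde;
}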
Next, under your groupByKey statement, create a TopologyTestDriver with the application topology and configuration. (Note that we use a try-with-resources block to ensure the test driver is closed at the end of the test, which is important for cleaning up state.) Then create a test input topic with the TopologyTestDriver factory method createInputTopic; you'll use this in the test to drive input records into the Kafka Streams application. Provide a topic name as well as SerDes, since a Kafka Streams application expects to see records in byte array format.
try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsProps)) {
    final TestInputTopic<String, ElectronicOrder> inputTopic =
            testDriver.createInputTopic(inputTopicName,
                    stringSerde.serializer(),
                    electronicSerde.serializer());
Now create a TestOutputTopic with another TopologyTestDriver factory method. The TestOutputTopic captures results from the Kafka Streams application under test. Add the outputTopicName and SerDes. (Note that the Kafka Streams application's output is also in byte array format.)
final TestOutputTopic<String, Double> outputTopic =
        testDriver.createOutputTopic(outputTopicName,
                stringSerde.deserializer(),
                doubleSerde.deserializer());
Next, you need some sample events for the test. Create a list of ElectronicOrder objects for input into the topology. Here, each object is created using the builder provided by the generated Avro object code.
final List<ElectronicOrder> orders = new ArrayList<>();
orders.add(ElectronicOrder.newBuilder()
        .setElectronicId("one").setOrderId("1").setUserId("vandeley").setTime(5L).setPrice(5.0).build());
orders.add(ElectronicOrder.newBuilder()
        .setElectronicId("one").setOrderId("2").setUserId("penny-packer").setTime(5L).setPrice(15.0).build());
orders.add(ElectronicOrder.newBuilder()
        .setElectronicId("one").setOrderId("3").setUserId("romanov").setTime(5L).setPrice(25.0).build());
Create the list of expected aggregation results. Note that the TopologyTestDriver does not buffer any output, so each input record emits a new update (the same behavior you'd see by setting the cache to zero bytes in a Kafka Streams application): with the three orders above, the running total emits 5.0, then 5.0 + 15.0 = 20.0, and finally 20.0 + 25.0 = 45.0. Next, pipe some events into your test topology using inputTopic.pipeInput.
List<Double> expectedValues = List.of(5.0, 20.0, 45.0);
orders.forEach(order -> inputTopic.pipeInput(order.getElectronicId(), order));
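As an aside, the zero-byte cache behavior mentioned above is something you can reproduce in a real running application (it is not needed with the TopologyTestDriver). A minimal sketch, assuming Kafka Streams 3.4 or later (earlier versions use the deprecated CACHE_MAX_BYTES_BUFFERING_CONFIG instead):
// In a real application, disabling the state store cache makes every input
// record emit an updated aggregate, matching the test driver's behavior.
streamsProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);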
Get the output from Kafka Streams by using the outputTopic.readValuesToList method, which reads only the values into a list (we only care about the aggregation results here). Finally, confirm the results of the test by asserting that the actualValues list equals expectedValues.
List<Double> actualValues = outputTopic.readValuesToList();
assertEquals(expectedValues, actualValues);
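With the assertion in place, the test is complete. You can run it from the kafka-streams directory with something like the following (the exact Gradle test filter is up to you):
./gradlew test --tests 'io.confluent.developer.aggregate.StreamsAggregateTest'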