
Sophie Blee-Goldman

Senior Software Engineer (Presenter)

Hands On: Testing

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/aggregate/StreamsAggregateTest.java.

In this exercise, you will write a unit test for the aggregation Kafka Streams application. The starter code already contains the KStream and the aggregation, so you’ll concentrate on what a unit test needs: the TopologyTestDriver, which runs the topology without a broker.

  1. Add your test code to the following starter code from previous exercises:

    public class StreamsAggregateTest {
        @Test
        public void shouldAggregateRecords() {

            final Properties streamsProps = new Properties();
            streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-test");

            final String inputTopicName = "input";
            final String outputTopicName = "output";
            final Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);

            final SpecificAvroSerde<ElectronicOrder> electronicSerde =
                StreamsUtils.getSpecificAvroSerde(configMap);
            final Serde<String> stringSerde = Serdes.String();
            final Serde<Double> doubleSerde = Serdes.Double();

            final StreamsBuilder builder = new StreamsBuilder();

            final KStream<String, ElectronicOrder> electronicStream =
                builder.stream(inputTopicName, Consumed.with(stringSerde, electronicSerde));

            // Sum the order prices per key and write every update to the output topic
            electronicStream.groupByKey()
                .aggregate(() -> 0.0,
                           (key, order, total) -> total + order.getPrice(),
                           Materialized.with(stringSerde, doubleSerde))
                .toStream()
                .to(outputTopicName, Produced.with(stringSerde, doubleSerde));
        }
    }
    
  2. Begin by pointing the test at a mock Schema Registry. Add this setting under your existing streamsProps configuration:

    streamsProps.put("schema.registry.url", "mock://aggregation-test");

    The mock:// URL selects an in-memory version of Schema Registry, so the unit test does not need a running registry.

  3. Next, after your groupByKey statement, create a TopologyTestDriver with the application topology and configuration. Use a try-with-resources block so that the test driver is closed at the end of the test, which is important for cleaning up state. Then create a test input topic with the TopologyTestDriver factory method createInputTopic; you’ll use it in the test to drive input records into the Kafka Streams application. Provide the topic name and the key and value serializers, since a Kafka Streams application expects its input records in byte array format.

    try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsProps)) {
        final TestInputTopic<String, ElectronicOrder> inputTopic =
            testDriver.createInputTopic(inputTopicName,
                                        stringSerde.serializer(),
                                        electronicSerde.serializer());

  4. Now create a TestOutputTopic with another TopologyTestDriver factory method, createOutputTopic. The TestOutputTopic captures the results that the Kafka Streams application under test produces. Provide outputTopicName and the key and value deserializers, since the application’s output is also in byte array format.

    final TestOutputTopic<String, Double> outputTopic =
        testDriver.createOutputTopic(outputTopicName,
                                     stringSerde.deserializer(),
                                     doubleSerde.deserializer());

  5. Next, you need some sample events for the test. Create a list of ElectronicOrder objects to feed into the topology. Each object is created with the builder that the Avro-generated ElectronicOrder class provides.

    final List<ElectronicOrder> orders = new ArrayList<>();
    orders.add(ElectronicOrder.newBuilder().setElectronicId("one").setOrderId("1").setUserId("vandeley").setTime(5L).setPrice(5.0).build());
    orders.add(ElectronicOrder.newBuilder().setElectronicId("one").setOrderId("2").setUserId("penny-packer").setTime(5L).setPrice(15.0).build());
    orders.add(ElectronicOrder.newBuilder().setElectronicId("one").setOrderId("3").setUserId("romanov").setTime(5L).setPrice(25.0).build());

  6. Create the list of expected aggregation results. Note that the TopologyTestDriver does not buffer any output, so each input record emits a new update (similar to the behavior you see when you set the cache to zero bytes in the Kafka Streams application). Because all three orders share the key "one", the expected values are the running totals of their prices. Then pipe the events into your test topology using inputTopic.pipeInput.

    final List<Double> expectedValues = List.of(5.0, 20.0, 45.0);
    orders.forEach(order -> inputTopic.pipeInput(order.getElectronicId(), order));

  7. Read the output from Kafka Streams with the outputTopic.readValuesToList method, which returns only the record values as a list (the keys are not needed, since we only care about the aggregation results). Finally, confirm the results of the test by asserting that the actualValues list equals expectedValues, and close the try block that you opened in step 3.

    final List<Double> actualValues = outputTopic.readValuesToList();
    assertEquals(expectedValues, actualValues);
    } // closes the try-with-resources block from step 3
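
To see why step 6 expects three values rather than one final total, it can help to replay the aggregation by hand. The following is a minimal sketch that uses only the JDK, not Kafka Streams: the RunningTotalSketch class, its Order type, and the emittedUpdates method are hypothetical names made up for illustration, and Order stands in for just the two ElectronicOrder fields the aggregation reads. It applies the same initializer (0.0) and adder (total + price) per key and records every intermediate result, which is how an uncached aggregation behaves.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningTotalSketch {

    /** Hypothetical stand-in for ElectronicOrder: only the key and the price. */
    public static class Order {
        final String electronicId;
        final double price;

        public Order(String electronicId, double price) {
            this.electronicId = electronicId;
            this.price = price;
        }
    }

    /**
     * Replays the aggregation with no buffering: every input record
     * updates its key's total and immediately emits the new total.
     */
    public static List<Double> emittedUpdates(List<Order> orders) {
        final Map<String, Double> totals = new HashMap<>();
        final List<Double> updates = new ArrayList<>();
        for (Order order : orders) {
            // Initializer is 0.0, adder is total + price, as in the topology
            final double total = totals.getOrDefault(order.electronicId, 0.0) + order.price;
            totals.put(order.electronicId, total);
            updates.add(total);
        }
        return updates;
    }

    public static void main(String[] args) {
        final List<Order> orders = List.of(
            new Order("one", 5.0),
            new Order("one", 15.0),
            new Order("one", 25.0));
        System.out.println(emittedUpdates(orders)); // prints [5.0, 20.0, 45.0]
    }
}
```

If the three orders used different keys, each key would keep its own total, so the emitted values would change even though the prices stay the same.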
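
Steps 3 and 4 pass serializers to the input topic and deserializers to the output topic because the topology only ever sees byte arrays. The sketch below illustrates that round trip with plain JDK classes. It is an assumption-laden stand-in, not the Kafka Serde classes used in the test: ByteRoundTripSketch and its four helper methods are hypothetical names, and the encodings (UTF-8 for the key, an 8-byte buffer for the value) are chosen only to make the idea concrete.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteRoundTripSketch {

    /** Input side: turn a String key into bytes before it enters the topology. */
    public static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /** Output side: turn key bytes back into a String for assertions. */
    public static String deserializeKey(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Input side: encode a double as 8 bytes (ByteBuffer defaults to big-endian). */
    public static byte[] serializeValue(double value) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    /** Output side: decode the 8 bytes back into a double. */
    public static double deserializeValue(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getDouble();
    }

    public static void main(String[] args) {
        final byte[] keyBytes = serializeKey("one");
        final byte[] valueBytes = serializeValue(45.0);

        // The typed objects only reappear after deserialization
        System.out.println(deserializeKey(keyBytes));     // prints one
        System.out.println(deserializeValue(valueBytes)); // prints 45.0
    }
}
```

In the actual test, TestInputTopic and TestOutputTopic perform this conversion for you with the serializers and deserializers you hand them, which is why the assertions can compare ordinary Double values.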
