Course: Kafka Streams 101

Hands On: Aggregations

2 min
Sophie Blee-GoldmanSoftware Engineer II (Course Presenter)

Hands On: Aggregations

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/aggregate/StreamsAggregate.java.

This hands-on exercise demonstrates stateful operations in Kafka Streams, specifically aggregation, using a simulated stream of electronic purchases. You'll see the incoming records on the console along with the aggregation results.

  1. To begin, let’s extract the names of the topics from the configuration, which we’ve already loaded via a static helper method. Then we’ll convert the properties to a HashMap and use another utility method to create the specific record AvroSerde.

    String inputTopic = streamsProps.getProperty("aggregate.input.topic");
    String outputTopic = streamsProps.getProperty("aggregate.output.topic");
    Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
    
    SpecificAvroSerde<ElectronicOrder> electronicSerde = StreamsUtils.getSpecificAvroSerde(configMap);
  2. Create the electronic orders stream:

    KStream<String, ElectronicOrder> electronicStream = 
        builder.stream(inputTopic, Consumed.with(Serdes.String(), electronicSerde))
               .peek((key, value) -> System.out.println("Incoming record - key " +key +" value " + value));
  3. Execute a groupByKey followed by aggregate (initialize the aggregator with "0.0," a double value):

    electronicStream.groupByKey().aggregate(() -> 0.0,
  4. Now add the aggregator implementation, which takes each order and adds the price to a running total, a sum of all electronic orders. Also add a Materialized, which is necessary to provide state store SerDes since the value type has changed.

    (key, order, total) -> total + order.getPrice(), Materialized.with(Serdes.String(), Serdes.Double()))

    Call toStream() on the KTable that results from the aggregation operation, add a peek operation to print the results of the aggregation, and then add a .to operator to write the results to a topic:

    .toStream()
    .peek((key, value) -> System.out.println("Outgoing record - key " +key +" value " + value))
    .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));
  5. Create the KafkaStreams object, and again use the TopicLoader helper class to create topics and produce exercise data:

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
    TopicLoader.runProducer();
  6. Finally, start the Kafka Streams application, making sure to let it run for more than 30 seconds:

    kafkaStreams.start();

Use the promo code STREAMS101 to get $101 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.