Get Started Free
‹ Back to courses
course: Kafka Streams 101

Hands On: Aggregations

2 min
Screen Shot 2021-07-23 at 4.13.50 PM

Sophie Blee-Goldman

Senior Software Engineer (Presenter)

Hands On: Aggregations

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/aggregate/StreamsAggregate.java.

This hands-on exercise demonstrates stateful operations in Kafka Streams, specifically aggregation, using a simulated stream of electronic purchases. You'll see the incoming records on the console along with the aggregation results.

  1. To begin, let’s extract the names of the topics from the configuration, which we’ve already loaded via a static helper method. Then we’ll convert the properties to a HashMap and use another utility method to create the specific record AvroSerde.

    String inputTopic = streamsProps.getProperty("aggregate.input.topic");
    String outputTopic = streamsProps.getProperty("aggregate.output.topic");
    Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
    
    SpecificAvroSerde<ElectronicOrder> electronicSerde = StreamsUtils.getSpecificAvroSerde(configMap);
  2. Create the electronic orders stream:

    KStream<String, ElectronicOrder> electronicStream = 
        builder.stream(inputTopic, Consumed.with(Serdes.String(), electronicSerde))
               .peek((key, value) -> System.out.println("Incoming record - key " +key +" value " + value));
  3. Execute a groupByKey followed by aggregate (initialize the aggregator with "0.0," a double value):

    electronicStream.groupByKey().aggregate(() -> 0.0,
  4. Now add the aggregator implementation, which takes each order and adds the price to a running total, a sum of all electronic orders. Also add a Materialized, which is necessary to provide state store SerDes since the value type has changed.

    (key, order, total) -> total + order.getPrice(), Materialized.with(Serdes.String(), Serdes.Double()))

    Call toStream() on the KTable that results from the aggregation operation, add a peek operation to print the results of the aggregation, and then add a .to operator to write the results to a topic:

    .toStream()
    .peek((key, value) -> System.out.println("Outgoing record - key " +key +" value " + value))
    .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));
  5. Create the KafkaStreams object, and again use the TopicLoader helper class to create topics and produce exercise data:

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
    TopicLoader.runProducer();
  6. Finally, start the Kafka Streams application, making sure to let it run for more than 30 seconds:

    kafkaStreams.start();

To run the aggregation example use this command:

./gradlew runStreams -Pargs=aggregate

You'll see the incoming records on the console along with the aggregation results:

Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622149038018}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622149048018}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622149058018}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622149070018}
Outgoing record - key HDTV-2333 value 9833.21

NOTE that you'll need to let the application run for ~40 seconds to see the aggregation result.

Use the promo code STREAMS101 & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.

Hands On: Aggregations

This exercise is all about Streams Stateful Operations, specifically, Aggregation. To begin, we'll use utility methods for loading properties in getting specific record Avro serdes. First, you'll create the ElectronicOrder Stream. The next step is to execute a groupByKey followed by aggregate . They'll be initializing the aggregator with a 0.0, a double value. Now, at the aggregator implementation next. Taking each order and adding the price to a running total, a sum of all electronic orders. Now, add a Materialized in a Serde to provide state store Serdes as the value type has changed. Next, call toStream on the KTable, resulting from the aggregation operation, and add a peek operation to print the results of the aggregation. Then, you'll want to add a to operator to write results to a topic. Make sure to let the application run for 30 plus seconds.