If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with
    ./gradlew build
and follow along in the code. This module's code can be found in the source file.
This hands-on exercise demonstrates stateful operations in Kafka Streams, specifically aggregation, using a simulated stream of electronic purchases. You'll see the incoming records on the console along with the aggregation results.
To begin, let’s extract the names of the topics from the configuration, which we’ve already loaded via a static helper method. Then we’ll convert the properties to a HashMap and use another utility method to create the SpecificAvroSerde for the ElectronicOrder records:
String inputTopic = streamsProps.getProperty("aggregate.input.topic");
String outputTopic = streamsProps.getProperty("aggregate.output.topic");
Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
SpecificAvroSerde<ElectronicOrder> electronicSerde = StreamsUtils.getSpecificAvroSerde(configMap);
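The StreamsUtils.propertiesToMap helper isn't shown in this module; conceptually it just copies each Properties entry into a HashMap so the map can be passed to serde configuration. A minimal sketch of such a helper (the class name PropsToMap and the sample key below are illustrative, not from the course code) might look like:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PropsToMap {

    // Hypothetical sketch of a propertiesToMap-style helper:
    // copies every key/value entry of a Properties object into a HashMap.
    static Map<String, Object> propertiesToMap(Properties props) {
        Map<String, Object> configs = new HashMap<>();
        props.forEach((key, value) -> configs.put((String) key, value));
        return configs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("aggregate.input.topic", "electronics-input"); // sample entry for illustration
        Map<String, Object> configMap = propertiesToMap(props);
        System.out.println(configMap.get("aggregate.input.topic"));
    }
}
```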
Create the electronic orders stream:
KStream<String, ElectronicOrder> electronicStream =
    builder.stream(inputTopic, Consumed.with(Serdes.String(), electronicSerde))
        .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));
Next, perform a groupByKey followed by an aggregate, initializing the aggregator with 0.0, a double value:
electronicStream.groupByKey().aggregate(() -> 0.0,
Now add the aggregator implementation, which takes each order and adds its price to a running total: the sum of all electronic orders. Also add a Materialized, which is necessary to provide the state store SerDes, since the value type has changed:
    (key, order, total) -> total + order.getPrice(),
    Materialized.with(Serdes.String(), Serdes.Double()))
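To see what the aggregate step computes, here is a plain-Java sketch of the same fold logic with no Kafka dependency (class name, keys, and prices below are illustrative): each record's price is added to its key's running total, starting from the initializer value 0.0.

```java
import java.util.HashMap;
import java.util.Map;

public class AggregationSketch {

    // Per-key running totals, standing in for the Kafka Streams state store.
    static final Map<String, Double> totals = new HashMap<>();

    // Mirrors the aggregate call: the initializer supplies 0.0 for a new key,
    // and the aggregator adds the order price to the current total.
    static double aggregate(String key, double price) {
        double total = totals.getOrDefault(key, 0.0); // initializer: () -> 0.0
        total += price;                               // (key, order, total) -> total + order.getPrice()
        totals.put(key, total);
        return total;
    }

    public static void main(String[] args) {
        // Two orders for the same key accumulate into one running total.
        System.out.println(aggregate("HDTV-2333", 2000.00)); // 2000.0
        System.out.println(aggregate("HDTV-2333", 1999.23)); // 3999.23
    }
}
```

This is only the fold semantics; in the real topology the state store is fault tolerant and the KTable emits updated totals downstream.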
Call toStream() on the KTable that results from the aggregation operation, add a peek operation to print the results of the aggregation, and then add a .to operator to write the results to a topic:
.toStream()
.peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));
Next, create the KafkaStreams object, and again use the TopicLoader helper class to create topics and produce exercise data:
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
TopicLoader.runProducer();
Finally, start the Kafka Streams application, making sure to let it run for more than 30 seconds:
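The start-up code itself isn't shown above; a minimal sketch of this final step (assuming a simple timed run rather than a shutdown hook, and a run duration chosen here for illustration) looks like:

```java
kafkaStreams.start();
// Keep the application alive past the 30-second observation window
// before shutting it down cleanly.
Thread.sleep(45_000L); // hypothetical duration; adjust as needed
kafkaStreams.close();
```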