If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/windows/StreamsWindows.java
In this hands-on exercise, you will write a windowed aggregation. Note that the application you'll build uses the default timestamp extractor FailOnInvalidTimestamp.
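If your events carry their own timestamps (as ElectronicOrder does, via its time field), you could swap in a custom extractor instead of the default. The sketch below is a hypothetical example, not part of the exercise code; the class name OrderTimestampExtractor and the fallback behavior are assumptions:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor (not part of the exercise): use the order's
// embedded event time, falling back to the partition time when the
// record is not an ElectronicOrder or its timestamp is invalid.
// ElectronicOrder is the generated Avro class from the exercise.
public class OrderTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        if (record.value() instanceof ElectronicOrder) {
            long eventTime = ((ElectronicOrder) record.value()).getTime();
            return eventTime >= 0 ? eventTime : partitionTime;
        }
        return partitionTime;
    }
}
```

You would register it either globally, with `streamsProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, OrderTimestampExtractor.class)`, or per stream via `Consumed.with(...).withTimestampExtractor(...)`.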
Use the following code as a starter for your windowing code:
public class StreamsWindows {

    public static void main(String[] args) throws IOException {
        Properties streamsProps = StreamsUtils.loadProperties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-streams");

        StreamsBuilder builder = new StreamsBuilder();
        String inputTopic = streamsProps.getProperty("windowed.input.topic");
        String outputTopic = streamsProps.getProperty("windowed.output.topic");

        Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
        SpecificAvroSerde<ElectronicOrder> electronicSerde =
                StreamsUtils.getSpecificAvroSerde(configMap);
Use builder.stream to create a KStream named electronicStream, with a peek operator to observe incoming events:
KStream<String, ElectronicOrder> electronicStream =
        builder.stream(inputTopic, Consumed.with(Serdes.String(), electronicSerde))
               .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));
Perform a groupByKey, and add a one-hour tumbling window with a grace period of five minutes:
electronicStream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(5)))
Create the aggregation, initialize it to zero, and then add the aggregator implementation, which calculates a running sum of electronic purchase events.
        .aggregate(() -> 0.0,
                (key, order, total) -> total + order.getPrice(),
Add SerDes for the types in the aggregation, which are used by the state store. Then, add a suppress operator so that no updates are emitted until the window closes (untilWindowCloses and unbounded are static imports from Suppressed and Suppressed.BufferConfig):
                Materialized.with(Serdes.String(), Serdes.Double()))
        .suppress(untilWindowCloses(unbounded()))
This gives a single final result for the windowed aggregation. (The unbounded parameter means that the buffer will continue to consume memory as needed until the window closes.) Suppression is optional; omit it if you want to see intermediate results.
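If unbounded memory use is a concern, the suppression buffer can be bounded instead. These variants are illustrative configuration sketches, not part of the exercise code, and the sizes are arbitrary:

```java
// untilWindowCloses requires a "strict" buffer config: if the bound is
// reached, the application shuts down rather than emitting early results.
.suppress(Suppressed.untilWindowCloses(
        BufferConfig.maxBytes(10_000_000L).shutDownWhenFull()))

// Alternatively, emit intermediate results at most once per minute
// instead of only one final result per window:
.suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(1),
        BufferConfig.maxRecords(1_000)))
```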
Convert the KTable to a KStream, map the windowed key to an underlying record key, add a peek operator to view the underlying result, and add a to operator to write the results to a topic:
        .toStream()
        .map((wk, value) -> KeyValue.pair(wk.key(), value))
        .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
        .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));
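Conceptually, the aggregate step above is just a fold per key and window: start from the initializer (0.0) and apply the aggregator to each event. A plain-Java sketch of those semantics, outside Kafka Streams (the prices here are made up for illustration):

```java
import java.util.List;

public class RunningSumSketch {
    public static void main(String[] args) {
        // Example prices for one key within one window (illustrative values)
        List<Double> prices = List.of(1999.23, 4500.0, 1333.98);

        double total = 0.0; // the initializer: () -> 0.0
        for (double price : prices) {
            // the aggregator: (key, order, total) -> total + order.getPrice()
            total = total + price;
        }
        System.out.println(total);
    }
}
```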
To run the windowing example, use this command:
./gradlew runStreams -Pargs=windows
Your output should look something like this:
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622152480629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622153380629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622154280629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622155180629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622156260629}
Incoming record - key SUPER-WIDE-TV-2333 value {"order_id": "instore-1", "electronic_id": "SUPER-WIDE-TV-2333", "user_id": "1038884844", "price": 5333.98, "time": 1622156260629}
Incoming record - key SUPER-WIDE-TV-2333 value {"order_id": "instore-1", "electronic_id": "SUPER-WIDE-TV-2333", "user_id": "1038884844", "price": 4333.98, "time": 1622158960629}
Outgoing record - key HDTV-2333 value 2000.0
Outgoing record - key HDTV-2333 value 9167.189999999999
Outgoing record - key SUPER-WIDE-TV-2333 value 5333.98
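You can sanity-check those sums by hand: Kafka Streams aligns tumbling windows to the Unix epoch, so an event with timestamp t lands in the window starting at t - (t % windowSize). This small plain-Java sketch (not part of the exercise code) reproduces the per-window totals for HDTV-2333 from the sample output above:

```java
import java.util.Map;
import java.util.TreeMap;

public class WindowCheck {
    public static void main(String[] args) {
        long windowSize = 3_600_000L; // one hour in milliseconds

        // Timestamps and prices of the HDTV-2333 events from the sample output
        long[] times = {1622152480629L, 1622153380629L, 1622154280629L,
                        1622155180629L, 1622156260629L};
        double[] prices = {2000.0, 1999.23, 4500.0, 1333.98, 1333.98};

        // Sum prices per epoch-aligned one-hour window
        Map<Long, Double> sums = new TreeMap<>();
        for (int i = 0; i < times.length; i++) {
            long windowStart = times[i] - (times[i] % windowSize);
            sums.merge(windowStart, prices[i], Double::sum);
        }
        sums.forEach((start, total) ->
                System.out.println("window starting " + start + " -> " + total));
        // The first event falls in one window (total 2000.0) and the other
        // four in the next (total ~9167.19), matching the outgoing records.
    }
}
```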
Let the code run for a minimum of 40 seconds to see the final results for the different keys.
Note that the example uses simulated timestamps for the different keys, so the results only approximate what you would see in production.
For this exercise you're going to build a windowed aggregation. All the required parts of the code have been added; your focus is writing the windowed aggregation. To start, create the KStream electronicStream with the StreamsBuilder. You'll also add a peek operator to observe the incoming events. The first step for a windowed aggregation is to do a groupByKey. Next, you'll add a one-hour tumbling window with a grace period of five minutes. The grace period means that events with timestamps outside the one-hour window by no more than five minutes are still included in the computation. Now create the aggregation and initialize it to zero. Next, add the aggregator implementation, which calculates a running sum of electronic purchase events. Then you'll need to add SerDes for the types in the aggregation; these are used by the state store. Next, you'll add a key part of this process: suppressing all updates until the window closes. This results in a single final result for the windowed aggregation. The unbounded parameter means that the buffer will continue to consume memory as needed until the window closes. Suppression is optional, especially if you want to see any intermediate results. After this, convert the KTable to a KStream, and also map the windowed key to the underlying record key. Finally, add a peek to view the final result and a 'to' operator to write the results to a topic. Let this application run for a minimum of 40 seconds to see the final results for the different keys. Note, the example uses simulated timestamps, so the results are approximate compared to what you'd see in production.