Senior Software Engineer (Presenter)
Integration Architect (Author)
If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/windows/StreamsWindows.java.
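In this hands-on exercise, you will write a windowed aggregation. Note that the application you'll build uses the default timestamp extractor, FailOnInvalidTimestamp, which uses the timestamp stored in each record's metadata and throws an exception if that timestamp is invalid (negative).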
Use the following code as a starter for your windowing code:
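// Static imports used by the suppress() call later in this exercise:
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;

// StreamsUtils and the Avro-generated ElectronicOrder class are provided by the course repository.
public class StreamsWindows {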
public static void main(String[] args) throws IOException {
Properties streamsProps = StreamsUtils.loadProperties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-streams");
StreamsBuilder builder = new StreamsBuilder();
String inputTopic = streamsProps.getProperty("windowed.input.topic");
String outputTopic = streamsProps.getProperty("windowed.output.topic");
Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
SpecificAvroSerde<ElectronicOrder> electronicSerde = StreamsUtils.getSpecificAvroSerde(configMap);
Use builder.stream to create a KStream named electronicStream, with a peek operator to observe incoming events:
KStream<String, ElectronicOrder> electronicStream =
builder.stream(inputTopic, Consumed.with(Serdes.String(), electronicSerde))
.peek((key, value) -> System.out.println("Incoming record - key " +key +" value " + value));
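Group the stream with groupByKey, then add a one-hour tumbling window with a five-minute grace period. The grace period lets out-of-order records that arrive up to five minutes (in stream time) after a window ends still be counted in that window: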
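electronicStream.groupByKey()
// On Kafka Streams 3.0 and later, TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(5)) replaces the deprecated of(...).grace(...) form.
.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(5)))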
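To see which window each sample record lands in, here is a small standalone Java sketch of the boundary arithmetic for epoch-aligned tumbling windows. It is plain JDK code, not part of the topology chain above, and the class and method names are hypothetical. It assumes non-negative millisecond timestamps. A window covers [start, end) and stops accepting records once stream time reaches end plus the grace period.

```java
import java.time.Duration;

// Illustration only: epoch-aligned tumbling-window arithmetic, not Kafka Streams code.
class TumblingWindowSketch {
    static final long SIZE_MS = Duration.ofHours(1).toMillis();
    static final long GRACE_MS = Duration.ofMinutes(5).toMillis();

    // Start of the tumbling window that contains the timestamp (assumes timestampMs >= 0).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % SIZE_MS);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + SIZE_MS;
    }

    // Stream time at which the window closes; later records for it count as late.
    static long closeTime(long timestampMs) {
        return windowEnd(timestampMs) + GRACE_MS;
    }

    public static void main(String[] args) {
        long[] timestamps = {1622152480629L, 1622153380629L, 1622156260629L, 1622158960629L};
        for (long ts : timestamps) {
            System.out.printf("ts=%d window=[%d, %d) closes at %d%n",
                    ts, windowStart(ts), windowEnd(ts), closeTime(ts));
        }
    }
}
```

With the timestamps from the sample run shown later in this exercise, the first HDTV-2333 record falls in the window starting at 1622149200000, while the next four fall in the window starting at 1622152800000.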
Create the aggregation: initialize it to zero, then add the aggregator implementation, which keeps a running sum of the prices of the electronic orders in each window.
.aggregate(() -> 0.0,
(key, order, total) -> total + order.getPrice(),
Pass SerDes for the key and aggregate types (String and Double) so that the state store can serialize them. Then add a suppress operator so that no updates are emitted until the window closes:
Materialized.with(Serdes.String(), Serdes.Double()))
.suppress(untilWindowCloses(unbounded()))
This gives a single, final result per key for each window. (The unbounded parameter means that the suppression buffer keeps consuming memory as needed until the window closes.) Suppression is optional; leave it out if you also want to see the intermediate results.
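The effect of suppression can be sketched in plain Java under simplifying assumptions: one key, one window, and no record caching. Kafka Streams' record cache can also coalesce intermediate updates. The class and method names are hypothetical. Without suppression, each record forwards an updated running sum. With untilWindowCloses, each update overwrites the buffered one, and only the final sum is emitted when the window closes.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: contrasts emitted updates for one key in one window, with and without suppression.
// Simplified: ignores record caching and the stream-time rules that decide when the window closes.
class SuppressionSketch {
    // No suppression: every record updates the running sum and the update is forwarded downstream.
    static List<Double> withoutSuppression(double[] prices) {
        List<Double> updates = new ArrayList<>();
        double total = 0.0;                 // same initializer as () -> 0.0
        for (double price : prices) {
            total = total + price;          // same aggregator as total + order.getPrice()
            updates.add(total);
        }
        return updates;
    }

    // untilWindowCloses: updates replace each other in a buffer; only the last one is emitted at close.
    static List<Double> withSuppression(double[] prices) {
        Double buffered = null;             // latest aggregate held for this window
        double total = 0.0;
        for (double price : prices) {
            total = total + price;
            buffered = total;               // overwrite the buffered update instead of emitting it
        }
        List<Double> emitted = new ArrayList<>();
        if (buffered != null) {
            emitted.add(buffered);          // window closes: emit the single final result
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Prices of the four HDTV-2333 records that share one window in the sample run.
        double[] prices = {1999.23, 4500.0, 1333.98, 1333.98};
        System.out.println("without suppression: " + withoutSuppression(prices));
        System.out.println("with suppression:    " + withSuppression(prices));
    }
}
```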
Convert the KTable to a KStream, map the windowed key to the underlying record key, add a peek operator to view the result, and add a to operator to write the results to a topic:
.toStream()
.map((wk, value) -> KeyValue.pair(wk.key(),value))
.peek((key, value) -> System.out.println("Outgoing record - key " +key +" value " + value))
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));
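Dropping the window in the map step has a consequence worth seeing concretely. In Kafka Streams the windowed key is a Windowed<String>, whose key() returns the original key and whose window() carries the start and end times. The sketch below uses a hypothetical WindowedKey class as a stand-in so that it runs with the JDK alone. After the mapping, two different windows for HDTV-2333 produce the same output key.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: WindowedKey is a hypothetical stand-in for Kafka Streams' Windowed<String>.
class WindowedKeySketch {
    static final class WindowedKey {
        final String key;
        final long windowStart;
        final long windowEnd;

        WindowedKey(String key, long windowStart, long windowEnd) {
            this.key = key;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
        }
    }

    // Mirrors .map((wk, value) -> KeyValue.pair(wk.key(), value)): only the plain key survives.
    static List<String> plainKeys(List<WindowedKey> windowedKeys) {
        List<String> keys = new ArrayList<>();
        for (WindowedKey wk : windowedKeys) {
            keys.add(wk.key); // windowStart and windowEnd are discarded here
        }
        return keys;
    }

    public static void main(String[] args) {
        List<WindowedKey> results = List.of(
                new WindowedKey("HDTV-2333", 1622149200000L, 1622152800000L),
                new WindowedKey("HDTV-2333", 1622152800000L, 1622156400000L));
        // Both windows map to the same output key, so the key alone no longer identifies the window.
        System.out.println(plainKeys(results));
    }
}
```

If consumers of the output topic need to know which window a total belongs to, one option is to carry the window bounds in the output key or value instead of discarding them.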
To run the windowing example, use this command:
./gradlew runStreams -Pargs=windows
Your output should look something like this:
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622152480629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622153380629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622154280629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622155180629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622156260629}
Incoming record - key SUPER-WIDE-TV-2333 value {"order_id": "instore-1", "electronic_id": "SUPER-WIDE-TV-2333", "user_id": "1038884844", "price": 5333.98, "time": 1622156260629}
Incoming record - key SUPER-WIDE-TV-2333 value {"order_id": "instore-1", "electronic_id": "SUPER-WIDE-TV-2333", "user_id": "1038884844", "price": 4333.98, "time": 1622158960629}
Outgoing record - key HDTV-2333 value 2000.0
Outgoing record - key HDTV-2333 value 9167.189999999999
Outgoing record - key SUPER-WIDE-TV-2333 value 5333.98
Let the code run for a minimum of 40 seconds to see the final results for the different keys.
Note that the example uses simulated timestamps for the different keys, so the results only approximate what you would see in production.
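The three Outgoing lines in the sample output can be reproduced with a simplified, JDK-only model of the topology. The model applies a per-key sum in one-hour tumbling windows with a five-minute grace period, plus suppression that emits a window's total once stream time reaches the window's end plus the grace period. It assumes that all records share one partition, so stream time is simply the largest timestamp seen so far. WindowedSumReplay and its members are hypothetical names. The model also shows why the second SUPER-WIDE-TV-2333 record (price 4333.98) produces no output yet: its window does not close until stream time reaches 1622160300000, and no later record arrives.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of: groupByKey -> 1-hour tumbling windows (5-minute grace) -> sum -> suppress(untilWindowCloses).
// Assumes a single partition, so stream time is the largest timestamp observed so far.
class WindowedSumReplay {
    static final long SIZE_MS = 3_600_000L;  // one hour
    static final long GRACE_MS = 300_000L;   // five minutes

    // Sample input copied from the "Incoming record" lines (key, price, timestamp).
    static final String[] KEYS = {"HDTV-2333", "HDTV-2333", "HDTV-2333", "HDTV-2333", "HDTV-2333",
            "SUPER-WIDE-TV-2333", "SUPER-WIDE-TV-2333"};
    static final double[] PRICES = {2000.0, 1999.23, 4500.0, 1333.98, 1333.98, 5333.98, 4333.98};
    static final long[] TIMESTAMPS = {1622152480629L, 1622153380629L, 1622154280629L, 1622155180629L,
            1622156260629L, 1622156260629L, 1622158960629L};

    // Returns the emitted results as "key total" strings, in emission order.
    static List<String> replay(String[] keys, double[] prices, long[] timestamps) {
        Map<String, Double> totals = new LinkedHashMap<>();   // "key@windowStart" -> running sum
        Map<String, Long> closeTimes = new LinkedHashMap<>(); // "key@windowStart" -> windowEnd + grace
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (int i = 0; i < keys.length; i++) {
            streamTime = Math.max(streamTime, timestamps[i]);
            long windowStart = timestamps[i] - (timestamps[i] % SIZE_MS);
            long closeTime = windowStart + SIZE_MS + GRACE_MS;
            if (closeTime <= streamTime) {
                continue; // window already closed: the late record is dropped
            }
            String windowId = keys[i] + "@" + windowStart;
            totals.merge(windowId, prices[i], Double::sum); // initializer 0.0, then total + price
            closeTimes.putIfAbsent(windowId, closeTime);

            // Suppression: emit each buffered total once stream time reaches its window's close time.
            Iterator<Map.Entry<String, Long>> it = closeTimes.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (entry.getValue() <= streamTime) {
                    String id = entry.getKey();
                    String key = id.substring(0, id.indexOf('@'));
                    emitted.add(key + " " + totals.remove(id));
                    it.remove();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (String line : replay(KEYS, PRICES, TIMESTAMPS)) {
            System.out.println(line);
        }
    }
}
```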