Sophie Blee-Goldman

Senior Software Engineer (Presenter)

Bill Bejeck

Integration Architect (Author)

Hands On: Windowing

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/windows/StreamsWindows.java

In this hands-on exercise, you will write a windowed aggregation. Note that the application you'll build uses the default timestamp extractor FailOnInvalidTimestamp.

  1. Use the following code as a starter for your windowing code:

    public class StreamsWindows {
        public static void main(String[] args) throws IOException {
            Properties streamsProps = StreamsUtils.loadProperties();
            streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-streams");
    
            StreamsBuilder builder = new StreamsBuilder();
            String inputTopic = streamsProps.getProperty("windowed.input.topic");
            String outputTopic = streamsProps.getProperty("windowed.output.topic");
            Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
            SpecificAvroSerde<ElectronicOrder> electronicSerde = StreamsUtils.getSpecificAvroSerde(configMap);

  2. Use builder.stream to create a KStream named electronicStream, with a peek operator to observe incoming events:

    KStream<String, ElectronicOrder> electronicStream =
        builder.stream(inputTopic, Consumed.with(Serdes.String(), electronicSerde))
               .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));

  3. Perform a groupByKey, and add a one-hour tumbling window with a grace period of five minutes:

    electronicStream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(5)))

  4. Create the aggregation: initialize the total to zero, then add the aggregator, which keeps a running sum of the prices of the electronics orders in each window:

    .aggregate(() -> 0.0,
               (key, order, total) -> total + order.getPrice(),

  5. Add Serdes for the key and value types of the aggregation, which the state store uses. Then add a suppress operator so that no updates are emitted until the window closes:

    Materialized.with(Serdes.String(), Serdes.Double()))
    .suppress(untilWindowCloses(unbounded()))

    With suppression, each window emits a single, final result. The unbounded() buffer configuration places no limit on the memory used to hold pending results until the window closes. Suppression is optional: leave it out if you want to see intermediate results as each record updates the aggregate.

  6. Convert the KTable to a KStream, map the windowed key to an underlying record key, add a peek operator to view the underlying result, and add a to operator to write the results to a topic:

    .toStream()
    .map((wk, value) -> KeyValue.pair(wk.key(), value))
    .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
    .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));
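
Steps 3 and 5 rest on two pieces of arithmetic: which window a timestamp falls into, and when that window closes. The following is a minimal plain-Java sketch of that arithmetic, not Kafka Streams code. It assumes epoch-aligned tumbling windows and that a window counts as closed once stream time reaches the window end plus the grace period. The class and method names are hypothetical.

```java
import java.time.Duration;

public class TumblingWindowSketch {
    // Same sizes as the exercise: one-hour windows, five-minute grace period.
    static final long SIZE_MS = Duration.ofHours(1).toMillis();
    static final long GRACE_MS = Duration.ofMinutes(5).toMillis();

    /** Start of the epoch-aligned tumbling window that contains the timestamp. */
    public static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % SIZE_MS);
    }

    /** Assumption: a window is closed once stream time reaches end + grace. */
    public static boolean isClosed(long windowStartMs, long streamTimeMs) {
        return streamTimeMs >= windowStartMs + SIZE_MS + GRACE_MS;
    }

    public static void main(String[] args) {
        long firstOrder = 1622152480629L;   // "time" of the first sample order
        long secondOrder = 1622153380629L;  // "time" of the second sample order

        long firstWindow = windowStart(firstOrder);
        System.out.println("First order window start:  " + firstWindow);
        System.out.println("Second order window start: " + windowStart(secondOrder));
        // The second order advances stream time far enough to close the first window.
        System.out.println("First window closed after second order: "
                + isClosed(firstWindow, secondOrder));
    }
}
```

With the sample timestamps, the first two orders land in different one-hour windows, which is why the first order can be emitted on its own once suppression is in place.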

To run the windowing example, use this command:

./gradlew runStreams -Pargs=windows

Your output should look something like this:

Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622152480629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622153380629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622154280629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622155180629}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622156260629}
Incoming record - key SUPER-WIDE-TV-2333 value {"order_id": "instore-1", "electronic_id": "SUPER-WIDE-TV-2333", "user_id": "1038884844", "price": 5333.98, "time": 1622156260629}
Incoming record - key SUPER-WIDE-TV-2333 value {"order_id": "instore-1", "electronic_id": "SUPER-WIDE-TV-2333", "user_id": "1038884844", "price": 4333.98, "time": 1622158960629}
Outgoing record - key HDTV-2333 value 2000.0
Outgoing record - key HDTV-2333 value 9167.189999999999
Outgoing record - key SUPER-WIDE-TV-2333 value 5333.98
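
To see where the three outgoing totals could come from, the sample orders can be replayed through a small plain-Java simulation of a suppressed one-hour tumbling window. This is a sketch under stated assumptions, not the Kafka Streams implementation: it assumes each record's timestamp equals the order's time field, that all records share a single stream time (as on one partition), and that a window is emitted once stream time reaches its end plus the five-minute grace period. The Order class is a hypothetical stand-in for ElectronicOrder.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SuppressedWindowReplay {
    static final long SIZE_MS = Duration.ofHours(1).toMillis();
    static final long GRACE_MS = Duration.ofMinutes(5).toMillis();

    /** Hypothetical stand-in for the three ElectronicOrder fields used here. */
    public static final class Order {
        final String electronicId;
        final double price;
        final long time;

        Order(String electronicId, double price, long time) {
            this.electronicId = electronicId;
            this.price = price;
            this.time = time;
        }
    }

    /** The seven incoming records from the sample output, in arrival order. */
    public static List<Order> sampleOrders() {
        return List.of(
            new Order("HDTV-2333", 2000.0, 1622152480629L),
            new Order("HDTV-2333", 1999.23, 1622153380629L),
            new Order("HDTV-2333", 4500.0, 1622154280629L),
            new Order("HDTV-2333", 1333.98, 1622155180629L),
            new Order("HDTV-2333", 1333.98, 1622156260629L),
            new Order("SUPER-WIDE-TV-2333", 5333.98, 1622156260629L),
            new Order("SUPER-WIDE-TV-2333", 4333.98, 1622158960629L));
    }

    /** Sums prices per key and window; emits a window only after it closes. */
    public static List<Map.Entry<String, Double>> replay(List<Order> orders) {
        // window start -> (key -> running total), both in insertion order
        Map<Long, Map<String, Double>> openWindows = new LinkedHashMap<>();
        List<Map.Entry<String, Double>> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (Order order : orders) {
            streamTime = Math.max(streamTime, order.time);
            long start = order.time - (order.time % SIZE_MS);

            // Records for a window that has already closed are dropped.
            if (streamTime < start + SIZE_MS + GRACE_MS) {
                openWindows.computeIfAbsent(start, s -> new LinkedHashMap<>())
                           .merge(order.electronicId, order.price, Double::sum);
            }

            // Emit and discard every window that stream time has now closed.
            Iterator<Map.Entry<Long, Map<String, Double>>> it =
                openWindows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Map<String, Double>> window = it.next();
                if (streamTime >= window.getKey() + SIZE_MS + GRACE_MS) {
                    window.getValue().forEach(
                        (key, total) -> emitted.add(Map.entry(key, total)));
                    it.remove();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        replay(sampleOrders()).forEach(e -> System.out.println(
            "Outgoing record - key " + e.getKey() + " value " + e.getValue()));
    }
}
```

Under these assumptions, the last SUPER-WIDE-TV-2333 order stays buffered because no later record advances stream time past the close of its window, which is consistent with the sample output showing only three outgoing records.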

Let the code run for a minimum of 40 seconds to see the final results for the different keys.

Note that the example uses simulated timestamps for the different keys, so the results only approximate what you would see in production.
