Course: Kafka Streams 101

Hands On: Windowing

2 min
Sophie Blee-Goldman, Software Engineer II (Course Presenter)
Bill Bejeck, Integration Architect (Course Author)

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module's code can be found in the source file java/io/confluent/developer/windows/StreamsWindows.java.

In this hands-on exercise, you will write a windowed aggregation.

  1. Use the following code as a starter for your windowing code:

    public class StreamsWindows {
        public static void main(String[] args) throws IOException {
            Properties streamsProps = StreamsUtils.loadProperties();
            streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-streams");
            StreamsBuilder builder = new StreamsBuilder();
            String inputTopic = streamsProps.getProperty("windowed.input.topic");
            String outputTopic = streamsProps.getProperty("windowed.output.topic");
            Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
            SpecificAvroSerde<ElectronicOrder> electronicSerde = StreamsUtils.getSpecificAvroSerde(configMap);
  2. Use builder.stream to create a KStream named electronicStream, with a peek operator to observe incoming events:

    KStream<String, ElectronicOrder> electronicStream =
        builder.stream(inputTopic, Consumed.with(Serdes.String(), electronicSerde))
               .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));
  3. Perform a groupByKey, and add a one-hour tumbling window with a grace period of five minutes:

    electronicStream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(5)))
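Tumbling windows in Kafka Streams are aligned to the epoch rather than to the first record, so a record's window start can be computed by rounding its timestamp down to a multiple of the window size. A minimal sketch of that calculation (plain Java, no Kafka dependencies; the class name is made up for illustration):

```java
import java.time.Duration;
import java.time.Instant;

public class WindowMath {
    // Start of the epoch-aligned tumbling window containing the
    // given record timestamp (both values in milliseconds).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long oneHour = Duration.ofHours(1).toMillis();
        // A record stamped 10:42:30 UTC falls into the 10:00-11:00 window.
        long ts = Instant.parse("2021-06-01T10:42:30Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(ts, oneHour)));
    }
}
```

The five-minute grace period only extends how long the window accepts late-arriving records; it does not shift the window boundaries themselves.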
  4. Create the aggregation, initialize it to zero, and then add the aggregator implementation, which calculates a running sum of electronic purchase events.

        .aggregate(() -> 0.0,
                   (key, order, total) -> total + order.getPrice(),
  5. Add SerDes for the types in the aggregation, which are used by the state store. Then, add a suppress operator in order to not emit any updates until the window closes:

    Materialized.with(Serdes.String(), Serdes.Double()))
    .suppress(untilWindowCloses(unbounded()))

    This gives a single result for the windowed aggregation. (The unbounded parameter means that the buffer will continue to consume memory as needed until the window closes.) Suppression is optional; omit it if you want to see intermediate results as each update arrives.
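To make the difference concrete, here is a plain-Java sketch (not the Kafka Streams API) of what suppression changes for a single key within one window: without suppress, every update to the running total flows downstream; with untilWindowCloses, only the final total is emitted once the window plus its grace period has passed. The prices are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SuppressSketch {
    public static void main(String[] args) {
        // Hypothetical prices arriving for one key within a single window.
        double[] prices = {10.0, 20.0, 30.0};

        // Without suppress: every update to the running total is emitted.
        List<Double> unsuppressed = new ArrayList<>();
        double total = 0.0;
        for (double p : prices) {
            total += p;
            unsuppressed.add(total);
        }
        System.out.println("Without suppress: " + unsuppressed); // [10.0, 30.0, 60.0]

        // With suppress(untilWindowCloses): only the final total is emitted
        // after the window (plus grace period) closes.
        System.out.println("With suppress:    [" + total + "]"); // [60.0]
    }
}
```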

  6. Convert the KTable to a KStream, map the windowed key to an underlying record key, add a peek operator to view the underlying result, and add a to operator to write the results to a topic:

    .toStream()
    .map((wk, value) -> KeyValue.pair(wk.key(), value))
    .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
    .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));

Let the code run for a minimum of 40 seconds to see the final results for the different keys. Note that the example uses simulated timestamps for the different keys, so the results only approximate what you would see in production.
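End to end, the topology computes one price total per key per one-hour window. The same semantics can be modeled in plain Java without any Kafka dependencies, which is a useful sanity check on what the aggregation produces. This is a sketch with made-up keys and timestamps, not the course's actual test data:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WindowedSumModel {
    static final long WINDOW_MS = 3_600_000L; // one-hour tumbling window

    // Model the windowed key as (record key, epoch-aligned window start).
    static String windowedKey(String key, long tsMs) {
        return key + "@" + (tsMs - tsMs % WINDOW_MS);
    }

    public static void main(String[] args) {
        // Hypothetical orders: key, timestamp (ms), price.
        Object[][] orders = {
            {"HDTV-2333", 0L, 500.0},
            {"HDTV-2333", 1_800_000L, 300.0},   // same hour -> same window
            {"TV-1000", 0L, 1000.0},
            {"HDTV-2333", 4_000_000L, 200.0},   // next hour -> new window
        };

        // Group by windowed key and sum prices, mirroring the aggregation.
        Map<String, Double> totals = new LinkedHashMap<>();
        for (Object[] o : orders) {
            String wk = windowedKey((String) o[0], (Long) o[1]);
            totals.merge(wk, (Double) o[2], Double::sum);
        }
        totals.forEach((wk, sum) -> System.out.println(wk + " -> " + sum));
    }
}
```

With suppression enabled, each of these per-window totals reaches the output topic exactly once, after the window and its grace period have closed.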
