How to aggregate over session windows with Kafka Streams

If you have time series events in a Kafka topic, session windows let you group and aggregate them into variable-size, non-overlapping time intervals based on a configurable inactivity period.
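
To make the inactivity-gap rule concrete, here is a minimal plain-Java sketch (no Kafka dependency) that splits one key's sorted event timestamps into sessions. The class and method names are hypothetical. Treating a gap exactly equal to the inactivity period as "still in the same session" is this model's own choice.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of session windowing for ONE key (e.g., one IP address).
// This is a sketch of the idea, not Kafka Streams' internal implementation.
final class SessionSketch {

    // One session: first and last event timestamps (ms) and the number of events.
    record Session(long start, long end, long count) {}

    // Groups ascending timestamps into sessions: a new session starts whenever
    // the time since the previous event exceeds inactivityGapMs.
    static List<Session> sessionize(List<Long> sortedTimestamps, long inactivityGapMs) {
        List<Session> sessions = new ArrayList<>();
        Session current = null;
        for (long ts : sortedTimestamps) {
            if (current != null && ts - current.end() <= inactivityGapMs) {
                // Within the gap: extend the current session.
                current = new Session(current.start(), ts, current.count() + 1);
            } else {
                // First event, or the gap was exceeded: close the old session, open a new one.
                if (current != null) {
                    sessions.add(current);
                }
                current = new Session(ts, ts, 1);
            }
        }
        if (current != null) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        // Clicks at minutes 0, 2, 4 and 12 with a 5-minute inactivity gap.
        List<Session> sessions = sessionize(List.of(0L, 2 * min, 4 * min, 12 * min), 5 * min);
        sessions.forEach(System.out::println);
    }
}
```

With a 5-minute gap, clicks at minutes 0, 2, and 4 form one session with a count of 3, while the click at minute 12 starts a new session, because 8 minutes passed without activity.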

For example, suppose that you have a topic with events that represent website clicks. The following topology definition counts the number of clicks per source IP address for windows that close after 5 minutes of inactivity.

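    // Assumed to be defined elsewhere: builder (a StreamsBuilder), INPUT_TOPIC,
    // OUTPUT_TOPIC, clickSerde, and timeFormatter (a DateTimeFormatter with a zone set).
    builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), clickSerde))
            .groupByKey()  // group by the existing key: the source IP address
            .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30)))
            .count()       // KTable<Windowed<String>, Long>
            .toStream()
            .map((windowedKey, count) -> {
                // Format the session bounds and emit a plain String key and value
                String start = timeFormatter.format(windowedKey.window().startTime());
                String end = timeFormatter.format(windowedKey.window().endTime());
                String sessionInfo = String.format("Session info started: %s ended: %s with count %s", start, end, count);
                return KeyValue.pair(windowedKey.key(), sessionInfo);
            })
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));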

Let's review the key points in this example.

    .groupByKey()

Aggregations must group records by key. groupByKey() keeps each record's current key (the source IP address), so no new key is computed; use groupBy() instead if you need to group by something else.
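
As an in-memory analogy only, the sketch below buckets records by the key they already carry, which is what groupByKey() does for a stream (groupBy() would derive a new key first). The Click record and its fields are hypothetical stand-ins for the tutorial's click type.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java analogy for groupByKey(): no new key is derived, records are
// simply bucketed by the key each one already has.
final class GroupByKeySketch {

    // Hypothetical click record whose key is the source IP address.
    record Click(String ip, String url) {}

    // TreeMap keeps the output order deterministic for printing.
    static Map<String, List<Click>> groupByExistingKey(List<Click> clicks) {
        return clicks.stream()
                .collect(Collectors.groupingBy(Click::ip, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Click> clicks = List.of(
                new Click("10.0.0.1", "/home"),
                new Click("10.0.0.2", "/home"),
                new Click("10.0.0.1", "/cart"));
        // Prints each IP with the number of clicks grouped under it
        groupByExistingKey(clicks)
                .forEach((ip, group) -> System.out.println(ip + " -> " + group.size()));
    }
}
```

Because the stream in this tutorial is read straight from the input topic and its key is never changed, grouping by the existing key does not require Kafka Streams to repartition the data.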

    .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30)))

This creates a new SessionWindowedKStream over which we can aggregate. A session for a given key closes after 5 minutes of inactivity, and the 30-second grace period lets out-of-order records arrive up to 30 seconds late and still be counted.
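
Session windows are also merged when an out-of-order record arrives. The following is a simplified, single-key model of that behavior (hypothetical class name; it does not model the grace period): a new record absorbs every existing session that lies within the inactivity gap of its timestamp, so a late click landing between two sessions joins them into one.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified model of how one key's sessions absorb a new (possibly late) record.
// Not Kafka Streams' actual code; grace-period expiry is ignored here.
final class SessionMergeSketch {

    record Session(long start, long end, long count) {}

    private final long inactivityGapMs;
    private final List<Session> sessions = new ArrayList<>();

    SessionMergeSketch(long inactivityGapMs) {
        this.inactivityGapMs = inactivityGapMs;
    }

    // Merges the record with every session that ends no earlier than
    // (timestamp - gap) and starts no later than (timestamp + gap).
    void add(long timestamp) {
        long start = timestamp;
        long end = timestamp;
        long count = 1;
        List<Session> remaining = new ArrayList<>();
        for (Session s : sessions) {
            boolean withinGap = s.end() >= timestamp - inactivityGapMs
                    && s.start() <= timestamp + inactivityGapMs;
            if (withinGap) {
                start = Math.min(start, s.start());
                end = Math.max(end, s.end());
                count += s.count();
            } else {
                remaining.add(s);
            }
        }
        remaining.add(new Session(start, end, count));
        remaining.sort(Comparator.comparingLong(Session::start));
        sessions.clear();
        sessions.addAll(remaining);
    }

    List<Session> sessions() {
        return List.copyOf(sessions);
    }

    public static void main(String[] args) {
        long min = 60_000L;
        SessionMergeSketch store = new SessionMergeSketch(5 * min);
        store.add(0);
        store.add(8 * min);                   // 8 minutes later: a separate session
        System.out.println(store.sessions()); // two sessions
        store.add(4 * min);                   // late click within 5 minutes of both
        System.out.println(store.sessions()); // one session from 0 to 8 minutes, count 3
    }
}
```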

    .count()

The count() operator is a convenience aggregation method. Under the covers it works like any other session-window aggregation in Kafka Streams: it needs an Initializer, an Aggregator, a Merger (to combine the aggregates of two sessions that merge), and a Materialized to set the value Serde, since the count is a Long. Because the result of this aggregation is a simple count, Kafka Streams handles those details for you.
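
In Kafka Streams terms, count() on a SessionWindowedKStream behaves roughly like `aggregate(() -> 0L, (key, value, agg) -> agg + 1, (key, left, right) -> left + right, Materialized.with(Serdes.String(), Serdes.Long()))`. The plain-Java sketch below mirrors those three functions with hypothetical interfaces (not Kafka's own) to show why a merger is required: when two sessions for the same key merge, their partial counts have to be added together.

```java
import java.util.List;
import java.util.function.Supplier;

final class CountAsAggregateSketch {

    // Hypothetical stand-ins shaped like Kafka Streams' Aggregator and Merger.
    interface AggregatorFn<K, V, A> { A apply(K key, V value, A aggregate); }
    interface MergerFn<K, A> { A apply(K key, A left, A right); }

    // Start every session at 0.
    static final Supplier<Long> INITIALIZER = () -> 0L;
    // Add 1 for each record in the session.
    static final AggregatorFn<String, String, Long> AGGREGATOR = (key, value, agg) -> agg + 1;
    // When two sessions merge, sum their counts.
    static final MergerFn<String, Long> MERGER = (key, left, right) -> left + right;

    // Folds one session's record values into a count.
    static long countOf(String key, List<String> values) {
        Long agg = INITIALIZER.get();
        for (String value : values) {
            agg = AGGREGATOR.apply(key, value, agg);
        }
        return agg;
    }

    public static void main(String[] args) {
        long first = countOf("10.0.0.1", List.of("/home", "/cart"));
        long second = countOf("10.0.0.1", List.of("/checkout"));
        // Merging two sessions for the same key sums their partial counts
        System.out.println(MERGER.apply("10.0.0.1", first, second)); // 3
    }
}
```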

    .toStream()
    .map((windowedKey, count) -> {
        String start = timeFormatter.format(windowedKey.window().startTime());
        String end = timeFormatter.format(windowedKey.window().endTime());
        String sessionInfo = String.format("Session info started: %s ended: %s with count %s", start, end, count);
        return KeyValue.pair(windowedKey.key(), sessionInfo);
    })
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

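Aggregations in Kafka Streams return a KTable, so toStream() converts the result to a KStream. Then map() converts each record to the output types: the Windowed<String> key becomes a plain String key (the source IP address), and the Long count becomes a formatted String containing the session's start time, end time, and count.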
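
The tutorial does not show how timeFormatter is defined. One detail matters: a DateTimeFormatter can format an Instant (the type returned by Window.startTime() and endTime()) only when a zone is attached. This sketch assumes a yyyy-MM-dd HH:mm:ss pattern in UTC; both choices, and the class name, are illustrative.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch of the value-formatting step inside map(); pattern and zone are assumptions.
final class SessionInfoFormatSketch {

    // Without withZone(...), formatting an Instant with this pattern throws
    // UnsupportedTemporalTypeException, because an Instant has no calendar fields.
    static final DateTimeFormatter TIME_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    // Builds the same message format the topology writes to OUTPUT_TOPIC.
    static String sessionInfo(Instant start, Instant end, long count) {
        return String.format("Session info started: %s ended: %s with count %s",
                TIME_FORMATTER.format(start), TIME_FORMATTER.format(end), count);
    }

    public static void main(String[] args) {
        // Session info started: 1970-01-01 00:00:00 ended: 1970-01-01 00:04:00 with count 3
        System.out.println(sessionInfo(Instant.ofEpochSecond(0), Instant.ofEpochSecond(240), 3));
    }
}
```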