Course: Kafka Streams 101

Windowing

Sophie Blee-Goldman, Software Engineer II (Course Presenter)
Bill Bejeck, Integration Architect (Course Author)

Windowing lets you bucket stateful operations by time. Without it, your aggregations would accumulate endlessly. A window gives you a snapshot of an aggregate within a given time frame, and can be defined as hopping, tumbling, session, or sliding.

Hopping

Imagine that you're building a stream and you simply need to count the records for each key:

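KStream<String, String> myStream = builder.stream("topic-A");
// count() produces Long values, so write them with a Long serde unless your default value serde already handles Long
myStream.groupByKey().count().toStream()
    .to("output", Produced.with(Serdes.String(), Serdes.Long()));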

The problem is that the count for each key keeps growing for as long as the application runs. To see how the count fluctuates over time, you need to window your results:

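KStream<String, String> myStream = builder.stream("topic-A");
Duration windowSize = Duration.ofMinutes(5);   // how much time each window covers
Duration advanceSize = Duration.ofMinutes(1);  // how far each new window starts after the previous one
// Note: TimeWindows.of is deprecated as of Kafka 3.0 in favor of ofSizeWithNoGrace / ofSizeAndGrace
TimeWindows hoppingWindow = TimeWindows.of(windowSize).advanceBy(advanceSize);
myStream.groupByKey()
    .windowedBy(hoppingWindow)
    .count();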

A hopping window is bound by time: you define the size of the window, and it advances in increments smaller than the window size, so you end up with overlapping windows. For example, you might have a window size of 30 seconds with an advance size of five seconds. Because the windows overlap, a record can belong to more than one window.
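
To make the overlap concrete, here is a minimal plain-Java sketch of the arithmetic, not Kafka Streams code. It assumes windows are aligned to the epoch and cover the half-open range [start, start + size); the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowSketch {

    // Returns the start times of every window that contains the given timestamp.
    // Assumption: windows start at multiples of advanceMs and cover [start, start + sizeMs).
    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned window start that can still contain the timestamp.
        long start = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at 32 s, with a 30 s window advancing every 5 s.
        System.out.println(windowStartsFor(32_000L, 30_000L, 5_000L));
    }
}
```

With a 30-second size and a five-second advance, a single record lands in six windows (size divided by advance), which is why a hopping aggregation emits several results per record.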

Tumbling

A tumbling window is a special type of hopping window:

KStream<String, String> myStream = builder.stream("topic-A");
Duration windowSize = Duration.ofSeconds(30);
TimeWindows tumblingWindow = TimeWindows.of(windowSize);

myStream.groupByKey()
    .windowedBy(tumblingWindow)
    .count();

It's a hopping window whose advance size is the same as its window size, so you only need to define the window size, here 30 seconds. When those 30 seconds are up, a new 30-second window begins. Because tumbling windows never overlap, each record belongs to exactly one window, and you don't get the duplicate results that overlapping hopping windows produce.
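
As a rough illustration of that one-window-per-record property, the following plain-Java sketch (not Kafka Streams code) buckets timestamps into tumbling windows and counts them. It assumes epoch-aligned windows; the names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    // Counts records per tumbling window, keyed by window start time.
    // Assumption: windows are aligned to the epoch and cover [start, start + sizeMs).
    public static Map<Long, Long> countPerWindow(long[] timestampsMs, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            long windowStart = ts - (ts % sizeMs); // each timestamp maps to exactly one window
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 29_999L, 30_000L, 61_000L};
        System.out.println(countPerWindow(timestamps, 30_000L));
    }
}
```

Note that the record at exactly 30 seconds falls into the second window, because in this sketch the window end is exclusive.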

Session

Session windows are different from the previous two types because they aren't defined by a fixed window size, but rather by user activity:

KStream<String, String> myStream = builder.stream("topic-A");
Duration inactivityGap = Duration.ofMinutes(5);

myStream.groupByKey()
    .windowedBy(SessionWindows.with(inactivityGap))
    .count();

With session windows, you define an inactivityGap. As long as a new record for a key arrives within the inactivityGap of the previous one, the session keeps growing. A record that arrives after the gap has elapsed starts a new session. So theoretically, if you're keeping track of a very active key, its session can continue to grow indefinitely.
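
The grouping rule can be sketched in plain Java for the records of a single key. This is an illustration rather than the Kafka Streams implementation: it assumes timestamps arrive in order, and it assumes that a gap exactly equal to the inactivityGap still extends the session. The names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionWindowSketch {

    // Groups the sorted timestamps of one key into sessions, returned as {start, end} pairs.
    // Assumption: a record extends the current session when it is at most
    // inactivityGapMs later than the session's last record.
    public static List<long[]> sessions(long[] sortedTimestampsMs, long inactivityGapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            if (!result.isEmpty() && ts - result.get(result.size() - 1)[1] <= inactivityGapMs) {
                result.get(result.size() - 1)[1] = ts; // still active: extend the session end
            } else {
                result.add(new long[]{ts, ts});        // gap exceeded: start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 60_000L, 120_000L, 600_000L};
        for (long[] s : sessions(timestamps, 300_000L)) {
            System.out.println(s[0] + " .. " + s[1]);
        }
    }
}
```

With a five-minute gap, the first three records form one session and the record at ten minutes starts a second one.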

Sliding

A sliding window is bound by time, but its endpoints are determined by user activity. You create your stream and set the maximum time difference between two records that still allows them to be included in the same window.

KStream<String, String> myStream = builder.stream("topic-A");
Duration timeDifference = Duration.ofSeconds(2);
Duration gracePeriod = Duration.ofMillis(500);
myStream.groupByKey()
    .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(timeDifference, gracePeriod))
    .count();

Unlike a hopping window, a sliding window doesn't advance at a fixed interval. Instead, it advances based on user activity.
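
A simplified plain-Java sketch can show how record timestamps, rather than a fixed schedule, define the windows. It only models the window that ends at each record, covering [timestamp - timeDifference, timestamp] with both ends inclusive, and counts the records inside it. Treat this as an assumption-laden illustration, since Kafka Streams tracks additional windows beyond these; the names are hypothetical.

```java
import java.util.Arrays;

public class SlidingWindowSketch {

    // For each record, counts the records inside the window that ends at that record.
    // Assumption: the window spans [ts - timeDifferenceMs, ts], inclusive on both ends.
    public static long[] countsForWindowsEndingAtEachRecord(long[] timestampsMs, long timeDifferenceMs) {
        long[] counts = new long[timestampsMs.length];
        for (int i = 0; i < timestampsMs.length; i++) {
            long windowEnd = timestampsMs[i];
            long windowStart = Math.max(0L, windowEnd - timeDifferenceMs);
            for (long other : timestampsMs) {
                if (other >= windowStart && other <= windowEnd) {
                    counts[i]++;
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 1_000L, 2_500L, 6_000L};
        System.out.println(Arrays.toString(countsForWindowsEndingAtEachRecord(timestamps, 2_000L)));
    }
}
```

The records at 1 s and 2.5 s share a window because they are within two seconds of each other, while the record at 6 s sits alone in its window.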

Grace Periods

Windows also have the concept of a grace period, which controls how long a window keeps accepting out-of-order records after it ends. Specifically, a record whose timestamp falls inside a window, but which arrives after the window end has passed, is still included in the windowed calculation as long as it arrives before the window end plus the grace period. Records that arrive later than that are dropped. Session windows, although behavior-driven, can be configured with a grace period as well.
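
The acceptance rule for a late record can be sketched in plain Java. This is an illustration, not Kafka Streams code: it assumes a time window covering [start, start + size), treats stream time as the highest record timestamp seen so far, and assumes the window stops accepting records once stream time reaches the window end plus the grace period. The names are hypothetical.

```java
public class GracePeriodSketch {

    // Decides whether a record is still counted in the given window.
    // Assumption: the window covers [windowStartMs, windowStartMs + sizeMs) and closes
    // once stream time reaches window end + graceMs.
    public static boolean isAccepted(long recordTsMs, long streamTimeMs,
                                     long windowStartMs, long sizeMs, long graceMs) {
        long windowEndMs = windowStartMs + sizeMs;
        boolean inWindow = recordTsMs >= windowStartMs && recordTsMs < windowEndMs;
        boolean windowStillOpen = streamTimeMs < windowEndMs + graceMs;
        return inWindow && windowStillOpen;
    }

    public static void main(String[] args) {
        // 30 s window starting at 0 with a 5 s grace period; the record is stamped 25 s.
        System.out.println(isAccepted(25_000L, 32_000L, 0L, 30_000L, 5_000L)); // late, within grace
        System.out.println(isAccepted(25_000L, 36_000L, 0L, 30_000L, 5_000L)); // late, past grace
    }
}
```

The same record is accepted when stream time is 32 s and rejected when stream time is 36 s, so the grace period trades result completeness against how long window state must be kept open.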
