Get Started Free
Screen Shot 2021-07-23 at 4.13.50 PM

Sophie Blee-Goldman

Senior Software Engineer (Presenter)

bill-bejeck

Bill Bejeck

Integration Architect (Author)

Windowing

Windowing allows you to bucket stateful operations by time, without which your aggregations would endlessly accumulate. A window gives you a snapshot of an aggregate within a given timeframe, and can be set as hopping, tumbling, session, or sliding.

Hopping

Imagine that you're building a stream and you simply need to count your instances by key:

KStream<String, String> myStream = builder.stream("topic-A");
myStream.groupByKey().count().toStream().to("output")

The problem is that, over time, your count is going to continue to grow. So you need to window your results in order to get a sense of how your count fluctuates:

KStream<String, String> myStream = builder.stream("topic-A");
Duration windowSize = Duration.ofMinutes(5);
Duration advanceSize = Duration.ofMinutes(1);
TimeWindows hoppingWindow = TimeWindows.of(windowSize).advanceBy(advanceSize);
myStream.groupByKey()
    .windowedBy(hoppingWindow)
    .count();

A hopping window is bound by time: You define the size of the window, but then it advances in increments smaller than the window size, so you end up with overlapping windows. You might have a window size of 30 seconds with an advance size of five seconds. Data points can belong to more than one window.

Tumbling

A tumbling window is a special type of hopping window:

KStream<String, String> myStream = builder.stream("topic-A");
Duration windowSize = Duration.ofSeconds(30);
TimeWindows tumblingWindow = TimeWindows.of(windowSize);

myStream.groupByKey()
    .windowedBy(tumblingWindow)
    .count();

It's a hopping window with an advance size that's the same as its window size. So basically you just define a window size of 30 seconds. When 30 seconds are up, you get a new window with a time of 30 seconds. So you don't get duplicate results like you do with the overlapping in hopping windows.

Session

Session windows are different from the previous two types because they aren't defined by time, but rather by user activity:

KStream<String, String> myStream = builder.stream("topic-A");
Duration inactivityGap = Duration.ofMinutes(5);

myStream.groupByKey()
    .windowedBy(SessionWindows.with(inactivityGap))
    .count();

So with session windows, you define an inactivityGap. Basically, as long as a record comes in within the inactivityGap, your session keeps growing. So theoretically, if you're keeping track of something and it's a very active key, your session will continue to grow.

Sliding

A sliding window is bound by time, but its endpoints are determined by user activity. So you create your stream and set a maximum time difference between two records that will allow them to be included in the first window.

 KStream<String, String> myStream = builder.stream("topic-A");
 Duration timeDifference = Duration.ofSeconds(2);
 Duration gracePeriod = Duration.ofMillis(500);
 myStream.groupByKey()
    .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(timeDifference, gracePeriod))
    .count();

The window doesn't continuously advance, as with a hopping window, but advances based on user activity.

Grace Periods

With the exception of session windows, which are behavior-driven, all windows have the concept of a grace period. A grace period is an extension to the size of a window. Specifically, it allows events with timestamps greater than the window-end (but less than the window-end plus the grace period) to be included in the windowed calculation.

Use the promo code STREAMS101 & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.

Windowing

Hi, I'm Sophie Blee-Goldman with Confluent. In this module, we're going to talk about windowing with Kafka Streams. Windowing allows you to bucket your aggregations by time. Without windowing, your aggregations will continue to build up more and more over time, so any new result will reflect the entire history of that topic. With windowing, you can get a snapshot of that aggregation over a given timeframe. There's four types of windows that Kafka Streams provides, and we will discuss them in this module. The tumbling window, hopping window, session window and the sliding window. To get some context, let's take an example. So here you're building a stream and you want to count the number of events with a certain key. So, you're just going to call groupByKey like you did for any of the stateful operations, and then this count operator, which just keeps track of the number of records that you've seen for a given key so far. Simple aggregation like this is going to keep growing up and up and up as it contains the history of this entire topic. But sometimes, you don't really care about the entire history of this topic and how many keys you see in over its lifetime, you just want to get the count over a certain window. So for example, we wanna know when was the count higher, when was it lower? This allows you to do more interesting time-based analytics. So the first type of window that we'll go over is the hopping window. Hopping window is a window that is bound by time, has a fixed start and end point, and a fixed size. So to use the hopping window, you need to define the window size and the advanced size. The window size just determines literally the duration of this window and what events will fall within this window. And the advanced size determines how this window advances. So for each one minute, in this case, the five minute window will advance one minute into the future, so you might have a window that goes from zero to five minutes and a window that goes from one to six minutes and so on. So, given that you might have multiple windows, as long as the advanced size is less than the window size, you can actually end up with duplicates. For example, if you have this zero to five minute and one to six minute window and event coming in at minute two is going to fall into both of these windows. So in this case, the results are not really unique and you can get duplicates in that way. Now to define a hopping window in Kafka streams, you first need to define this window size at advanced size duration, and then specify it with this time windows. All you need to do here is construct the time windows using the of constructor passing the window size and then pass in the advanced size, using the advanced by method. So then to do a windowed count, you take your stream and call group by key like you did before and now, instead of calling the count operator, you first call windowed by. Now windowed by just takes this time windows that you defined, in this case, the hopping window, and then tells Kafka streams to do a windowed count. Only events that fall within the given window will be counted for that windows result. And you will get the result for each of the windows that are defined by these hopping windows. So next up is the tumbling window. The tumbling window is actually a special type of the hopping window where the advanced by is exactly equal to the window size. So in this case, you don't get any duplicate or overlapping windows. Each window is just tumbling over in time, for example, you might have one from zero to 30 seconds and then another window from 30 to one minute, but never any overlapping windows and in this case, all of your results are going to be unique. Now to use a tumbling window, you use it pretty much the same way that you would a hopping window. The only difference being that you only need to pass in the window size and you do not need to call this advanced by since the window size is the same as the advanced by, Kafka streams will know that this means you want a tumbling window, and from there, you just pass it into windowed by and do your window count operation. Now, especially windows are a bit of a different beast, they're not time-driven the way that the hopping and the tumbling window are. They have a window start and the window end, but those are not fixed the way they are for the other window types. And they don't have a fixed size for that reason. So in a session window, the window boundaries are actually determined by the events themselves. So a session window will continue to grow as long as new events come in, as new events come in, if there is an existing session window within the inactivity gap, then this new event will be merged with the existing session window. The inactivity gap is what defines the session window. Similarly to how the size might define a tumbling window. Again, you define your stream as always and group by the key and then instead of a time windows, you pass it into session windows with the inactivity gap, and that will give you a session windowed count. So a session window is useful for something like user browser sessions, where a user might log into a site leave for a little bit and then come back and you really wanna know what was that entire session and combine them together into a single session and not count the five minutes they got up to go get a snack. Now, lastly, we have the sliding window, a sliding window is similar to a hopping and a tumbling window. And that it has a fixed size. It has fixed end points, but instead of being driven by time itself, the sliding window is driven by the actual events. So a sliding window is actually similar to a hopping window with an advanced time of one or whatever the smallest granularity is it results in all possible aggregation results. So you would get every possible unique count for any combination of events within that duration that is the time difference, similar to the window size for the hopping window. So unlike a hopping window where you might have many records and many results that are the same due to the events falling within that window, being identical, a sliding window is only going to give you one output result for each possible combination of these events. So in this way, instead of being driven just by the advancement of time, sliding windows are really driven by the advancement of the events themselves. You can think of it as kind of an admit on change semantics, where the window result changes only when a new event comes in or an old event leaves the window as time has advanced. This makes them more efficient for computing aggregations, where you wanna get every possible value of something over a specified window or time duration. For example, you might wanna know if the average temperature was ever exceeded over some time interval in your factory and go address the problem assuming there might be one. A sliding window aggregation is defined similarly to the other ones. You first take your stream, group by key and again call windowed by. Now this time you pass in a sliding windows, just like you did a time windows or a session windows before. And the sliding windows is actually created with a time difference and grace. So here, the time difference is somewhat analogous to the window size previously. It defines the maximum amount of time. Two events can be separated for them to be still considered within the same window. And then you also need to define this thing called the grace period, which we will get into now. So all windowed operations in Kafka Streams have this concept of a grace period. So all windows have an start time and an end time and as new events come in, if they fall within these two times, the event will be aggregated with the current result in this window and the window will be updated. But what happens if you receive an event that has a timestamp after this window has already ended. Well, records with timestamps outside the window are discarded, but sometimes you may have out of order records that you still care about. This is where the grace period comes in. So Kafka streams lets you define a grace period on any of these window aggregations. And this just means the amount of time that you're willing to accept new records that come in after the window has already ended. So, now that we've seen some of these windowed operations, let's do an exercise to go over them.