Flink SQL uses window table-valued functions to support aggregations over time windows. These functions support several types of windows:
Session windows group events into sessions separated by gaps where the stream is idle for some minimum interval. The other window types all group events into windows of a fixed size relative to the time of day.
For windows relative to each event (rather than the time of day), see the module on OVER windows.
For a guided exploration into how both window and over aggregations work, see the accompanying exercise.
This video on windowing is part of a series of videos about Apache Flink and Flink SQL.
Windowed analytics are broadly useful in many different applications. For example, windowed analytics are widely used for reporting and monitoring, and also in building features for machine learning models.
I am going to present several examples, all based on this simple clickstream. Each click has a URL and a click time.
The watermarking specification for this table tells Flink that the events will arrive in order according to their timestamps.
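A table definition along these lines would match what's described here. This is only a sketch: the connector settings are illustrative (the transcript doesn't show the DDL), but the column names `url` and `click_time` come from the example itself.

```sql
CREATE TABLE clicks (
  url STRING,
  click_time TIMESTAMP(3),
  -- A strictly ascending watermark: this declares that events
  -- arrive in order according to their click_time.
  WATERMARK FOR click_time AS click_time
) WITH (
  -- Illustrative connector choice; a real deployment would
  -- typically read from Kafka instead.
  'connector' = 'datagen'
);
```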
This is the kind of result I'm looking for. I want to know second by second how many clicks occurred, and this is the query that will do this.
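A query along these lines produces that second-by-second count, using the standard window table-valued function syntax (the exact query text isn't shown in the transcript, so treat this as a sketch):

```sql
SELECT window_start, COUNT(url) AS cnt
FROM TABLE(
    TUMBLE(TABLE clicks, DESCRIPTOR(click_time), INTERVAL '1' SECOND))
GROUP BY window_start, window_end;
```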
Let's dive in and see what's going on with this query.
This query is doing a grouped aggregation over a table produced by this tumble function.
Tumble is a table-valued function, meaning it's a function that returns a table.
This tumble function takes three parameters that are named data, timecol, and size.
The data parameter is where we specify which table is going to have windowing applied to it.
The time column is where we specify the time attribute of the clicks table.
It's possible for a table to have many different timestamp columns, but only one of them will be used for watermarking. We call that special timestamp column the time attribute.
In this case, the time attribute for the clicks table is the click_time column.
Now for a quick aside about Confluent Cloud for Apache Flink.
On Confluent Cloud, it's not necessary to have an explicit timestamp column; you can instead use the Kafka record timestamp, which is exposed as a special column named $rowtime.
Confluent Cloud provides automatic watermarking for this $rowtime column, which is why in this example the clicks table doesn't have any watermarks defined.
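On Confluent Cloud, the same query could be written against `$rowtime` instead of an explicit timestamp column. A sketch (note that the identifier needs backticks because of the leading `$`):

```sql
SELECT window_start, COUNT(*) AS cnt
FROM TABLE(
    -- `$rowtime` exposes the Kafka record timestamp and is
    -- automatically watermarked on Confluent Cloud
    TUMBLE(TABLE clicks, DESCRIPTOR(`$rowtime`), INTERVAL '1' SECOND))
GROUP BY window_start, window_end;
```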
The final parameter for the tumble function is the size. This is where you specify how long each window should be.
Tumbling windows follow one after another, each one collecting the events that fall within its timeframe.
With tumbling windows, there's no overlap and there are no gaps between the windows.
Now that we've taken a look at the tumble function, let's try to understand its role in this query by directly examining what the tumble function is returning.
When we select * from the table returned by the tumble function, we get this table. The first two columns are copied from the clicks table, but now three additional columns have been added: window_start, window_end, and window_time.
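Inspecting the tumble function's output directly looks like this (a sketch, using the same table and window size as before):

```sql
-- Returns the original columns (url, click_time) plus
-- window_start, window_end, and window_time for each row.
SELECT *
FROM TABLE(
    TUMBLE(TABLE clicks, DESCRIPTOR(click_time), INTERVAL '1' SECOND));
```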
The effect of the tumble function is to assign each row of the clicks table to the appropriate window as defined by window_start and window_end.
When we then group by window_start and window_end, we're mapping each click onto the appropriate window.
Now when we count the clicks in each group, we are counting the clicks in each window, which is exactly what we wanted.
There's one more column in the table returned by the tumble function, and you may be wondering what that's all about.
The window_start and window_end columns are ordinary timestamp columns, but the window_time column is a time attribute.
This means that this results table has watermarks based on the window_time column, which allows you to take the result of the tumble function and use it in other operations that require watermarks.
Even though in this case, we're only interested in reporting the window_start time, it’s very important that we group by both window_start and window_end.
The Flink SQL planner recognizes this pattern and uses an optimized window execution operator to handle this case.
The result is an insert-only stream or table where each entry is the final result for that period of time.
After producing each result, the storage used to accumulate that result is freed.
On the other hand, if we had only grouped by the window_start field, this query would be planned using a generic grouped aggregation. The result is a table that includes updates.
As you see here, the first event causes the counter to be set to one and the second event updates that one to a two.
With this query plan, the storage for each window is retained forever.
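For contrast, the anti-pattern described here would look something like this sketch; the only difference from the optimized version is the GROUP BY clause:

```sql
-- Grouping by window_start alone: the planner falls back to a
-- generic grouped aggregation that emits updates and retains
-- per-window state indefinitely.
SELECT window_start, COUNT(url) AS cnt
FROM TABLE(
    TUMBLE(TABLE clicks, DESCRIPTOR(click_time), INTERVAL '1' SECOND))
GROUP BY window_start;
```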
Flink SQL doesn't just have tumble windows; we're now going to look at three more flavors of window table-valued functions, all of which add the same extra columns (window_start, window_end, and window_time) as the tumble function.
First up is hopping windows.
In addition to a size, hopping windows also specify a slide. In this case, we're asking for windows that are always one second long, but with a report coming every half a second.
Here's the first window.
Then we slide forward by half a second, and so on.
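A hopping window query follows the same shape as the tumbling one, with the slide as an extra parameter before the size. A sketch (the intervals here are scaled up from the video's one-second/half-second example for readability):

```sql
SELECT window_start, COUNT(url) AS cnt
FROM TABLE(
    HOP(TABLE clicks, DESCRIPTOR(click_time),
        INTERVAL '5' SECOND,    -- slide: how often a window starts
        INTERVAL '10' SECOND))  -- size: how long each window is
GROUP BY window_start, window_end;
```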
The next flavor of window table-valued function is called cumulate.
Once again, these cumulate windows are one second long, but this time the window will report its progress every quarter second until reaching the full window size.
Here is the first step, reporting on just a quarter of a second.
For the next report, the window has stretched itself out to fill half a second.
Then the window steps forward a bit further until the second is finally complete.
After completing one second, we begin to step through the following second.
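In SQL, cumulating windows take a step size and a maximum window size. A sketch (again with intervals scaled up from the video's one-second/quarter-second example):

```sql
SELECT window_start, window_end, COUNT(url) AS cnt
FROM TABLE(
    CUMULATE(TABLE clicks, DESCRIPTOR(click_time),
             INTERVAL '15' SECOND,  -- step: how often to report
             INTERVAL '1' MINUTE))  -- size: the full window length
GROUP BY window_start, window_end;
```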
Here is a special case we need to talk about. This window is empty, and you might expect it would produce a result of zero.
However, in Flink, empty windows simply don't exist and produce no results at all.
The final flavor of window table-valued functions is for session windows.
Session windows work a bit differently than the others we have just seen.
Rather than having a fixed size, a session is characterized by the gap that separates it from neighboring sessions.
With a gap of 10 minutes, here's how these events would be grouped into sessions.
The event stream is being separated into different sessions wherever the gap between successive events is longer than 10 minutes. Within each session, the events are less than 10 minutes apart.
If the gap were much longer, such as an hour, instead of belonging to three separate sessions, all of these events would be assigned to the same session.
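With the session window table-valued function (available in recent Flink releases; older versions used a GROUP BY session syntax instead), the 10-minute-gap example would look something like this sketch:

```sql
SELECT window_start, window_end, COUNT(url) AS cnt
FROM TABLE(
    SESSION(TABLE clicks, DESCRIPTOR(click_time),
            INTERVAL '10' MINUTE))  -- gap that ends a session
GROUP BY window_start, window_end;
```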
In addition to the basic types of windows I’ve presented here, Flink SQL also includes support for over windows, windowed joins, and pattern matching within time intervals.
For more information and hands-on exercises, head on over to Confluent Developer using the link in the description below.