Real-time data streaming differs from batch-oriented processing by focusing on individual events and consuming them as they happen. That creates a complication when a calculation needs to operate over many events rather than just one. In that case, streaming tools group events into time-based windows. Even though a window contains multiple events, those events are still processed one at a time; it is the aggregated result that gets rolled up over the window.
The aggregations supported by the Flink Table API include optimizations that let them accumulate results with a minimal amount of state. An average aggregation would normally require keeping track of each value, but it can be optimized to track only the running sum of the values and the total number of records, reducing a potentially large list to just two numbers.
The Table API also supports different types of windows, such as tumbling and sliding. These are similar but differ in key ways: tumbling windows are equally spaced with zero overlap, whereas sliding windows, though still equally spaced, usually overlap. Which one to use depends on the specific use case.
In this video, we'll show how to implement windows using the Flink Table API. We'll break down the problems windows are trying to solve and show how the different window types approach the solution.
Here is an example of a tumbling window that breaks the stream into one-hour windows, groups the events by customer_id, and then calculates the sum of the price field for each group in the window.
env.from("examples.marketplace.orders")
.window(
Tumble.over(
lit(1).hours()
)
.on(
$("$rowtime")
)
.as("window")
)
.groupBy(
$("customer_id"),
$("window")
)
.select(
$("customer_id"),
$("window").start().as("window_start"),
$("window").end().as("window_end"),
$("price").sum().as("total_spend")
)
.execute();
Here is an example of a sliding window that separates the stream into one-hour windows that are reported every 30 minutes. This means each window overlaps the previous one by 30 minutes. The data is grouped by customer_id, and aggregations are calculated over each group.
env.from("examples.marketplace.orders")
.window(
Slide.over(
lit(1).hour()
)
.every(
lit(30).minutes()
)
.on(
$("$rowtime")
)
.as("window")
)
.groupBy(
$("customer_id"),
$("window")
)
.select(
$("customer_id"),
$("window").start().as("window_start"),
$("window").end().as("window_end"),
$("price").sum().as("total_spend")
)
.execute();
A critical aspect of any business is its ability to aggregate key metrics to produce new insights.
Metrics such as how many products a company has sold are rolled up over an hour, a day, a month, or a year, and then used to identify trends and predict future growth.
However, the modern world of data streaming complicates this.
Aggregation requires dealing with events in large groups, but data streaming is focused on handling individual events as they arrive.
Or is it?
One of the primary uses for data streaming tools, such as Apache Flink, is finding ways to partition a stream into discrete time windows.
These windows are used to perform aggregate calculations to derive the insights we are looking for.
Imagine a business that collects details about product sales.
They keep the data in a static database table that includes the customer_id, product_id, price, and timestamp.
If we wanted to aggregate the data every hour, it would be easy to separate the table into discrete batches based on the time.
Then, we can compute an aggregate, such as the sum of the price, for each of the batches.
However, despite being simple, it's also expensive.
All of the calculations must be performed on a set schedule.
Some of the time, the computation might be idle, which means allocated resources are potentially being wasted.
Then, once the required interval has elapsed, the calculation is initiated and suddenly, we have a hotspot.
Meanwhile, what happens if the computation fails?
In that case, it has to be retried, further increasing the computational burden.
These hotspots and failures can lead to excessive use of resources and limit the ability to provide real-time insights.
This is one of the reasons many businesses reach for data streaming as a potential solution.
The advantage of data streaming is that it allows records to be processed in real time.
This allows computation to be spread out over a longer period, which can eliminate the hotspots.
In the event of a failure, we are left retrying just the individual event, rather than being forced to retry an entire batch.
And most importantly, it allows us to derive insights in real time as events flow through the system.
The challenge is that dealing with events one at a time doesn't lend itself to aggregations that require looking at multiple events.
To resolve this, Flink introduces the window API.
The window API allows Flink to group events into time-based windows.
As events flow through the system, Flink can observe the supplied timestamp and add the event to one of the windows.
Then, once the window is complete, an aggregated result that covers the entire window can be computed.
At this point, Flink can emit that result as a new event.
However, it's important to note that even though we apply the aggregations to the entire window, the events are still processed one at a time.
Flink will maintain the intermediate state necessary to produce the final aggregation.
Furthermore, it applies optimizations where appropriate to ensure that it only keeps the minimum amount of state necessary.
As an example, to compute an average, you need to sum the individual values, and then divide by the number of records.
But, that doesn't mean you have to keep all of the values.
Keeping a running sum and a running count, basically two numbers, is sufficient to compute the final average.
So the actual state being maintained can be quite small in many cases.
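To make that concrete, here is a minimal sketch in plain Java of what such an accumulator could look like; this is purely illustrative, not Flink's internal implementation.

// A minimal sketch of incremental averaging (illustrative only, not
// Flink's actual internal code). Only two numbers are ever stored.
public class RunningAverage {
    private double sum = 0.0;  // running sum of every value seen so far
    private long count = 0;    // running count of values seen so far

    // Fold one event's value into the accumulator; the value itself is not retained.
    public void add(double value) {
        sum += value;
        count++;
    }

    // Compute the final average once the window closes.
    public double get() {
        return count == 0 ? 0.0 : sum / count;
    }
}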
With all that in mind, let's take a look at some basic windows.
The windows we look at today all consist of a few basic components.
They have a window clause, which defines the size and shape of the windows.
The groupBy clause organizes the individual records into groups according to the window definition and any other needed parameters.
And finally, the select clause defines the shape of the results, including which aggregations should be performed on the window.
Take a look at this timeline.
Each envelope represents a message arriving in the system at a specific time.
We could divide this timeline into a series of hourly windows.
Each window is one hour long and there is no overlap between them.
These are what we call tumbling windows, and they represent one of the simplest types.
Defining a tumbling window is relatively simple.
The Tumble.over method just requires a duration for the window.
In this case, we've specified a tumbling window of one hour.
However, we also need to tell it what field it should use when looking for the timestamp.
We do that using the on method in the window.
Here, we are going to use the built-in $rowtime field.
Finally, we need to give our window a name.
We can do that using the as method.
I've chosen to name it window just to make it really clear what this represents, but the name can be anything.
Next, we need to tell the query how to group records using the groupBy clause.
Because this is a windowed query, we are going to group records into the windows.
As a result, I have specified the window name.
And finally, we need to tell the query what to do with the results.
In the select clause, I am reporting the window start and end times.
This will allow me to see exactly when the results occurred.
Then, I perform one of my aggregations, in this case, I sum the price of all orders to determine the total amount spent during the time window.
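Put together, the query at this point looks something like this (a sketch, assuming the same orders table as the examples above):

env.from("examples.marketplace.orders")
    .window(
        Tumble.over(lit(1).hours())
            .on($("$rowtime"))
            .as("window")
    )
    .groupBy($("window"))  // grouping by the window alone: one result row per hour
    .select(
        $("window").start().as("window_start"),
        $("window").end().as("window_end"),
        $("price").sum().as("total_spend")
    )
    .execute();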
If we wanted to get more specific and see how much each individual customer spent during the time window, we could do that, by including the customer_id in the groupBy and also in the select statement.
This allows us to get more granular with how we report the data.
There are a few things we need to remember.
First, we can get quite detailed with our queries by including additional fields in the groupBy clause.
However, because this is a windowed operation, we must ensure that the window is included in the groupBy, or the query will fail.
Second, the fields in our select statement will be limited to those listed in our groupBy clause or aggregations that apply to the whole window.
Other fields won't be available because they don't make sense.
We can't ask for the customer_id if the records aggregate multiple customers.
If we want to access a field like that, we have to include that field in the groupBy clause.
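Here is a compact sketch of that rule, using the same orders table; the commented-out line shows the kind of selection that would be rejected:

env.from("examples.marketplace.orders")
    .window(Tumble.over(lit(1).hours()).on($("$rowtime")).as("window"))
    .groupBy($("customer_id"), $("window"))
    .select(
        $("customer_id"),                   // allowed: it's in the groupBy
        // $("product_id"),                 // rejected: neither grouped nor aggregated
        $("price").sum().as("total_spend")  // allowed: a window-level aggregation
    )
    .execute();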
Now, let's look at another type of window.
Let's return to our timeline of events.
We can define a fixed-size window of one hour, just like we did with our tumbling windows.
But this time, we'll only draw one.
And, like before, we can perform an aggregation over this window and emit the results.
However, to define the next window, rather than advancing to the end of the current one, we will slide over by 30 minutes.
The window is still an hour long, but now it overlaps with the previous one.
We can keep going like this, advancing the hour long window by thirty minutes at a time.
This is known as a sliding window in the Flink Table API; however, it gets translated into a hopping window in Flink SQL.
You can ignore the slight difference in terminology, since Flink treats them the same way.
In our tumbling window example, we emitted an hour's worth of data every hour.
With a sliding window, we emit an hour's worth of data every half hour, hence the overlap.
Now, let's look at how we implement it.
For a sliding window, we define the length of the window the same way we did with a tumbling window, simply replacing the word Tumble with Slide.
Here, we've selected a sliding window that will be an hour long.
The biggest difference is that we also have to provide a second duration indicating how often we want to emit our results.
In this case, we're going to emit our results every 30 minutes.
One condition is that the window size specified in the over method must be an integral multiple of the slide indicated in the every method.
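For instance, with a one-hour window (a sketch, assuming the same imports as the examples above):

// The size passed to over(...) must be an integral multiple of the
// slide passed to every(...).
Slide.over(lit(1).hours()).every(lit(30).minutes()); // OK: 60 is 2 x 30
Slide.over(lit(1).hours()).every(lit(20).minutes()); // OK: 60 is 3 x 20
// Slide.over(lit(1).hours()).every(lit(25).minutes()) would be rejected,
// because 60 is not a multiple of 25.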
The rest of the example proceeds exactly as it did with the tumbling window.
We provide the field to look for the timestamp, and a name for the window.
Then, we specify our groupBy parameters, making sure to include the window.
And finally, we select the fields we are interested in and the aggregations we want to perform.
Modern businesses are driven by the type of metrics we generate using windows.
They allow companies to derive deeper and more valuable insights than they have ever had access to before.
And, with Apache Flink, they can do it all in real time.
Feel free to drop me a comment if you have any questions about windows in Flink.
And keep an eye out for future videos covering similar topics.
Thanks for watching.