Learn about setting an idle timeout on a watermark generator when joining data in Apache Flink®.
Senior Curriculum Developer
This video covers setting an idle timeout on a watermark generator when joining data in Apache Flink®. This is useful when you join two streams, one with frequent updates and one with infrequent updates, and you need join results without waiting for a fresh watermark from the infrequent one.
Intro
Hi, I’m Dan Weston. In this Apache Flink in Action, we’ll look at a common scenario.
Why do we need to set an idle timeout?
You’re working with data from an e-commerce shop, and you want to enrich each event from
the clickstream with some metadata about the product on the page being clicked.
These streams are connected to Kafka topics. The first topic contains the clicks,
which has an almost constant stream of events coming in.
The second topic is product metadata.
This stream is pretty static, with product updates and new products being added infrequently.
You are planning to use a temporal join to enrich the clickstream,
because you want each enriched click event to include the product data
that was shown on the product page at the time of the click.
That way, as you change the products, you can see how the changes impact your click rate.
This sort of temporal join depends on the watermarks from both streams.
This means it will only produce a result when the watermarks for both streams have reached
the timestamp in the event being enriched, which in this case is the click event.
This prevents the join from using stale product data,
which, in the abstract, is exactly what you want from a temporal join.
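To make that firing rule concrete, here is a minimal Python sketch, not Flink's actual implementation. It assumes the join operator's watermark is simply the minimum of its two input watermarks, and it uses hypothetical names (`Click`, `ready_to_join`, `version_at`) and made-up timestamps in seconds.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Click:
    ts: int          # event time of the click, in seconds (hypothetical unit)
    product_id: str


def ready_to_join(click: Click, click_wm: int, product_wm: int) -> bool:
    # The join operator's watermark is the minimum of its two input watermarks;
    # a click can be enriched only once that combined watermark reaches its timestamp.
    return min(click_wm, product_wm) >= click.ts


def version_at(versions: List[Tuple[int, str]], ts: int) -> Optional[str]:
    # versions: (valid_from, product_row) pairs sorted by valid_from.
    # Returns the row that was current at time ts, i.e. what the page showed.
    current = None
    for valid_from, row in versions:
        if valid_from > ts:
            break
        current = row
    return current


versions = [(0, "classic T, blue"), (2_000, "classic T, green")]
click = Click(ts=1_000, product_id="classic-t")
print(ready_to_join(click, click_wm=1_005, product_wm=900))    # False: product watermark lags
print(ready_to_join(click, click_wm=1_005, product_wm=1_000))  # True
print(version_at(versions, click.ts))                          # classic T, blue
```

The version lookup is what makes the join "temporal": the click is enriched with the product row that was valid at the click's own timestamp, not with whatever row happens to be newest.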
But in a case like this, where the enrichment stream only rarely changes,
this behavior can be problematic, because the watermark for the product stream is almost
always stuck somewhere in the past and only advances when the product info is updated.
Imagine a case where the product info is refreshed once a week.
What’s going to happen, unless you apply the fix I’m about to explain,
is that all of the clicks for the week will pile up in the join’s state,
and then, when the product update does occur, all of the joins that were stuck waiting on the
watermark for the product stream to advance will go ahead and produce their results.
What’s especially interesting is that the product data finally included in those
enriched clicks is the product data that was already available all week long. All
that Flink lacked was the knowledge that it was safe to go ahead and use that data.
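The week-long pile-up can be simulated in a few lines. This is a toy model under stated assumptions: one click per hour, a watermark equal to the latest timestamp seen (no out-of-orderness bound), and a product stream whose only record sits at time 0 until the weekly update. The names `buffered`, `flush`, and `operator_wm` are hypothetical.

```python
from collections import deque

HOUR = 3600
WEEK = 7 * 24 * HOUR

buffered = deque()  # click timestamps waiting in the join's state
emitted = []        # click timestamps the join has enriched and emitted
product_wm = 0      # the product stream's only record is at time 0


def operator_wm(click_wm: int) -> int:
    # The join can only move as fast as its slowest input.
    return min(click_wm, product_wm)


def flush(click_wm: int) -> None:
    # Emit every buffered click whose timestamp the combined watermark has reached.
    wm = operator_wm(click_wm)
    while buffered and buffered[0] <= wm:
        emitted.append(buffered.popleft())


# One click per hour for a week; the click watermark follows the latest click.
for ts in range(HOUR, WEEK + 1, HOUR):
    buffered.append(ts)
    flush(click_wm=ts)
print(len(emitted), len(buffered))  # 0 168

# The weekly product update finally advances the product watermark.
product_wm = WEEK
flush(click_wm=WEEK)
print(len(emitted), len(buffered))  # 168 0
```

Nothing about the product row itself changes between the two `print` calls; only the product watermark moves, and that alone releases all 168 clicks at once.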
The solution here is to set an idle timeout on the watermark generator.
The effect of this will be that each time we do have a product update,
the watermark for the product stream will advance accordingly.
At that moment, the temporal join will have watermarks from both streams,
and it will behave normally, meaning that it won’t be able to enrich clicks
occurring after the product update until the idle timeout is triggered.
Then, once the idle timeout elapses, the temporal join will change its behavior.
It will know not to expect updated watermarks from the now-idle product stream, and will
instead produce results using the latest product information available to it, without waiting.
The join will continue to operate this way until you have the next product update.
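Here is a rough sketch of how an idle timeout changes the combined watermark. It follows the behavior described above: an input that has sent no records for `idle_timeout` units of processing time is treated as idle and left out of the minimum, and the combined watermark never moves backwards. The classes `Input` and `JoinWatermark` are hypothetical, the model ignores out-of-orderness bounds, and a timeout of 0 stands for "disabled".

```python
from dataclasses import dataclass


@dataclass
class Input:
    watermark: int = 0
    last_seen: int = 0  # processing time of the most recent record

    def on_record(self, ts: int, now: int) -> None:
        # Simplification: the watermark is the largest timestamp seen so far.
        self.watermark = max(self.watermark, ts)
        self.last_seen = now

    def is_idle(self, now: int, idle_timeout: int) -> bool:
        # A timeout of 0 disables idleness detection.
        return idle_timeout > 0 and now - self.last_seen >= idle_timeout


class JoinWatermark:
    def __init__(self, idle_timeout: int):
        self.idle_timeout = idle_timeout
        self.clicks = Input()
        self.products = Input()
        self.watermark = 0

    def advance(self, now: int) -> int:
        # Idle inputs are left out of the minimum.
        active = [i.watermark for i in (self.clicks, self.products)
                  if not i.is_idle(now, self.idle_timeout)]
        if active:  # if every input is idle, hold the current watermark
            # The combined watermark never moves backwards.
            self.watermark = max(self.watermark, min(active))
        return self.watermark


join = JoinWatermark(idle_timeout=60)
join.clicks.on_record(ts=95, now=100)
join.products.on_record(ts=100, now=100)  # a product update arrives
join.clicks.on_record(ts=150, now=150)
print(join.advance(now=150))  # 100: the click at 150 still waits
join.clicks.on_record(ts=170, now=170)
print(join.advance(now=170))  # 170: products idle, clicks up to 170 can be enriched
```

The first `advance` call shows the normal behavior right after a product update; the second shows the join moving ahead once the product stream has been quiet for longer than the timeout.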
Let’s take a look at what that looks like.
Idle timeout in action
I'll use Confluent Cloud for this demo,
but these concepts apply to any Flink instance, including open source.
Once logged in, I'll click on my SuperCoolStore environment.
As you can see, I've already created my cluster, but I don't have any data.
I'll click on the Flink tab where we can create our table and run our queries.
I'll create a new compute pool and make sure to select the same region as my cluster,
in this case us-west-2, and click Continue.
I'll leave the default name and click Create.
This will take some time to deploy,
but through the magic of editing, we'll speed it up.
Now I'll go ahead and click on Open SQL workspace.
Here's where we'll enter our queries.
First, let's go ahead and create a table.
I'll paste in the table properties and click Run.
After the table has been created, we’ll need to populate it with a product.
Ideally, this table would be backed by a topic with your product events coming in.
However, in this case I'm going to enter just one product: a classic T, size small, that's blue.
I'll delete the CREATE TABLE query, paste in our product, and then click Run.
Now let's enter our first query with our idle timeout set to zero, effectively disabling it.
Since I'll be adding additional product changes, I'll add a new query window for this join.
As you can see from this query, I'm using the sample data that is provided by Confluent Cloud.
I'll go ahead and click Run.
You can see that our query is running, but no results are coming in, because the
join is stuck waiting for a product update to advance the product stream's watermark
before it can send over the joined data.
If I add or change our product, we'll see the records come in.
So, rather than a blue classic T, let's add a green one.
I'll go ahead and click Run,
and once we've sent that over, we'll see our join data start to come in.
As mentioned earlier, this is the problem we are looking to solve.
We want our joins to happen with the current set of data,
rather than waiting for updates to our product data before we see any results.
So, let's see what happens when we create a new join,
but this time with our idle timeout set to 1 second.
I'll open up a new query window and paste in our new query.
As you can see, I've set the idle timeout to 1 second.
I'll click Run and wait until we see the status set to running.
Now, without entering any new product data, we start to see records come in,
whereas our first query is still stuck where we left it.
This is our desired behavior.
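The two demo queries can be compared with a compact, hypothetical model: one product row, one sample click every 100 ms, and a snapshot after 10 seconds of processing time. It assumes event time equals arrival time and that the product stream's watermark is just the timestamp of its latest row; `joined_count` is an illustrative name, not part of Confluent Cloud.

```python
def joined_count(idle_timeout_ms: int, product_ts_ms: int = 0,
                 click_every_ms: int = 100, run_ms: int = 10_000) -> int:
    """Clicks a temporal join could have enriched after run_ms of processing time.

    product_ts_ms is the timestamp of the latest product row (hypothetical model).
    """
    # Sample clicks arrive steadily; event time equals arrival time in this model.
    clicks = range(click_every_ms, run_ms + 1, click_every_ms)
    click_wm = clicks[-1]
    # The product stream has been quiet since its latest row; 0 disables idleness.
    product_idle = idle_timeout_ms > 0 and run_ms - product_ts_ms >= idle_timeout_ms
    wm = click_wm if product_idle else min(click_wm, product_ts_ms)
    return sum(1 for ts in clicks if ts <= wm)


print(joined_count(idle_timeout_ms=0))                        # 0: stuck, like the first query
print(joined_count(idle_timeout_ms=0, product_ts_ms=10_000))  # 100: a new product row unblocks it
print(joined_count(idle_timeout_ms=1_000))                    # 100: the 1-second timeout query
```

The second call mirrors adding the green classic T: the disabled-timeout query only catches up because a fresh product row moved the product watermark.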
Summary
Confluent Cloud has recently added a feature called "progressive idleness
detection" that's enabled by default.
This starts the idle timeout at 15 seconds and increases it over time to a maximum of 5 minutes.
This new default behavior is designed to give most users a good experience:
interactive joins will produce results, even if one topic is idle,
and long-running queries will end up with a long timeout.
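The video gives only the endpoints of progressive idleness, not the growth curve, so the following sketch invents one purely for illustration: a linear ramp from 15 seconds to 5 minutes over a hypothetical `ramp_s` of query runtime. Both the ramp shape and `progressive_idle_timeout` are assumptions, not Confluent Cloud's actual schedule.

```python
START_S = 15.0    # from the video: progressive idleness starts at 15 seconds
MAX_S = 5 * 60.0  # ...and tops out at 5 minutes


def progressive_idle_timeout(runtime_s: float, ramp_s: float = 3600.0) -> float:
    # HYPOTHETICAL linear ramp from START_S to MAX_S over ramp_s seconds of runtime;
    # the real schedule used by Confluent Cloud is not described in the video.
    fraction = min(max(runtime_s / ramp_s, 0.0), 1.0)
    return START_S + fraction * (MAX_S - START_S)


for runtime in (0, 1_800, 3_600, 86_400):
    print(runtime, progressive_idle_timeout(runtime))
```

Whatever the real curve looks like, the shape of the idea is the same: a freshly started, interactive query gets a short timeout, and a query that has been running for a long time ends up at the cap.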
But for topics you expect to be idle most of the time, you'll want to
consider manually setting a shorter idle timeout for the reasons we've discussed.
As a general guideline, if you expect idleness in one of your data streams,
you can set the idle timeout to a relatively small value.
This allows Flink to resume joining data soon after you’ve updated your idle stream.
On the other hand, for streams where you don’t expect any idleness,
you want to set a large idle timeout.
That way, if there is a technical issue and such a stream does become idle,
the Flink job will stall for a while rather than moving on without that stream, giving the problem a chance to resolve itself.
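The trade-off in these guidelines fits in a tiny function. It is a simplified sketch that assumes a stream goes completely silent for `outage_s` seconds and is marked idle once that silence reaches `idle_timeout_s`; `outage_effect` is a hypothetical helper, not a Flink API.

```python
from typing import Tuple


def outage_effect(outage_s: float, idle_timeout_s: float) -> Tuple[float, bool]:
    """Return (seconds the join waits, whether it then proceeds without the silent stream)."""
    if idle_timeout_s <= 0 or outage_s < idle_timeout_s:
        # Never marked idle: the join waits out the whole silence.
        return outage_s, False
    # Marked idle once the timeout fires: the join moves on without that stream.
    return idle_timeout_s, True


print(outage_effect(outage_s=7 * 24 * 3600, idle_timeout_s=1))  # quiet product stream, short timeout
print(outage_effect(outage_s=120, idle_timeout_s=300))          # steady stream, brief glitch
```

A week-long quiet period on the product stream costs only a second of waiting with a 1-second timeout, while a normally steady stream protected by a 5-minute timeout simply pauses through a 2-minute glitch instead of being skipped.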
What would your solution look like? Is there another option you think
would work? Be sure to let us know in the comments.
Thanks, and until next time, happy streaming.