Apache Flink in Action: Working with a Paused Stream

Using Apache Flink® watermarks can create havoc when you have a stream with significant delays between records. In this video we discuss a possible solution using a batch close record, which will ensure that your data is up to date.

Apache Flink in Action: How to work with a paused stream

Dan Weston

Senior Curriculum Developer


Hi, I’m Dan Weston. In this Apache Flink in Action we’ll be looking at a solution for when you have data with a significant pause between events, whether we’re talking hours, days, or possibly even weeks. If you’re using watermarks, you won’t see all of your data: there will be at least one record waiting for the next watermark before it is sent over. So, how can we solve this? Let’s break down a possible solution.
Let’s say you own a store that processes sales. You’ve set up a Flink job that runs continuously
so you can see your sales data at any point throughout the day. However, you’ve also noticed that you don’t see the last hour or so
of data at the end of the day since the sales have stopped. With no events coming in, Flink can’t update the
watermark and send over the previous hour’s worth of data. You might think that setting an idle timeout would work, but in this case, since you don’t have any data coming in, the stream isn’t idle, it’s stopped, at least until the next sale comes in.
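For context, Flink’s idle-timeout setting only helps when some partitions are idle while others are still receiving data; it cannot advance the watermark of a source that has stopped entirely. In Flink SQL it would be set like this:

```sql
-- Marks a source partition idle after 5 s without data, so watermarks from
-- *other*, still-active partitions can advance. When no partition is
-- receiving data at all, there is nothing to advance the watermark,
-- which is why this setting doesn't solve the stopped-stream problem.
SET 'table.exec.source.idle-timeout' = '5 s';
```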
In this instance, I recommend creating a task that automatically adds a “batch close” record: a purchase for $0 that occurs just after 5 PM every day. This will provide a new watermark and effectively close the window for the last hour of that day, giving you confidence that you’ve captured all your sales data. Now,
since we added a new record that wasn’t an actual sale,
you may need to filter this record out to give you the correct number of sales for that day. Let’s take a look at what this would look like.

Close record in action

I’ll be using Confluent Cloud for this demo. However, these principles apply to any version of Flink,
so feel free to follow along with whichever version suits you. Here I have a new Confluent Cloud environment with no data or even any topics. I’ll click on the Flink tab and then click the Create Compute Pool button. I’ll make sure to select the same location as my cluster, in this case US West 2,
and click Continue. I’ll accept the default name and click Create. This will take a little bit of time, but through the power of editing we’ll skip to the future! Once the pool has been created, I’ll open the SQL workspace, where we can enter our Flink SQL statements.

We’ll create a new table that holds our sales records. I’ve set this table up so that it emits a watermark at the maximum observed timestamp so far, and it only has one partition. We’ll click Run and our table will be created.

Next, I’ll add a sale for $19.99 that occurred at midnight. I’ll click Run and that sale will be inserted. Now that we know we have a sale, let’s run a query to see if it shows up. As we mentioned in the intro, we won’t actually see any records.
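The video doesn’t show the exact statements, but under a hypothetical schema (table and column names are my assumptions), the table and first insert might look something like this. A watermark defined directly on the event-time column, with no out-of-orderness allowance, tracks the maximum observed timestamp; the `DISTRIBUTED INTO 1 BUCKETS` clause is Confluent Cloud syntax for a single partition, and self-managed Flink would instead need a connector `WITH (...)` clause.

```sql
-- Hypothetical schema; the video does not show the exact DDL.
CREATE TABLE sales (
  sale_id INT,
  amount DECIMAL(10, 2),
  sale_time TIMESTAMP(3),
  -- Watermark at the maximum observed timestamp (zero out-of-orderness).
  WATERMARK FOR sale_time AS sale_time
) DISTRIBUTED INTO 1 BUCKETS;

-- The first sale: $19.99 at midnight.
INSERT INTO sales VALUES (1, 19.99, TIMESTAMP '2024-01-01 00:00:00');
```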
The reason is that we need a new sale to come in to create a new watermark. However, we’ll leave this query running for now. I can then open up a new command space and do a select all so that we can see our record. And as you can see, there’s our record. Let’s leave that one running as well.

As we talked about earlier, let’s go ahead and add our sale for $0. I’ll open up another command space and enter a sale for $0, just like we discussed earlier. This will create our new watermark and
we’ll see the $19.99 sale that we entered above appear in our query. And there it is.

As we discussed earlier, this $0 record will appear every time we enter it, possibly throwing off our sales numbers. Luckily, we can filter it out so that we only get the records that have a value greater than $0. And there you go, there’s our $19.99 record.

Now, just for fun, let’s enter two more records, one for $5 and another for $0, just to see that our queries are behaving as they should. There’s the $5 record, and there’s the $0 record. In the filtered query we see the $19.99 and the $5, all the values that are greater than zero. In the unfiltered query we see the four records that we entered: the $19.99, the $0, the $5, and then another $0. And then we see the three records up above. So, that’s it.

Outro

What are your thoughts? Can you think of another way you could solve this problem? If so, make sure you drop it in the comments below. Until next time, happy streaming.
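For reference, assuming the same kind of hypothetical sales table as above (an `amount` column and an event-time watermark; these names are my assumptions, not shown in the video), the batch-close record and the filter from the demo might be sketched as:

```sql
-- Batch-close record: a $0 "sale" just after 5 PM advances the watermark
-- past the day's real sales, closing any window still waiting on it.
INSERT INTO sales VALUES (0, 0.00, TIMESTAMP '2024-01-01 17:01:00');

-- Filter out the $0 sentinel so it doesn't skew the sales numbers.
SELECT * FROM sales WHERE amount > 0;
```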

Related Videos


Apache Flink in Action: How to Set Idle Timeouts