course: Apache Flink® Table API: Processing Data Streams In Java

Automatic Watermarking in Confluent Cloud

8 min
Wade Waldron

Principal Software Practice Lead

Watermarking with Apache Flink® can be tricky. Even the most experienced developers sometimes get caught with a broken data pipeline because of incorrectly configured watermarks. It is one of the most common sources of Flink errors for new users.

It can be very frustrating when you are trying to learn Flink, and your data appears to be flowing in, but no results are being produced. Thankfully, Confluent Cloud enables an automatic watermarking strategy by default, one that intelligently adapts to your data and helps ensure you get results.

However, the default watermarking strategy isn't for everyone. It's designed to work for a variety of scenarios, but if your use case doesn't fit, you need a way to override it. Fortunately, you can still set a custom watermarking strategy should you need to.

In this video, you will learn why watermarking is so important, how the default strategy behaves in Confluent Cloud, and how to override it if necessary.

Topics:

  • What are watermarks, and why are they necessary?
  • What strategy does Confluent Cloud use to apply default watermarks?
  • When is it a good idea to override the default watermarks?
  • How can the default watermark strategy be overridden?

Resources

Code

How to define a custom watermark on the sensor_time field in the schema for a table.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

// Use sensor_time as the event-time column, allowing events up to one minute out of order.
Schema schema = Schema.newBuilder()
	.column("sensor_time", DataTypes.TIMESTAMP(3).notNull())
	.watermark(
		"sensor_time",
		$("sensor_time").minus(lit(1).minute())
	)
	.build();
Do you have questions or comments? Join us in the #confluent-developer community Slack channel to engage in discussions with the creators of this content.

Automatic Watermarking in Confluent Cloud

Flink watermarks are hard.

Seriously, even the experts who understand Flink deeply sometimes trip over them.

Confluent has addressed this by introducing a default watermarking strategy that will be effective for many users.

However, I always worry about what happens if the default doesn't work for my use case.

I'm going to give a simplified explanation of how watermarking works, both in general, and in Confluent Cloud.

Then, I'll show you how to customize it in the Java Table API, if you don't like the default options.

So stick around.

Imagine a busy highway.

On the highway, we place a sensor that detects vehicles moving past.

Each time a vehicle goes by, we emit an event to a Kafka topic.

The event contains the timestamp for when the vehicle passed the sensor and any other relevant details.

Downstream, a Flink job processes these events to determine how many vehicles passed the sensor every minute.

To do this, we could look at the timestamp in each event, and truncate it down to the minute.

Events that share the same minute would be added to the current count.

And, when we encounter a new minute, we can start a new count.
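
In the Table API, that per-minute count is naturally expressed as a one-minute tumbling window. Here is a minimal sketch, assuming an existing TableEnvironment named env and a registered table called vehicle_events whose event-time column is sensor_time (the names are illustrative, not from the video):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

// Count events per one-minute tumbling window, keyed on the event time.
// "env" and "vehicle_events" are assumed to exist already.
Table countsPerMinute = env.from("vehicle_events")
    .window(Tumble.over(lit(1).minute()).on($("sensor_time")).as("w"))
    .groupBy($("w"))
    .select(
        $("w").start().as("window_start"),
        $("sensor_time").count().as("vehicle_count")
    );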

This seems relatively simple and would work if we had a single-threaded process where time was strictly increasing.

Essentially, for this to work, we'd have to funnel all of our vehicles through a single sensor to create a strictly linear flow.

On a busy highway, that will create some nasty traffic jams.

We'd prefer the system to be a little more scalable.

For example, we could introduce parallel lanes and additional sensors.

Each sensor could be assigned to a partition in the Kafka topic.

This allows for parallelism and improves scalability.

However, it comes at a cost.

Let's replace our cars with the timestamps for when they cross the individual sensors.

The events will be funneled through the Kafka partitions and into the Flink job.

Kafka guarantees order within a partition,

but it doesn't guarantee order across multiple partitions.

This creates problems when our events arrive out of order.

Imagine if this was the order we received the events.

When the first event arrives, we truncate the time and start counting events that occurred during the thirty-sixth minute.

When the second event arrives, again, we truncate the time, but now we have to start a new counter because this one occurred during the thirty-seventh minute.

And here's where the problem occurs.

What do we do with our old counter?

Do we assume it's complete, and emit the count, or do we keep it around in case there is more data?

You and I can look ahead and see there's another event coming.

But the Flink job can't do that.

That other event is at some point in the future, and the job can't predict what will happen.

If the job were to emit the count without waiting, then we would lose data from that later record.

We could wait for the record,

but if it doesn't exist, how long do we wait before giving up?

One way or another, we have to make a decision.

This is where watermarks come into play.

The purpose of a watermark is to provide a maximum amount of time we are willing to wait for out-of-order events.

Looking at our example, if we assume every event can be out of order by at most one minute, we can derive some watermarks.

We do that by taking the time for the current event, and subtracting our out-of-orderness threshold, in this case, one minute.

So here, any event that occurs before 8:35:21 would be discarded because it is considered too old.

That doesn't impact any of the events at the moment.

But let's move forward a little.

At this point, our watermark has advanced to 8:36:39, again, one minute before the current event.

Now, if our next event came out of order at 8:36:52, we would keep it because it falls after the current watermark.

However, the current watermark is always calculated from the maximum event time.

So rather than recalculating the watermark for this out-of-order event, we keep the previous one.

On the other hand, if we had an event like this one at 8:36:30, it would fall before the current watermark.

That means it would be considered late and would be dropped.
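
To make the rule concrete, here is a minimal plain-Java sketch of the bookkeeping just described (illustrative only, not the Flink API): the watermark trails the maximum event time seen so far by the allowed out-of-orderness, and anything older than the watermark is treated as late and dropped.

import java.time.Duration;
import java.time.Instant;

// Illustrative sketch of bounded out-of-orderness, not the Flink API.
class BoundedOutOfOrderness {
    private final Duration maxOutOfOrderness = Duration.ofMinutes(1);
    private Instant maxEventTime = Instant.EPOCH;

    // Returns true if the event is kept, false if it falls behind the
    // current watermark and is dropped as late.
    boolean accept(Instant eventTime) {
        if (eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime; // out-of-order events never move this backwards
        }
        Instant watermark = maxEventTime.minus(maxOutOfOrderness);
        return !eventTime.isBefore(watermark);
    }
}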

The challenge with deciding on a watermark strategy is that the decision has a significant impact on latency, completeness, and even whether the data flows at all.

If the amount of time to wait for out-of-order events is too short, the system could drop critical data.

Meanwhile, if it waits too long, it introduces unnecessary latency.

And, if a partition goes idle, it can prevent the watermarks from advancing, which can halt the data completely.

These complexities are why Confluent Cloud has a default strategy in place, designed to intelligently adapt to the flow of data in the system.

Each event in Kafka

is automatically assigned a timestamp representing when the event was sent.

As messages flow to a Flink job,

Confluent Cloud computes a histogram of observed delays in the timestamps.

This is used to calculate an ideal watermark based on various thresholds, ensuring that, for most use cases, the default watermarks just work.

It's designed to automatically adjust the allowed out-of-orderness in the watermarks to target 95% of messages successfully passing through the system.

That leaves the remaining 5%,

which may be dropped.

Just keep in mind that this is an average.

If your traffic comes in bursts of out-of-order events, those bursts may cause the average to drop temporarily below the 95% threshold.

On the other hand, if your traffic has consistent out-of-orderness, then the algorithm may be able to push closer to 100%.

Whatever the case, it adjusts over time so that if your traffic patterns change, it will adapt.
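
To give a feel for the idea, here is a rough conceptual sketch (emphatically not Confluent's actual implementation): record how far each event lags behind the newest event time seen so far, then pick an out-of-orderness bound that would have admitted roughly 95% of those observations.

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Conceptual sketch only. Records each event's lag behind the maximum event
// time observed so far, then suggests a bound covering ~95% of observations.
class DelaySketch {
    private final List<Duration> lags = new ArrayList<>();
    private Instant maxEventTime = Instant.EPOCH;

    void observe(Instant eventTime) {
        if (eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime;
        }
        lags.add(Duration.between(eventTime, maxEventTime)); // zero when in order
    }

    Duration suggestedOutOfOrderness() {
        if (lags.isEmpty()) {
            return Duration.ZERO;
        }
        List<Duration> sorted = new ArrayList<>(lags);
        Collections.sort(sorted);
        int index = Math.min((int) (sorted.size() * 0.95), sorted.size() - 1);
        return sorted.get(index);
    }
}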

However, that's not to suggest that this is perfect.

There are cases where the default watermarking may be insufficient.

The histogram requires a minimum of 250 events per partition before it will start generating reasonable watermarks.

If the data size is small enough, it may not be able to produce a good watermark.

Furthermore, there are boundaries on the acceptable amount of out-of-orderness.

It will never go lower than 50 milliseconds or higher than one week.

If your system needs to operate outside those thresholds, the automatic watermarking may be insufficient.

And, it does drop extreme outliers.

If those are important, then we'll need to find a way to keep them.

Finally, it assumes that the Kafka timestamp is sufficient.

If the data contains a different timestamp that should be used instead, then the default strategy won't work.

If any of these scenarios apply, it's possible to override the default watermark strategy.

Let's look at a simple Table API schema for our vehicle sensors.

Here we see a schema with a single column named sensor_time representing the time a vehicle crosses our sensor.

This timestamp will be different from the Kafka timestamp, but this is the one we want to use.

As such, we won't use the default watermark strategy.

We can override the watermark strategy by adding an additional setting to our schema.

It accepts the name of the field,

and an expression to calculate the watermark.

Here, we are taking the sensor_time and subtracting one minute.

This will override the default watermark strategy and apply a custom one.
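
For completeness, here is a rough sketch of how such a schema might be applied when the table is created, assuming the Confluent Table API plugin's ConfluentTableDescriptor, an existing TableEnvironment named env, and an illustrative table name; the exact descriptor class and options in your setup may differ.

import io.confluent.flink.plugin.ConfluentTableDescriptor;

// Assumed sketch: create a table backed by the schema above so that its custom
// watermark replaces the default strategy for that table. The table name and
// "env" are illustrative assumptions.
env.createTable(
    "vehicle_events",
    ConfluentTableDescriptor.forManaged()
        .schema(schema)
        .build()
);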

I've somewhat simplified the explanation of watermarks for this video.

It's a complicated topic that's difficult to explain in a short time.

That's one reason I am thankful that Confluent Cloud automates it for many use cases.

There's nothing worse than setting up your Flink job only to discover that no data is flowing because you've messed up the watermarks somehow.

Having said that, I don't want you to leave this video feeling like every job requires you to think about watermarks.

Some operations don't need them, allowing you to forget they exist.

Watermarks matter if you are building jobs that operate on the time an event occurs.

If that's not the case, then you may never need to think about them.

If you want a deeper dive on watermarks, check out some of the links in the video description for other videos and docs that go into more detail.

Meanwhile, keep an eye out for future videos in this series and drop me a comment if anything is still not making sense.

Thanks for watching.
