course: Apache Flink® 101

Watermarks (Confluent Cloud)

10 min
David Anderson

Principal Software Practice Lead

Troubleshooting Watermarks (Confluent Cloud)

This is a hands-on exercise about watermarking in Confluent Cloud.

A related hands-on exercise using open source Apache Flink SQL is also available: Implementing and Troubleshooting Watermarks (Docker). Compared to that exercise, this one is quite a bit simpler, as Confluent Cloud supplies default watermarking that is more than adequate for getting started.

This exercise relies on setting up Confluent Cloud as described in Cloud Setup. This exercise also relies on the Query Profiler, as described in the exercise on Using the Query Profiler.

Causing trouble

Confluent Cloud has watermarking defaults that prevent common problems arising from misconfigured watermarks. So to create an environment in which one can explore troubleshooting watermarks, or the lack thereof, it will be necessary to do some rather silly things that are not recommended.

examples.marketplace.orders is a built-in, read-only table that takes advantage of Confluent Cloud's default watermark strategy. The statement below will start a long-running SQL statement that copies this example table into a new table called orders_kafka:

CREATE TABLE orders_kafka (
  silly_key INT PRIMARY KEY NOT ENFORCED
) DISTRIBUTED BY HASH(silly_key) INTO 3 BUCKETS
AS SELECT customer_id % 2 AS silly_key, * FROM examples.marketplace.orders;

This new table has the following changes, compared to the original table:

  • orders_kafka is backed by a Kafka topic with 3 partitions
  • it has a new column, silly_key, that is the PRIMARY KEY
  • silly_key is used as the basis for Kafka partitioning
  • silly_key can only have 2 distinct values

This is designed to guarantee that one of the three partitions in the orders_kafka topic will be empty/idle. However, unlike the situation in open source Flink, the watermarking in Confluent Cloud has an idle timeout configured by default, so this alone will not be enough to cause trouble.
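The pigeonhole reasoning here can be sketched in a few lines. This is only an illustration: it uses Python's built-in hash() as a stand-in for Kafka's actual key partitioner (murmur2), but the invariant is the same either way:

```python
# Sketch of why at least one partition must stay idle.
# hash(str(k)) is a stand-in for Kafka's murmur2 key hashing.
num_partitions = 3
keys = {0, 1}  # the only values customer_id % 2 can produce

# Each distinct key always lands on the same partition, so at most
# len(keys) of the partitions can ever receive records.
used_partitions = {hash(str(k)) % num_partitions for k in keys}
idle_partitions = num_partitions - len(used_partitions)

print(f"partitions in use: {sorted(used_partitions)}")
print(f"idle partitions: at least {idle_partitions}")
```

With 2 distinct keys and 3 partitions, at least one partition receives no records at all, which is exactly the idle-partition situation this exercise sets out to create.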

To verify that everything still works, try a query that requires watermarks, such as sorting the table by time:

SELECT $rowtime, * FROM orders_kafka ORDER BY $rowtime;

Now, to truly break things, disable the idle timeout,

SET 'sql.tables.scan.idle-timeout' = '0s';
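(If you later want to undo this for the session, the Flink SQL shell should let you restore the default with RESET, or you can SET the option back to a non-zero duration:)

```sql
-- Restore the default idle-timeout behavior for this session:
RESET 'sql.tables.scan.idle-timeout';
```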

and try the sort again:

SELECT $rowtime, * FROM orders_kafka ORDER BY $rowtime;

Troubleshooting

The Confluent Cloud Query Profiler, introduced in an earlier exercise, can show you what's going on with the watermarks in this statement.

To investigate the possibility of an idle source or source partition, open the Query Profiler tab on the page for that statement. Once the metrics appear, you'll see there's no output, and no watermarks:

no output, no watermarks

Then click on the source task in the job graph:

source task in Query Profiler

In the sidebar that has now opened up, take a look in the "Partition" tab, where you will see that one of the partitions doesn't have a watermark:

per-partition watermarks

This, combined with having disabled the idle timeout, explains why this statement is not producing any output.

Note concerning idleness

The idleness metric displayed in the Query Profiler has nothing to do with the idleness that watermarking cares about. This metric is a measurement of how often the compute resources powering a task are idle, as opposed to busy.

Finishing

You should delete the statement copying the orders into orders_kafka, and drop that table.

You can find and stop the statement in a terminal or in the cloud UI. You saw the list of statements on the Compute Pool page on your way to the Query Profiler; that is where you can delete it.

deleting a SQL statement

If you prefer to use a terminal, be sure to select the same cloud provider and region you used when initially setting up Flink. If you used the quickstart as suggested, this will give you the name of the statement that is still running:

confluent flink statement list --status running --cloud gcp --region us-central1

which is what you need to delete it:

confluent flink statement delete {Statement Name} --cloud gcp --region us-central1

You can drop the table in the Flink SQL shell, or in the SQL Workspace (in your browser).
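For example, once the statement populating the table has been stopped, dropping the table is a single statement (in Confluent Cloud this should also remove the backing Kafka topic):

```sql
DROP TABLE orders_kafka;
```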

Resources

For a deeper dive into how watermarks behave, and how they can be configured, see How streaming SQL uses watermarks.

Before moving your SQL workloads into production on Confluent Cloud, it's worth considering whether you should be using a custom watermark strategy. You can learn more in this exercise from the Flink SQL course: Hands-on with watermarks.

