Course: Apache Flink® SQL

Exercise: Hands-on with watermarks

30 min
David Anderson
Software Practice Lead

The purpose of this exercise is to help you better understand the concepts presented in an earlier course module about watermarks in Flink SQL. In this hands-on exercise, you will learn:

  • what it's like to use the default watermarking strategy built into Confluent Cloud
  • how to define your own watermarking strategy, and when to bother
  • how to debug queries that fail to produce results
  • how to detect late events
  • how to configure the number of partitions and partitioning strategy for the Kafka topic backing a Flink table
  • how sorting works in Flink SQL
  • how to read and write the Kafka record timestamp
  • the impact of having empty/idle partitions
  • how to use CASE statements
  • how to use Common Table Expressions (CTEs)

Using the default watermarks provided by Confluent Cloud

Using the Flink shell built into the Confluent CLI, make your way into the sql-course_kafka-cluster database in the sql-course_environment catalog created by the confluent-flink-quickstart plugin. The getting started exercise explains in detail how to do that.
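As a quick reminder, once you've started the shell with confluent flink shell, the statements look roughly like this (a sketch, using the names created by the quickstart plugin):

use catalog `sql-course_environment`;
use `sql-course_kafka-cluster`;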

Let's begin by taking a close look at the built-in clicks table:

describe extended `examples`.`marketplace`.`clicks`;

Among other things, the output shows the normally hidden $rowtime column that every table in Confluent Cloud for Apache Flink has. This special system column is mapped onto the Kafka record timestamp.

+-------------+----------------------------+----------+-----------------------------------------------------+---------+
| Column Name |         Data Type          | Nullable |                       Extras                        | Comment |
+-------------+----------------------------+----------+-----------------------------------------------------+---------+
| click_id    | STRING                     | NOT NULL |                                                     |         |
| ...         | ...                        | ...      |                                                     |         |
| $rowtime    | TIMESTAMP_LTZ(3) *ROWTIME* | NOT NULL | METADATA VIRTUAL, WATERMARK AS `SOURCE_WATERMARK`() | SYSTEM  |
+-------------+----------------------------+----------+-----------------------------------------------------+---------+

The annotation METADATA VIRTUAL indicates that this column is read-only. WATERMARK AS `SOURCE_WATERMARK`() indicates that there is a watermark strategy defined on this $rowtime field, and it is the default watermark strategy provided by Confluent Cloud.

The underlying assumption with watermarking is that the arriving event records will be ingested roughly (or perfectly) in order with respect to the time attribute field, which in this case is $rowtime.

If everything is configured correctly, you will be able to sort this table by $rowtime, so go ahead and try that now:

select user_id, url, $rowtime
from `examples`.`marketplace`.`clicks`
order by $rowtime;

You may find it instructive to try sorting this table on another field, such as the click_id. This will produce an error indicating that "Sort on a non-time-attribute field is not supported."
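For instance, this variation on the query above fails with that error:

select user_id, url, $rowtime
from `examples`.`marketplace`.`clicks`
order by click_id;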

The stream mapped onto this table is arriving more-or-less in order, according to the $rowtime, and the watermarks let the Flink SQL runtime know how much buffering of the incoming stream is needed to iron out any out-of-order-ness before emitting the sorted output stream. Sorting by any other field would be intractable, since the runtime would have no way of knowing when it had seen all of the rows that belong before a given row.

To see what these watermarks look like, as provided by the watermark strategy built into Confluent Cloud, you can use the current_watermark() function, like this (by limiting the output to a few hundred rows, you can easily page up and down through the results):

select user_id, url, $rowtime, current_watermark($rowtime) as wm
from `examples`.`marketplace`.`clicks` limit 500;

We're going to copy a few of these clicks into a new table we can experiment with. This is necessary because the built-in tables in the examples catalog can't be modified:

create table `some_clicks` (
  `partition_key` INT,
  `user_id` INT NOT NULL,
  `url` STRING NOT NULL,
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) distributed by hash(`partition_key`) into 2 buckets;
insert into `some_clicks`
select
  1 as partition_key, 
  user_id,
  url,
  $rowtime as event_time
from `examples`.`marketplace`.`clicks` limit 500;

There are a few special things going on here, so let's break this down:

  • distributed by hash(`partition_key`) into 2 buckets specifies that the some_clicks table is to be backed by a Kafka topic with 2 partitions, and the partition_key field will be used as the partition key. The query used to populate this table uses 1 as partition_key to guarantee that all of the records will go to the same partition. No real application would want to do this, but this arrangement will result in an empty partition.

  • `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' maps the Kafka record timestamp onto a field named event_time. Yes, we already have access to this timestamp via the special $rowtime field, but that field is read-only. When the query putting data into the some_clicks table uses $rowtime as event_time it is reading the Kafka record timestamps from the examples.marketplace.clicks table, and copying them into the Kafka record timestamps of the some_clicks table.

  • By limiting the some_clicks table to 500 rows, it will stay small enough to be easily examined.
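If you want to double-check how the table ended up being defined, including its distribution and watermark strategy, you can inspect it with a standard statement:

show create table `some_clicks`;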

Now take a careful look at the timestamps and watermarks for this new table:

select 
  *, 
  $rowtime, 
  current_watermark($rowtime) as wm 
from `some_clicks`;

You should observe a couple of things:

  • The event_time and $rowtime columns are identical -- this is to be expected, since both are mapped to Kafka record timestamps.
  • Initially, the current watermark for each row (shown as wm) is very far behind the $rowtime (one week behind). This is the expected behavior during the initial warm-up phase of the watermark generator.

Try to predict what you think will happen with this query before executing it:

select 
  user_id, 
  url, 
  $rowtime, 
  current_watermark($rowtime) as wm 
from `some_clicks` 
order by $rowtime;

Defining your own watermarks

The sort on the some_clicks table won't begin to produce results until the watermark for the table reaches the timestamp on the first row. If we had copied more rows from examples.marketplace.clicks into some_clicks, this would happen sooner or later. Instead, we can resolve this by modifying the watermark strategy. Here we are indicating that the event stream will arrive in order:

alter table `some_clicks`
modify watermark for `$rowtime` as `$rowtime`;
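As an aside: if you later want to return to the default strategy, Flink SQL's ALTER TABLE ... DROP WATERMARK should do it -- on Confluent Cloud, dropping a custom watermark strategy should bring back the default SOURCE_WATERMARK() behavior, though it's worth confirming in the documentation. Don't run this now, since the rest of the exercise depends on the new strategy:

alter table `some_clicks`
drop watermark;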

Now try that sorting query again (and be patient, it'll take about 15 seconds to begin producing results):

select
  user_id, 
  url, 
  $rowtime, 
  current_watermark($rowtime) as wm
from `some_clicks`
order by $rowtime;

You probably have some questions:

Why did this take so long to produce any results? The Kafka topic backing this table has an empty partition. The lack of events from this partition is holding back the watermark. Eventually Confluent Cloud's default idle timeout kicks in and the watermark does advance.
How did this query ever produce results, since the watermark shown is always NULL? A watermark does eventually arrive, but not until after the `current_watermark` function has been applied to every row.

By default, Confluent Cloud applies something called progressive idleness. The idle timeout starts at 15 seconds, and gradually increases up to a maximum of 5 minutes. The aggressive 15-second timeout at the beginning helps when running ad-hoc queries, as otherwise you would have to wait a long time for a partition to be marked as idle before seeing any results. In production, 5 minutes is safer.

You can experiment with different values for the idle timeout, e.g.,

SET 'sql.tables.scan.idle-timeout' = '1 seconds';
select
  user_id, 
  url, 
  $rowtime, 
  current_watermark($rowtime) as wm
from `some_clicks`
order by $rowtime;

Setting the idle timeout to 0 will completely disable it, in which case any queries that rely on watermarks will hang forever if there's an empty or idle partition.
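If you want to see this for yourself, a minimal sketch: disable the timeout and re-run the sorting query above (it will hang, so be ready to cancel it), then use RESET, which should restore the default progressive idleness:

SET 'sql.tables.scan.idle-timeout' = '0 seconds';

RESET 'sql.tables.scan.idle-timeout';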

Detecting late events

Late events have timestamps less than or equal to the current watermark. SQL operations that are triggered by watermarks, such as sorting, will drop late events, because these events will have arrived too late to be included in the results.

In order to experiment with late events, we can create a new table whose events are (somewhat) out-of-order, and set a watermark strategy that doesn't accommodate all of the out-of-order-ness. The watermarking for this table is set up to allow for the events to be at most 1 second out of order:

create table `ooo_clicks` (
  `user_id` INT NOT NULL,
  `url` STRING NOT NULL,
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  watermark for `event_time` as `event_time` - INTERVAL '1' SECOND
);
insert into `ooo_clicks`
select
  user_id,
  url,
  TIMESTAMPADD(SECOND, rand_integer(6), $rowtime) as event_time
from `examples`.`marketplace`.`clicks`;

The effect of

TIMESTAMPADD(SECOND, rand_integer(6), $rowtime) as event_time

will be to set the timestamps in this table to 0, 1, 2, 3, 4, or 5 seconds ahead of the original timestamps coming from the built-in clicks table. So the timestamps will be up to 5 seconds out of order, but the watermarking is configured as `event_time` - INTERVAL '1' SECOND, which only allows for at most one second of out-of-order-ness. With this setup we should expect most of the events to be late -- meaning their timestamps will be less than or equal to the current watermark at the time they are processed.

Here are some queries you can experiment with to measure how many events are actually late.

This first query counts the number of late events among the first 1000. It uses a CTE (a Common Table Expression) to define a virtual table named a_thousand_clicks. This makes the overall query a bit more readable:

with a_thousand_clicks as 
  (select * from ooo_clicks limit 1000) 
select count(*) from a_thousand_clicks 
where event_time <= current_watermark(event_time);

With each of these queries, try running them multiple times. Observe the degree to which the results are consistent. Note that with this first query, if there are no late events at all, the query will not return zero. It will, instead, produce no result at all. This is more likely than you might expect, for reasons explained at the end of this section.

This second query produces a running count of all of the events, as well as the late events. Note how it uses a SQL CASE statement to count the late events -- this is a good trick to know when you want different fields of your result to use different conditions for aggregation:

select
  count(*) as total_events,
  sum(
    case when event_time <= current_watermark(event_time) then 1 
    else 0 
    end) as late_events
from ooo_clicks;

How to interpret these results:

Watermarks, and lateness, are non-deterministic. You can see this directly by repeating any of these queries and comparing the results from different runs.

In particular, late events are less common than we might have predicted. This is because Flink is updating the watermark periodically, rather than on every event. This allows some events that would otherwise have been considered late to slip through unscathed.

Looking at the details, the watermark generator is creating a new watermark every 200 msec, and the source is creating new click events at a rate of 50 per second. When one of these queries is operating in near real-time, there will be about 10 events between one watermark and the next, and several of these will probably be late. However, the ooo_clicks table is configured with 'scan.startup.mode' = 'earliest-offset'. When one of these queries is first started, it will process any historic events already in the topic as fast as it can, and will likely be able to process many thousands of events between one watermark and the next. Given our setup, after the first 5 seconds' worth of events (i.e., 250 events), none of these can be late.
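If you'd like to examine this row by row, here is a sketch of a query showing how far each event's timestamp is ahead of the current watermark (the seconds_ahead alias is just for illustration). Late events are the ones where this value is zero or negative, and rows processed before the first watermark arrives will show NULL:

select
  user_id,
  event_time,
  current_watermark(event_time) as wm,
  TIMESTAMPDIFF(SECOND, current_watermark(event_time), event_time) as seconds_ahead
from ooo_clicks;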

current_watermark() and updating tables

In Flink SQL, some tables are append-only tables, like examples.marketplace.clicks, while other tables, like examples.marketplace.customers, are updating tables.

The distinction boils down to this: clicks are immutable, while customer data can change over time. On Confluent Cloud, tables with a primary key are set up to behave as updating tables, e.g.,

CREATE TABLE `customers` (
  `customer_id` INT NOT NULL,
  `name` STRING NOT NULL,
  `address` STRING NOT NULL,
  `postcode` STRING NOT NULL,
  `city` STRING NOT NULL,
  `email` STRING NOT NULL,
  PRIMARY KEY (`customer_id`) NOT ENFORCED
);

This means that at any moment in time, there can only be one row for each customer_id.

We will dig into append-only vs. updating tables, primary keys, and changelog streams in greater depth in a later module. For now, you need to be aware that you can't use the current_watermark() function on an updating table in the Confluent Flink shell, or in the Confluent Cloud web-based workspaces. If you do try this, you'll get an error message that talks about using a non-deterministic function in combination with processing update messages, which doesn't work.
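You can see this error for yourself by trying something like:

select *, current_watermark($rowtime) as wm
from `examples`.`marketplace`.`customers`;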

If you believe you're having problems with watermarks relating to an updating table, one way to get insight into what's happening is to log in to Confluent Cloud and look at the metrics and graphs available for that statement. You'll be able to see how much data each query is consuming and producing, and how far behind the query is compared to the latest offset(s) for the input topic(s).

If you do suspect watermarks are causing a problem, and you've overridden the defaults, first try turning off any customizations you've made relating to watermarks, since the defaults are designed to ensure you will get results.

Summing up

The default watermarking strategy used in Confluent Cloud is convenient, and meets the needs of most applications. You will, however, want to define your own watermarking in certain situations:

  • When a topic has very little data
  • When the default watermarking is producing too many late events, and you'd rather put up with increased latency to reduce the likelihood of late events

As for the idle timeout, rather than relying on the progressive idleness built into Confluent Cloud, you may want to set a smaller, fixed idle timeout if you have topics you expect to be idle, and want to decrease latency.

The current_watermark() function is helpful for debugging problems related to watermarks, but can only be used interactively with append-only tables.

Late events happen non-deterministically, and are more likely to occur when processing real-time data than with historic data.

Cleaning up

At this point you should think about deleting any billable resources. In particular, any insert into ... statements you did are probably still running, and should be stopped.

You can always delete the sql-course_environment environment, as explained in the first exercise.

If instead you want to keep the topics/tables you created but delete any running statements, here's how to do that:

confluent flink statement list --status running

confluent flink statement delete <STATEMENT_NAME>
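And if you don't want to keep the tables either, you can drop them from the Flink shell (check what DROP TABLE does to the backing Kafka topic in Confluent Cloud before relying on it):

drop table `some_clicks`;
drop table `ooo_clicks`;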
