course: Apache Flink® 101

Implementing and Troubleshooting Watermarks (Exercise)

David Anderson

Software Practice Lead

This is a continuation of the previous exercise on using Flink SQL for streaming analytics.

Specifying a watermark strategy

In this exercise we will revisit the topic of counting page views per second, this time based on the event timing information in the ts field of each row of the pageviews and pageviews_kafka tables. You will learn how to use the TUMBLE window function with an event-time-based timestamp field, which requires creating an event-time time attribute.

An event-time time attribute is nothing more than a timestamp field that has watermarking defined on it. The example below defines watermarks on the ts field of the pageviews table, based on the timestamps being at most 5 seconds out of order (which matches the out-of-orderness in the faker expression used to define the ts field in this table):

CREATE TABLE `pageviews` (
  ... -- other fields the same as before
  `ts` TIMESTAMP(3),
  WATERMARK FOR `ts` AS ts - INTERVAL '5' SECOND
)
WITH (
  ...  -- other properties the same as before
  'fields.ts.expression' =  '#{date.past ''5'',''1'',''SECONDS''}'
);

Why are the watermarks defined as ts - INTERVAL '5' SECOND? The interval specified here should be an upper bound on how out-of-order the stream might be, which in this case is 5 seconds. Subtracting 5 seconds from the timestamps produces watermarks that lag far enough behind the newest timestamps to avoid having any late events.
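To make the reasoning above concrete, here is a minimal Python sketch of a bounded-out-of-orderness watermark. It is a simplified model, not Flink's implementation: timestamps are plain integers (seconds), the function name find_late_events and the sample arrivals are made up for illustration, and an event is treated as late when its timestamp is strictly less than the current watermark, which glosses over how the real runtime handles boundary cases.

```python
def find_late_events(timestamps, delay):
    """Return the events that arrive behind the current watermark.

    The watermark is modeled as (largest timestamp seen so far) - delay,
    mirroring `ts - INTERVAL '5' SECOND` when delay is 5.
    """
    watermark = float("-inf")
    late = []
    for ts in timestamps:
        # Simplification: an event is late if the watermark already passed it.
        if ts < watermark:
            late.append(ts)
        # Watermarks never move backwards.
        watermark = max(watermark, ts - delay)
    return late


# Hypothetical arrival order; no event is more than 5 seconds behind
# the largest timestamp that arrived before it.
arrivals = [10, 12, 8, 15, 11, 16, 12]

print(find_late_events(arrivals, delay=5))  # [] (the bound holds, nothing is late)
print(find_late_events(arrivals, delay=2))  # [8, 11, 12] (delay too small)
```

With a delay that matches the real out-of-orderness, nothing is late. Shrinking the delay below that bound makes the watermark run ahead of some events, and those events become late.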

Now that watermarks are set up, you can use the same windowing query as in the previous exercise, replacing the proc_time field with ts. But before you execute it, see if you can predict what the results will look like.

SELECT
  window_start, count(url) AS cnt
FROM TABLE(
  TUMBLE(TABLE pageviews, DESCRIPTOR(ts), INTERVAL '1' SECOND))
GROUP BY window_start;
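Once you have made your prediction, the following Python sketch may help you reason about what you observe. It models one-second tumbling windows driven by event time: a window's count is only emitted after the watermark reaches the end of that window. This is a simplified model under stated assumptions (integer millisecond timestamps, a hypothetical function name tumbling_counts, invented sample data, and a window considered complete once its end is less than or equal to the watermark); it is not how Flink is implemented internally.

```python
from collections import Counter


def tumbling_counts(timestamps, size, delay):
    """Return (window_start, count) pairs in the order the windows close."""
    open_windows = Counter()
    watermark = float("-inf")
    results = []
    for ts in timestamps:
        window_start = ts - ts % size
        if window_start + size <= watermark:
            continue  # the window already closed, so this late event is dropped
        open_windows[window_start] += 1
        watermark = max(watermark, ts - delay)
        # Emit every window whose end the watermark has now reached.
        closed = sorted(w for w in open_windows if w + size <= watermark)
        for start in closed:
            results.append((start, open_windows.pop(start)))
    return results


# Hypothetical event timestamps in milliseconds, in arrival order.
events = [1000, 1500, 2200, 1800, 7000, 7300, 8100, 13500]

print(tumbling_counts(events, size=1000, delay=5000))
# [(1000, 3), (2000, 1), (7000, 2)]
```

In this model the windows starting at 8000 and 13000 stay open at the end of the input, because the watermark has not yet passed them. Results for a window only appear once later events have pushed the watermark far enough forward.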

Pattern matching with MATCH_RECOGNIZE

Besides windowing, another operation that requires paying attention to time is pattern matching. For example, a pattern might involve finding users who first did one thing, and then another. This requires that we first sort the stream by time, so that the pattern matching engine can process the event stream in the order in which the events actually occurred.

With streams that use processing time, the events are already (automatically) in order, but with event time, sorting doesn't come for free, and requires watermarks.
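One way to picture why event-time sorting needs watermarks is the sketch below. It buffers incoming events and releases them in timestamp order only once the watermark says nothing earlier can still arrive. This is an illustrative model, not Flink's actual operator: the function name sort_by_event_time, the integer timestamps (seconds), and the sample data are all assumptions.

```python
import heapq


def sort_by_event_time(timestamps, delay):
    """Release buffered events in timestamp order as the watermark advances.

    Returns (released, still_buffered).
    """
    buffer = []  # min-heap ordered by timestamp
    watermark = float("-inf")
    released = []
    for ts in timestamps:
        heapq.heappush(buffer, ts)
        watermark = max(watermark, ts - delay)
        # Anything at or below the watermark is safe to emit in order.
        while buffer and buffer[0] <= watermark:
            released.append(heapq.heappop(buffer))
    return released, sorted(buffer)


# Hypothetical arrival order, at most 5 seconds out of order.
arrivals = [10, 12, 8, 15, 11, 16, 12]

released, pending = sort_by_event_time(arrivals, delay=5)
print(released)  # [8, 10, 11]
print(pending)   # [12, 12, 15, 16]
```

The released events come out sorted even though they arrived out of order. The price is latency: events wait in the buffer until the watermark passes them, and if the watermark stops advancing, nothing more is released.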

Flink SQL includes support for MATCH_RECOGNIZE, which is a part of the SQL standard that makes it much easier to implement pattern matching.

The query below finds cases where users had two pageview events using two different browsers within one second. In this query, PARTITION BY user_id specifies that the matching is to be done independently for each user. ORDER BY ts indicates which column to use as the time attribute for sorting. The PATTERN and DEFINE clauses establish the pattern to be matched.

SELECT *
FROM pageviews
    MATCH_RECOGNIZE (
      PARTITION BY user_id
      ORDER BY ts
      MEASURES
        A.browser AS browser1,
        B.browser AS browser2,
        A.ts AS ts1,
        B.ts AS ts2
      PATTERN (A B) WITHIN INTERVAL '1' SECOND
      DEFINE
        A AS true,
        B AS B.browser <> A.browser
    );

In this case, the pattern will match any two events A and B (for the same user_id), where B comes immediately after A (and within one second), and the two events used different browsers (B.browser <> A.browser).
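The matching logic just described can be sketched in plain Python. This is only an approximation of the query's semantics under several assumptions: events are given as hypothetical (user_id, ts_ms, browser) tuples, every adjacent pair of a user's events is checked (the real query's behavior after a match depends on the AFTER MATCH SKIP strategy in effect), and "within one second" is modeled as a gap of strictly less than 1000 milliseconds. The function name and sample data are invented for illustration.

```python
from collections import defaultdict


def find_browser_switches(events, within_ms=1000):
    """Find adjacent event pairs per user that used different browsers."""
    by_user = defaultdict(list)  # PARTITION BY user_id
    for user_id, ts, browser in events:
        by_user[user_id].append((ts, browser))

    matches = []
    for user_id, rows in by_user.items():
        rows.sort(key=lambda row: row[0])  # ORDER BY ts
        # PATTERN (A B): B is the event immediately after A.
        for (ts1, browser1), (ts2, browser2) in zip(rows, rows[1:]):
            if browser2 != browser1 and ts2 - ts1 < within_ms:
                matches.append((user_id, browser1, browser2, ts1, ts2))
    return matches


# Hypothetical pageview events in arrival order.
events = [
    ("u1", 1000, "chrome"),
    ("u2", 1100, "safari"),
    ("u1", 1400, "firefox"),
    ("u2", 1300, "safari"),   # same browser, so no match
    ("u3", 2000, "chrome"),
    ("u3", 3500, "edge"),     # different browser, but too far apart
]

print(find_browser_switches(events))
# [('u1', 'chrome', 'firefox', 1000, 1400)]
```

Only u1 produces a match: u2 never changes browsers, and the two events for u3 are 1.5 seconds apart.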

The MEASURES clause defines the columns that will be included in the result, in addition to the user_id (which is included automatically).

MATCH_RECOGNIZE is much more powerful than this example suggests. See the documentation for more examples and details.

While the query above is running, take a look in the Flink UI, which is at http://localhost:8081. When you examine the job for this query, you will see that it has watermarks, and they are advancing:

[Screenshot: Watermarks in the Flink Dashboard]

Should you ever need to debug problems with watermarks, you can start here.

Troubleshooting watermarks

While the MATCH_RECOGNIZE query shown above works just fine, if you try the same query against the pageviews_kafka table, it will fail. See if you can figure out why, and find at least one way to fix it.

Hint: The root cause of the problem is that the pageviews_kafka table was defined with this setting in place:
'sink.partitioner' = 'fixed'

With the fixed partitioner, each parallel instance of Flink's Kafka sink writes to a single Kafka partition. By default, topics created on Confluent Cloud have six partitions. Because the Flink job writing to this topic had a parallelism of less than six, some of the topic's partitions never receive any data and remain empty (idle).
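The effect of the fixed partitioner can be illustrated with a few lines of Python. This sketch assumes that each sink subtask picks its partition as its subtask index modulo the number of partitions, which is a simplified model of the behavior described above. The parallelism of 2 is a hypothetical value chosen for the example, since the text only says it was less than six, and the function names are invented.

```python
def fixed_partition(subtask_index, num_partitions):
    """Assumed mapping: each sink subtask always writes to one partition."""
    return subtask_index % num_partitions


def idle_partitions(parallelism, num_partitions):
    """Partitions that no sink subtask ever writes to."""
    written = {fixed_partition(i, num_partitions) for i in range(parallelism)}
    return sorted(set(range(num_partitions)) - written)


# Hypothetical: a job with parallelism 2 writing to a six-partition topic.
print(idle_partitions(parallelism=2, num_partitions=6))  # [2, 3, 4, 5]
print(idle_partitions(parallelism=6, num_partitions=6))  # []
```

Under this model, any parallelism below the partition count leaves some partitions permanently empty.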

See the video on watermarking for a detailed explanation of why idle sources cause problems for watermarking.

Solutions: One approach would be to work around the idle partition problem by configuring an idle timeout, e.g.,
set 'table.exec.source.idle-timeout' = '2000';

This will set the idle timeout to 2000 milliseconds (2 seconds), after which the runtime will mark the idle partitions as inactive, and ignore them (unless they begin to produce data).
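To see why the idle timeout helps, consider this rough Python model. It assumes that a source's overall watermark is the minimum of its per-partition watermarks, and that partitions marked idle are left out of that minimum. The function name, the data layout (partition id mapped to a watermark and the milliseconds since its last event), and the sample values are all hypothetical; this is a sketch of the idea, not Flink's code.

```python
NO_WATERMARK = float("-inf")  # a partition that has never produced data


def combined_watermark(partitions, idle_timeout_ms=None):
    """Minimum watermark across partitions that are not considered idle.

    partitions maps partition id -> (watermark_ms, ms_since_last_event).
    """
    active = []
    for watermark, silent_for in partitions.values():
        if idle_timeout_ms is not None and silent_for >= idle_timeout_ms:
            continue  # marked idle, so ignored until it produces data again
        active.append(watermark)
    return min(active, default=NO_WATERMARK)


# Hypothetical state: two busy partitions and four that never received data.
partitions = {
    0: (11_000, 50),
    1: (10_500, 120),
    2: (NO_WATERMARK, 30_000),
    3: (NO_WATERMARK, 30_000),
    4: (NO_WATERMARK, 30_000),
    5: (NO_WATERMARK, 30_000),
}

print(combined_watermark(partitions))                        # -inf
print(combined_watermark(partitions, idle_timeout_ms=2000))  # 10500
```

Without a timeout, the empty partitions hold the combined watermark back indefinitely. With the 2000 millisecond timeout, they are excluded and the watermark follows the slowest active partition.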

A better solution would be to avoid having idle partitions in the first place, by dropping the pageviews_kafka table and redefining it to use the default partitioner.

