This is a follow-on to the earlier Docker-based exercise on using Flink SQL for streaming analytics, and it uses the same Docker-based setup, described here.
A related hands-on exercise using Confluent Cloud for Apache Flink is also available: Watermarks on Confluent Cloud.
In this exercise we will revisit the topic of counting page views per second, this time based on the event timing information in the ts field of each row of the pageviews and pageviews_kafka tables. You will learn how to use the TUMBLE window function with an event-time-based timestamp field, which requires creating an event-time time attribute.
An event-time time attribute is nothing more than a timestamp field that has watermarking defined on it. The example below defines watermarks on the ts field of the pageviews table, based on the timestamps being at most 5 seconds out of order (which matches the out-of-orderness in the faker expression used to define the ts field of this table).
Note: this version of the pageviews table is somewhat different from the one you used previously. If you are still in the same Flink SQL session, DROP TABLE pageviews; will delete what you had before and let you start afresh.
CREATE TABLE `pageviews` (
  `url` STRING,
  `user_id` STRING,
  `browser` STRING,
  `ts` TIMESTAMP(3),
  WATERMARK FOR `ts` AS ts - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'faker',
  'rows-per-second' = '100',
  'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
  'fields.user_id.expression' = '#{numerify ''user_##''}',
  'fields.browser.expression' = '#{Options.option ''chrome'', ''firefox'', ''safari''}',
  'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);
Why are the watermarks defined as ts - INTERVAL '5' SECOND? The interval specified here should be an upper bound on how out-of-order the stream can be, which in this case is 5 seconds. Subtracting 5 seconds from the largest timestamp seen so far produces watermarks that are small enough to guarantee that no events are late.
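If you'd like to see this in action, you can probe the watermark directly from SQL, assuming the Flink version in this setup is 1.15 or later (where the CURRENT_WATERMARK function was introduced):

-- Watch the watermark trail behind the event timestamps; it should lag
-- the largest ts seen so far by about 5 seconds. CURRENT_WATERMARK
-- returns NULL until the first watermark has been generated.
SELECT
  ts,
  CURRENT_WATERMARK(ts) AS wm
FROM pageviews;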
Now that watermarks are set up, you can use the same windowing query as in the previous exercise, replacing the proc_time field with ts. But before you execute it, see if you can predict what the results will look like.
SELECT window_start, count(1) AS cnt
FROM TABLE(
  TUMBLE(DATA => TABLE pageviews,
         TIMECOL => DESCRIPTOR(ts),
         SIZE => INTERVAL '1' SECOND))
GROUP BY window_start, window_end;
Besides windowing, another operation that requires paying attention to time is pattern matching. For example, a pattern might involve finding users who first did one thing, and then another -- which requires that we first sort the stream by time, so that the pattern matching engine can process the event stream in the order in which the events actually occurred.
With streams that use processing time, the events are already (automatically) in order, but with event time, sorting doesn't come for free, and requires watermarks.
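In fact, Flink SQL only accepts a streaming ORDER BY whose leading sort key is an ascending time attribute, precisely because watermarks are what tell the runtime when it is safe to emit the sorted rows. For example, this sort works on the pageviews table defined above:

-- Valid in streaming mode only because ts is a time attribute: each
-- watermark tells Flink that the sort order up to that point is final.
SELECT user_id, browser, ts
FROM pageviews
ORDER BY ts;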
Flink SQL includes support for MATCH_RECOGNIZE, which is a part of the SQL standard that makes it much easier to implement pattern matching.
The query below finds cases where users had two pageview events using two different browsers within one second. In this query, PARTITION BY user_id specifies that the matching is to be done independently for each user. ORDER BY ts indicates which column to use as the time attribute for sorting. The PATTERN and DEFINE clauses establish the pattern to be matched.
SELECT *
FROM pageviews
MATCH_RECOGNIZE (
  PARTITION BY user_id
  ORDER BY ts
  MEASURES
    A.browser AS browser1,
    B.browser AS browser2,
    A.ts AS ts1,
    B.ts AS ts2
  PATTERN (A B) WITHIN INTERVAL '1' SECOND
  DEFINE
    A AS true,
    B AS B.browser <> A.browser
);
In this case, the pattern will match any two events A and B (for the same user_id), where B comes immediately after A (and within one second), and the two events used different browsers (B.browser <> A.browser).
The MEASURES clause defines the columns that will be included in the result, in addition to the user_id (which is included automatically).
MATCH_RECOGNIZE is much more powerful than this example suggests. See the lessons on pattern matching in the Flink SQL course to learn more.
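As a small taste of that power, the sketch below (not part of this exercise) adds a quantifier to the pattern, matching users who switched away from their original browser one or more times and then switched back to it, all within one second:

-- A sketch, not from the exercise: B+ matches one or more pageviews
-- using a browser different from A's, and C matches the return to the
-- original browser.
SELECT *
FROM pageviews
MATCH_RECOGNIZE (
  PARTITION BY user_id
  ORDER BY ts
  MEASURES
    A.browser AS original_browser,
    LAST(B.ts) AS last_switch_ts,
    C.ts AS switched_back_ts
  PATTERN (A B+ C) WITHIN INTERVAL '1' SECOND
  DEFINE
    A AS true,
    B AS B.browser <> A.browser,
    C AS C.browser = A.browser
);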
While the two-browser query above is running, take a look in the Flink UI, which is at http://localhost:8081. When you examine the job for this query, you will see that it has watermarks, and that they are advancing.
Should you ever need to debug problems with watermarks, you can start here.
While the MATCH_RECOGNIZE query shown above works just fine, if you try the same query against the pageviews_kafka table, as set up below, it will fail to produce any results. See if you can figure out why, and find at least one way to fix it.
First create a Kafka table that mirrors the flink-faker version of the pageviews table:
CREATE TABLE pageviews_kafka
WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_kafka',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'sink.partitioner' = 'fixed'
)
AS SELECT * FROM pageviews;
This CREATE TABLE AS statement will start a continuous Flink job copying data from flink-faker to Kafka. Leave this statement running while continuing as shown below.
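If you'd like to confirm that this background job is running, and the Flink version in this setup supports it (SHOW JOBS was added to the SQL client in Flink 1.17), you can list the cluster's jobs:

-- The CTAS job copying pageviews into Kafka should be listed as RUNNING.
SHOW JOBS;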
Then add watermarking to the pageviews_kafka table, since CREATE TABLE AS isn't (yet) able to do this:
ALTER TABLE pageviews_kafka
ADD WATERMARK FOR ts AS ts - INTERVAL '5' SECOND;
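You can verify that the watermark definition took effect by describing the table:

-- The output should now include a WATERMARK entry for the ts column.
DESCRIBE pageviews_kafka;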
And finally, run the pattern matching query:
SELECT *
FROM pageviews_kafka
MATCH_RECOGNIZE (
  PARTITION BY user_id
  ORDER BY ts
  MEASURES
    A.browser AS browser1,
    B.browser AS browser2,
    A.ts AS ts1,
    B.ts AS ts2
  PATTERN (A B) WITHIN INTERVAL '1' SECOND
  DEFINE
    A AS true,
    B AS B.browser <> A.browser
);
This query will produce no results. Can you find the root cause, and fix it?
If you look in the Flink Web UI, you will see that there are no watermarks.
The video on watermarking talks about idle sources, and the problems they cause.
Is there perhaps something different about the Kafka source and the flink-faker source, related to idleness?
The root cause of the problem is that the pageviews_kafka table was defined with this setting in place:
'sink.partitioner' = 'fixed'
With the fixed partitioner, each parallel instance of Flink's Kafka sink writes to a single Kafka partition, and the job executing this CREATE TABLE AS statement is running with a parallelism of one, so all of the rows are being written to one partition. However, the pageviews_kafka topic has 3 partitions, so the overall effect is that the topic has some empty (and therefore idle) partitions, and this is holding back the watermarks.
One approach would be to work around the idle partition problem by configuring an idle timeout, e.g.,
SET 'table.exec.source.idle-timeout' = '2000';
This will set the idle timeout to 2000 milliseconds (2 seconds), after which the runtime will mark the idle partitions as inactive, and ignore them (unless they begin to produce data).
Another solution would be to avoid having idle partitions in the first place, e.g., by configuring the pageviews_kafka table to use the default partitioner.
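A sketch of that second approach is below. It assumes you first cancel the still-running CTAS job, and it writes to a fresh topic (pageviews_kafka2 is a hypothetical name) so that the new table doesn't inherit the skewed data already sitting in the original topic:

-- Sketch: with 'sink.partitioner' omitted, Flink falls back to the
-- Kafka producer's default partitioner, which spreads keyless records
-- across all of the topic's partitions, so none of them sit idle.
DROP TABLE pageviews_kafka;

CREATE TABLE pageviews_kafka
WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_kafka2',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
)
AS SELECT * FROM pageviews;

Since dropping and recreating the table also discards the watermark added earlier, re-run the ALTER TABLE ... ADD WATERMARK statement before retrying the pattern matching query.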
For a deeper dive into how watermarks behave, and how they can be configured, see How streaming SQL uses watermarks.