Here are some of the questions that you may have about Apache Flink and its surrounding ecosystem.
If you’ve got a question that isn’t answered here then please do ask the community.
Many stream processing operations depend on timing information, using timestamps recorded with the events. This includes windows, temporal joins, and pattern matching (using MATCH_RECOGNIZE). These time-based operations wait to produce results until a watermark signals that the stream is sufficiently complete. For example, an hour-long tumbling window won’t produce any results until after the end of the hour, and Flink won’t consider an hour complete until it sees an event that pushes the watermark into the following hour.
Whenever a streaming SQL query fails to produce the expected results, this is usually because of a problem with the watermarks.
The most common cause of problems with watermarks is an idle source, or an idle source partition. Setting an idle-timeout will solve this, except when all sources have become idle. For example:
SET 'sql.tables.scan.idle-timeout' = '1s';
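The option name above is the one used by Confluent Cloud for Apache Flink. As a hedged note for readers running open-source Apache Flink instead, the corresponding setting there is `table.exec.source.idle-timeout`. The `1s` value below simply mirrors the example above and is not a recommendation:

```sql
-- Open-source Apache Flink (SQL client): mark a source or source partition
-- as idle after 1 second without events, so that it no longer holds back
-- the watermark.
SET 'table.exec.source.idle-timeout' = '1s';
```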
Confluent Cloud for Apache Flink provides a default watermark strategy for all tables, eliminating the need, in most cases, to worry about watermarking. However, if you have fewer than 250 events per partition, or if events can be delayed by more than 7 days, you should configure a custom watermark strategy.
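As a rough sketch of what configuring a custom watermark strategy can look like in Confluent Cloud, the statement below replaces the default strategy on a table. The table name `orders` and the 10-second delay are assumptions chosen only for illustration; pick a delay that matches how far out of order your events can actually arrive.

```sql
-- Hypothetical table name; replace the default watermark strategy
-- with a fixed 10-second out-of-orderness bound on $rowtime.
ALTER TABLE orders
  MODIFY WATERMARK FOR `$rowtime` AS `$rowtime` - INTERVAL '10' SECOND;

-- To go back to the default watermark strategy later:
ALTER TABLE orders DROP WATERMARK;
```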
You can use the CURRENT_WATERMARK function to see the watermarks being generated by Flink SQL. For example, if you have defined a watermark strategy on the user_action_time column in this user_actions table:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- declare user_action_time as the event time attribute, with watermarks that trail it by 5 seconds
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
then this query will show you the event time and watermark for each row of the table:
SELECT user_action_time, CURRENT_WATERMARK(user_action_time) FROM user_actions;
If you are using Confluent Cloud and the default watermarking defined on $rowtime, that would look like this:
SELECT `$rowtime`, CURRENT_WATERMARK(`$rowtime`) FROM user_actions;
If the watermarks are null, that will prevent Flink SQL’s temporal operations, such as windows, temporal joins, and MATCH_RECOGNIZE, from producing results. Depending on the circumstances, you may need to set an idle-timeout, produce more events, or be more patient.
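To make these cases easier to spot, one option is to label each row according to how its timestamp relates to the current watermark. This is only a sketch against the user_actions table defined above; the `watermark_status` column name and its labels are made up for this example.

```sql
SELECT
  user_action_time,
  CURRENT_WATERMARK(user_action_time) AS wm,
  CASE
    -- No watermark has been emitted yet: time-based operations are stalled.
    WHEN CURRENT_WATERMARK(user_action_time) IS NULL THEN 'no watermark yet'
    -- The watermark has already passed this row's timestamp: the row is late.
    WHEN user_action_time <= CURRENT_WATERMARK(user_action_time) THEN 'late'
    ELSE 'on time'
  END AS watermark_status
FROM user_actions;
```

If every row shows `no watermark yet`, the query is most likely waiting on an idle source or on more data.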
The various operations provided by Flink SQL, such as windowed aggregations, deduplication, regular joins, temporal joins, etc., fall into one of two categories with respect to the kind of stream they produce as their output: those that produce an append-only stream, and those that produce an updating stream.
Similarly, Flink’s SQL operations also fall into one of two categories with respect to the kinds of streams they can consume as inputs: those that can only consume append-only streams, and those that can consume either append-only or updating streams.
So if you see an error message along these lines:
XXX doesn't support consuming update and delete changes which is produced by node YYY
this is telling you that XXX is an operation that can’t accept an updating stream as its input, and YYY is an operation that produces an updating stream. One way to work around this sort of issue is to change YYY to a time-based version of the same operation: e.g., replace a regular join with a temporal join, or a deduplication with a windowed deduplication.
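As an illustration of that workaround, here is a sketch that contrasts a regular deduplication with a windowed one, using the user_actions table from earlier. The choice of user_name as the deduplication key and the 10-minute window size are assumptions made for the example.

```sql
-- Regular deduplication (keep the latest row per user).
-- Each newer row for a user replaces the previous result,
-- so the output is an updating stream.
SELECT user_name, data, user_action_time
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY user_name
      ORDER BY user_action_time DESC) AS rownum
  FROM user_actions
)
WHERE rownum = 1;

-- Windowed deduplication (keep the latest row per user per 10-minute window).
-- Results are emitted once, after the watermark passes the end of the window,
-- so the output is an append-only stream.
SELECT user_name, data, user_action_time, window_start, window_end
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end, user_name
      ORDER BY user_action_time DESC) AS rownum
  FROM TABLE(
    TUMBLE(
      DATA => TABLE user_actions,
      TIMECOL => DESCRIPTOR(user_action_time),
      SIZE => INTERVAL '10' MINUTES))
)
WHERE rownum <= 1;
```

Note that the two queries are not equivalent: the windowed version deduplicates within each window rather than across the whole stream, so this substitution only fits when that change in semantics is acceptable.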
In a query like this one:
SELECT window_start, window_end, COUNT(*)
FROM TABLE(
TUMBLE(
DATA => TABLE events,
TIMECOL => DESCRIPTOR(event_time),
SIZE => INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
an error message saying that timecol must be a time attribute is telling you that you need to define watermarking on the event_time column, since a column only becomes an event-time attribute once a watermark strategy is declared for it.
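For instance, a definition of the events table along the following lines would make event_time a time attribute. This is a minimal sketch that assumes open-source Apache Flink and its built-in datagen connector; the event_id column, the 5-second delay, and the connector options are placeholders rather than anything taken from the original table.

```sql
CREATE TABLE events (
  event_id STRING,
  event_time TIMESTAMP(3),
  -- Declaring a watermark on event_time is what turns it into a time
  -- attribute that TUMBLE can accept as its TIMECOL argument.
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);
```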