Course: Apache Flink® 101

Streaming Analytics (Confluent Cloud)

15 min

David Anderson
Principal Software Practice Lead

This hands-on exercise is an introduction to streaming analytics on Confluent Cloud using Apache Flink SQL.

A similar hands-on exercise using open source Flink SQL running in Docker is also available: Streaming Analytics with Apache Flink SQL.

Introduction

The idea behind streaming analytics is to be able to compute, in real time, statistics that summarize an ongoing stream of events. For example, in this exercise you'll be working with queries that count clicks per second.

Queries like these need support from Flink for both state and time, since they are accumulating a result over some period of time.

In this exercise we're going to look at a few different approaches to windowing, using Confluent Cloud as described in the Cloud Setup. If you've done the setup before, you can run confluent flink shell to start a new Flink shell.

You can use one of the built-in example tables as the basis for these experiments: examples.marketplace.clicks. Here's what that table looks like (don't try to execute this CREATE TABLE statement; this table already exists):

CREATE TABLE `examples`.`marketplace`.`clicks` (
  `click_id` VARCHAR(2147483647) NOT NULL,
  `user_id` INT NOT NULL,
  `url` VARCHAR(2147483647) NOT NULL,
  `user_agent` VARCHAR(2147483647) NOT NULL,
  `view_time` INT NOT NULL
)
WITH (
  'changelog.mode' = 'append',
  'connector' = 'faker',
  ...
  'rows-per-second' = '50'
)
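
Before writing any windowed queries, it can help to see a few raw rows. A sample query like this should be safe to run against the example table (the LIMIT keeps the result small):

SELECT click_id, user_id, url, user_agent, view_time
FROM examples.marketplace.clicks
LIMIT 10;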

Conceptually, stream processing involves two different notions of time, and thus we can envision two different kinds of windows that count clicks per second:

  • processing time: each click is counted based on the second during which it is processed, using the stream processor's system clock
  • event time: each click is counted based on when it occurred, using timestamps carried by the events

Below is an initial, naive attempt to implement windowing. This query rounds the current system clock down to the nearest second, and then counts how many clicks are processed during each second:

SELECT
  FLOOR(CURRENT_TIMESTAMP TO SECOND) AS window_start,
  count(1) AS cnt
FROM examples.marketplace.clicks
GROUP BY FLOOR(CURRENT_TIMESTAMP TO SECOND);

There are several problems with doing windowing this way. One immediately apparent issue is that the results are non-deterministic. The table definition (above) calls for the flink-faker event generator to produce 50 rows per second, but there's no guarantee that the events will be processed at exactly this rate. (Of course, throughput this low isn't very challenging for Flink to handle, so you may or may not see much variation in the results.)

A better, but still naive approach to windowing

Can we do windowing based on event time instead? Yes!

Using DESCRIBE EXTENDED examples.marketplace.clicks; we can see that this table includes another column:

| $rowtime    | TIMESTAMP_LTZ(3) *ROWTIME* | NOT NULL | METADATA VIRTUAL, WATERMARK AS `SOURCE_WATERMARK`() | SYSTEM  |

In Confluent Cloud, $rowtime is a special, read-only system column derived from the source's per-record timestamps.
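
To see these timestamps alongside the click data, you can select the column explicitly. Note the backticks, which are needed because the column name begins with a $:

SELECT `$rowtime`, click_id, url
FROM examples.marketplace.clicks;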

We can use this column for (naive) event time windowing:

SELECT
  FLOOR(`$rowtime` TO SECOND) AS window_start,
  count(1) AS cnt
FROM examples.marketplace.clicks
GROUP BY FLOOR(`$rowtime` TO SECOND);

There's still a problem with doing windowing this way, and that problem has to do with state retention.

With windowing expressed this way, Flink's SQL engine doesn't know anything about the semantics of FLOOR(`$rowtime` TO SECOND). We know that the result of this function is connected to time, and that the timestamps are, at the very least, roughly ordered by time. But the Flink runtime isn't able to take advantage of this fact: it will keep the counters for all of these windows in its managed state forever.

Consider this query, which is structurally identical:

SELECT
  user_agent,
  count(1) as cnt
FROM examples.marketplace.clicks
GROUP BY user_agent;

Both of these queries are materializing queries, meaning they keep their state indefinitely. The query counting clicks for each user_agent is perfectly safe, because there's a small, finite number of distinct user agents, but the query counting clicks per second is more dangerous, because there's an unbounded supply of seconds in the future.
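
If you do need to run a materializing query whose key space grows without bound, one mitigation is to limit how long idle state is retained. As a sketch (the exact duration format accepted by this option is an assumption; check the documentation for your environment):

SET 'sql.state-ttl' = '1 h';

For windowing, though, there's a better option, described next.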

A better approach to windowing using time attributes and table-valued functions

What Flink SQL needs in order to safely implement windowing (i.e., windows that will be cleaned up once they're no longer changing) is

  • an input table that is append-only (which we have), and
  • a designated timestamp column with timestamps that are known to be advancing (which we also happen to have)

Flink SQL calls a timestamp column like this a time attribute, and time attributes come in two flavors: processing time (also known as wall clock time) and event time.

When using a time attribute column based on processing time, it's obvious that time will be advancing.

On the other hand, working with event time is trickier: just because a table has a timestamp column doesn't necessarily imply that those timestamps are advancing. For example, our clicks table could have a page_created_at column indicating when the page being clicked on was created. The order in which pages are created has little (if anything) to do with the order in which they are clicked.

To deal with this, Flink relies on a concept called watermarking to measure the progress of event time. You can learn more about watermarks later in this course, and in more depth in the courses on the DataStream API and Flink SQL. But the basic idea is that for a timestamp column to be a time attribute, it must have watermarking defined on it, and that watermarking is based on a heuristic describing how out-of-order those timestamps can be.
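
For context, here's what declaring both kinds of time attributes looks like in open source Flink SQL DDL. This is a sketch of a hypothetical table, not something to run in this exercise:

CREATE TABLE pageviews (
  click_id STRING,
  event_time TIMESTAMP_LTZ(3),
  -- processing time attribute, computed from the system clock
  proc_time AS PROCTIME(),
  -- event time attribute: the watermark declares that events may
  -- arrive at most 5 seconds out of order
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
  ...
);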

Fortunately, Confluent Cloud includes default watermarking defined on the $rowtime column, which we can rely on for this exercise.

Furthermore, Flink SQL includes special operations for windowing, which we can take advantage of. This requires setting up the query in a particular way, using one of the built-in window functions, such as TUMBLE:

SELECT
  window_start,
  count(1) AS cnt
FROM
  TUMBLE(DATA => TABLE examples.marketplace.clicks, 
         TIMECOL => DESCRIPTOR(`$rowtime`),
         SIZE => INTERVAL '1' SECOND)
GROUP BY window_start, window_end;

The built-in TUMBLE function used in this query is an example of a table-valued function (TVF). This function takes three parameters:

  • a table descriptor (TABLE examples.marketplace.clicks)
  • a column descriptor for the time attribute (DESCRIPTOR($rowtime))
  • a time interval to use for windowing (one second)

and it returns a new table based on the input table, but with two additional columns added to aid with windowing. To see how this works, you should examine the output of TUMBLE on its own (without the GROUP BY aggregation):

SELECT
  *
FROM 
  TUMBLE(DATA => TABLE examples.marketplace.clicks, 
         TIMECOL => DESCRIPTOR(`$rowtime`),
         SIZE => INTERVAL '1' SECOND);

What you're seeing is that the table returned by the TUMBLE function has window_start and window_end columns indicating which one-second-long window each click event has been assigned to.

GROUP BY window_start, window_end aggregates together all of the clicks assigned to each window, making it easy to compute any sort of aggregation function on these windows, such as counting, summing, averaging, etc.
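
For example, since the clicks table includes a view_time column, a straightforward variation on the query above could report both a count and an average view time per window:

SELECT
  window_start,
  count(1) AS cnt,
  AVG(view_time) AS avg_view_time
FROM
  TUMBLE(DATA => TABLE examples.marketplace.clicks,
         TIMECOL => DESCRIPTOR(`$rowtime`),
         SIZE => INTERVAL '1' SECOND)
GROUP BY window_start, window_end;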

One more thing

You may have noticed another significant difference between the naive windowing based on FLOOR and the more sophisticated approach using window functions: the naive approach produces a continuously updating stream as its output, while the function-based approach produces an append-only stream, where each window produces a single, final result.

The importance of this distinction is discussed at length in a Flink SQL course module on Changelog Processing, but in short, tables based on append-only streams are more versatile in how they can be processed.

[Optional] HOP, CUMULATE, and SESSION windows

In addition to tumbling windows, Flink SQL also supports hopping, cumulating, and session windows with the HOP, CUMULATE, and SESSION window functions. The Flink SQL course includes a video covering this topic more deeply.

The HOP and CUMULATE functions are worth experimenting with because they don't just add new columns to the input table; they can also add rows, since each event can be assigned to more than one window. Seeing how this works will help you appreciate the elegance of this table-valued function approach to windowing.
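
As a starting point for that experimentation, here's a hopping window version of the counting query. With windows that are 5 seconds long and begin every second, each click should appear in 5 overlapping windows:

SELECT
  window_start,
  window_end,
  count(1) AS cnt
FROM
  HOP(DATA => TABLE examples.marketplace.clicks,
      TIMECOL => DESCRIPTOR(`$rowtime`),
      SLIDE => INTERVAL '1' SECOND,
      SIZE => INTERVAL '5' SECOND)
GROUP BY window_start, window_end;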

Resources

Do you have questions or comments? Join us in the #confluent-developer community Slack channel to engage in discussions with the creators of this content.
