Get Started Free
course: Apache Flink® 101

Streaming Analytics with Flink SQL (Exercise)

David Anderson

David Anderson

Software Practice Lead

Streaming Analytics with Flink SQL (Exercise)


The idea behind streaming analytics is to able to compute, in real time, statistics that summarize a ongoing stream of events. For example, in this exercise you'll be working with queries that count page views per second.

Queries like these need support from Flink for both state and time, since they are accumulating a result over some period of time.

In this exercise we're going to look at two different approaches to counting page views per second. In a later exercise we'll revisit this topic and explore yet another approach.

The pageviews table, as defined in an earlier exercise, has a field that carries timing information:


The faker definition for this ts column includes some randomness: each timestamp is situated anywhere between 1 and 5 seconds in the past. This is meant to simulate the sort of latency and out-of-orderness that you might experience in a real-world application.

'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'

The faker definition also specifies the rate at which events should be produced, e.g.,

'rows-per-second' = '100'

The impact of these settings is that we should expect the one-second-long windows we are about to count to include roughly, but not exactly, 100 events each. (Of course, if you've modified any of these settings, adjust your expecations accordingly!)

Below is an initial, naive attempt to implement windowing. (You can run this query against either the pageviews table or the pageviews_kafka table, since both tables are dervied from the same data source.)

What this query is doing is rounding the timestamp in each page view down to the nearest second, and then counting how many page views occured in each second.

  FLOOR(ts TO SECOND) AS window_start,
  count(url) as cnt
FROM pageviews

There's a problem with doing windowing this way, and that problem has to do with state retention. When windowing is expressed this way, Flink's SQL engine doesn't know anything about the semantics of FLOOR(ts TO SECOND). We know that the result of this function is connected to time, and that the timestamps are (approximately) ordered by time.

When you run this query you'll see that the counters for each window stop incrementing after 5 seconds. This happens because the table definition uses

'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'

to specify that the timestamp for each row should be between 1 and 5 seconds in the past.

However, the Flink runtime isn't able to take advantage of this when executing this query. It will, in fact, keep the counters for all of these windows around forever.

Structurally, the query above is organized the same way as this one:

  count(url) as cnt
FROM pageviews
GROUP BY browser;

Both of these queries are materializing queries, meaning they keep some state indefinitely. The query counting page views per browser is perfectly safe, since there are only a small, finite number of different browsers, but the query counting page views per second is dangerous, because there's a unbounded supply of different seconds.

A better approach to windowing using time attributes and table-valued functions

What Flink SQL needs in order to safely implement windowing (i.e., windows that will be cleaned up once they're no longer changing) is

  • an input table that is append-only (which we have), and
  • a designated timestamp column with timestamps that are known to be advancing (which we don't have (yet))

Flink SQL calls a timestamp column like this a time attribute, and time attributes come in two flavors: processing time (also known as wall clock time) and event time. For our use case, where we're counting page views per second, the distinction is this:

  • When windowing with processing time, a page view is counted based on the second during which it is processed, rather than when it occurred.
  • When windowing with event time, a page view is counted based on when it occured, rather than when it is processed.

When using a time attribute column based on processing time, it's obvious that time will be advancing.

On the other hand, working with event time is trickier: just because a table has a timestamp column doesn't necessarily imply that those timestamps are advancing. For example, our pageviews table could have a created_at column indicating when the page was created. The order in which pages are created has little (if anything) to do with the order in which they are viewed.

To deal with this, Flink relies something called watermarks to measure the progress of event time, and you'll learn how to work with event-time windows and watermarks in a later exercise.

For this exercise, you'll experiment with windows that use processing time. This requires adding a timestamp column that is tied to the system's time-of-day clock. This processing-time-based timestamp column is an example of a computed column, meaning a column whose value is computed, rather than being physically present in the event stream. In this case we want to use the built-in PROCTIME() function. You can add such a column to the pageviews table like this:

ALTER TABLE pageviews ADD `proc_time` AS PROCTIME();

If you now use DESCRIBE pageviews; to inspect this table you will see that Flink SQL has understood that this new column is going to be used as a processing time time attribute:

Flink SQL> describe pageviews;
|      name |                        type |  null | key |          extras | watermark |
|       url |                      STRING |  TRUE |     |                 |           |
|   user_id |                      STRING |  TRUE |     |                 |           |
|   browser |                      STRING |  TRUE |     |                 |           |
|        ts |                TIMESTAMP(3) |  TRUE |     |                 |           |
| proc_time | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS `PROCTIME`() |           |

Flink SQL includes special operations for windowing, which we can now take advantage of. This requires setting up the query in a particular way, using one of the built-in window functions, such as TUMBLE:

  window_start, count(url) AS cnt
  TUMBLE(TABLE pageviews, DESCRIPTOR(proc_time), INTERVAL '1' SECOND))
GROUP BY window_start;

The built-in TUMBLE function used in this query is an example of a table-valued function. This function takes three parameters

  • a table descriptor (TABLE pageviews)
  • a column descriptor for the time attribute (DESCRIPTOR(proc_time))
  • a time interval to use for windowing (one second)

and it returns a new table based on the input table, but with two additional columns added to aid with windowing. To see how this works, try executing this part of the windowing query on its own:


What you're seeing is that the table returned by the TUMBLE function has window_start and window_end columns indicating which one-second-long window each pageview event has been assigned to.

GROUP BY window_start aggregates together all of the pageview events assigned to each window, making it easy to compute any type of aggregation function on these windows, such as counting.

Do it again, with Kafka

The previous section guided you toward counting page views per second using the table backed by the Faker source. This source was configured to produce events at a very specific rate (e.g., 100 page views per second), and the results from the processing-time windows reflected this.

Now try this again, using the pageviews_kafka table from the previous exercise. Try to predict what the results will look like, and see how the actual results compare to your expectations.

[Optional] HOP, CUMULATE, and SESSION windows

In addition to tumbling windows, Flink SQL also supports hopping and cumulating windows with the HOP and CUMULATE window functions. The documentation describes these functions in some detail.

If you are looking to explore windowing more deeply, the HOP and CUMULATE functions are worth experimenting with because they don't just add new columns to the input table, but they can also add additional rows. Seeing how this works will help you appreciate the elegance of this table-valued function approach to windowing.

Flink SQL also supports session windows, but with a different approach. See the documentation on Group Window Aggregation for details.


Use the promo code FLINK101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. ­čÖé By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.