Course: Apache Flink® SQL

Exercise: Streaming analytics

30 min
David Anderson

Software Practice Lead

This exercise reinforces the concepts presented in the modules about window aggregations in Flink SQL and OVER windows. In this hands-on exercise, you will learn:

  • what window table-valued functions (Window TVFs) are, and how they work
  • why it's important to GROUP BY both window_start and window_end
  • how to use OVER windows
  • how to use the LAG and COALESCE functions
  • how deduplication fits in as a special case of an OVER aggregation

Understanding Window TVFs (table-valued functions)

The window TVFs (TUMBLE, HOP, CUMULATE, and SESSION) all take three or four input parameters. There's always a DATA parameter providing the name of the input table, and there's always a TIMECOL parameter specifying which column of the input table is the time attribute.

Before using these TVFs for windowing, explore how they behave on their own by executing the queries below. Each query takes a single order as its input, and the results will show the entire table that is returned from each of these table-valued functions.

See if you can correctly predict how many rows will be returned from each of these queries:

WITH one_order AS
  (SELECT *, $rowtime from `examples`.`marketplace`.`orders` limit 1)
SELECT customer_id, product_id, $rowtime, window_start, window_end, window_time
  FROM TABLE(
    TUMBLE(DATA => TABLE one_order, 
           TIMECOL => DESCRIPTOR($rowtime),
           SIZE => INTERVAL '1' MINUTE));
WITH one_order AS
  (SELECT *, $rowtime from `examples`.`marketplace`.`orders` limit 1)
SELECT customer_id, product_id, $rowtime, window_start, window_end, window_time
  FROM TABLE(
    HOP(DATA => TABLE one_order,
        TIMECOL => DESCRIPTOR($rowtime),
        SIZE => INTERVAL '1' MINUTE,
        SLIDE => INTERVAL '15' SECONDS));
WITH one_order AS
  (SELECT *, $rowtime from `examples`.`marketplace`.`orders` limit 1)
SELECT customer_id, product_id, $rowtime, window_start, window_end, window_time
  FROM TABLE(
    CUMULATE(DATA => TABLE one_order,
             TIMECOL => DESCRIPTOR($rowtime),
             SIZE => INTERVAL '1' MINUTE,
             STEP => INTERVAL '15' SECONDS));

Notice how the results from HOP differ from the results from CUMULATE.
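To build intuition for why each TVF returns the number of rows it does, here is a minimal Python sketch (not Flink's implementation) of how a single event is assigned to windows, assuming windows are aligned to the epoch. SESSION is omitted, since it isn't available on Confluent Cloud:

```python
from datetime import datetime, timedelta, timezone

def tumble(ts, size):
    # A tumbling window assigns each event to exactly one window.
    start = ts - timedelta(seconds=ts.timestamp() % size.total_seconds())
    return [(start, start + size)]

def hop(ts, size, slide):
    # A hopping window assigns each event to size/slide overlapping
    # windows of equal length, with starts shifted by `slide`.
    windows = []
    start = ts - timedelta(seconds=ts.timestamp() % slide.total_seconds())
    while start + size > ts:  # windows are half-open: [start, end)
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

def cumulate(ts, size, step):
    # A cumulative window shares one window_start; window_end grows
    # by `step` until the full `size` is reached. The event belongs
    # to every growing window whose end is after the event.
    start = ts - timedelta(seconds=ts.timestamp() % size.total_seconds())
    windows, end = [], start + step
    while end <= start + size:
        if end > ts:
            windows.append((start, end))
        end += step
    return windows

event = datetime(2024, 1, 1, 12, 0, 40, tzinfo=timezone.utc)
print(len(tumble(event, timedelta(minutes=1))))                           # 1
print(len(hop(event, timedelta(minutes=1), timedelta(seconds=15))))       # 4
print(len(cumulate(event, timedelta(minutes=1), timedelta(seconds=15))))  # 2
```

Note how HOP produces four equal-sized windows with different start times, while CUMULATE produces windows that share the same window_start but have growing window_end values.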

Note: The SESSION window TVF is not yet available on Confluent Cloud.

Using these TVFs for windowing

The tables returned by these table-valued functions are designed to have their output rows rolled up by aggregating together all of the rows with the same window_start and window_end values.

Unlike HOP, CUMULATE, and SESSION windows, TUMBLE windows can be uniquely identified by their window_start (or window_end) alone, so it may seem unnecessary to bother with grouping by both of these columns in the case of a tumbling window. However, it makes a big difference.

It's worth doing the experiment. The two queries below differ only in the GROUP BY clause. Try them both, and use the "Show changelog" option triggered by typing an M in the CLI to see in more detail what's happening with each version:

WITH one_thousand_orders AS
  (SELECT *, $rowtime from `examples`.`marketplace`.`orders` limit 1000)
SELECT window_start, COUNT(*) AS order_count
  FROM TABLE(
    TUMBLE(DATA => TABLE one_thousand_orders, 
           TIMECOL => DESCRIPTOR($rowtime),
           SIZE => INTERVAL '5' SECOND))
  GROUP BY window_start, window_end;
WITH one_thousand_orders AS
  (SELECT *, $rowtime from `examples`.`marketplace`.`orders` limit 1000)
SELECT window_start, COUNT(*) AS order_count
  FROM TABLE(
    TUMBLE(DATA => TABLE one_thousand_orders, 
           TIMECOL => DESCRIPTOR($rowtime),
           SIZE => INTERVAL '5' SECOND))
  GROUP BY window_start;

The Flink SQL planner recognizes the version of this query with GROUP BY window_start, window_end as a windowing query, and executes it with a special window operator. This means that it produces an insert-only table as its result, and it clears from the state backend all information about each window as soon as it emits its result.

On the other hand, the query that specifies GROUP BY window_start is executed as a normal aggregation. The output is an updating stream, and all of the aggregates are retained in the state backend indefinitely.
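The difference shows up in the changelogs. Here is a rough Python sketch (an illustration, not Flink's implementation) of the two behaviors, using Flink's changelog row kinds: +I for inserts, and -U/+U for retraction/update pairs:

```python
# Each order is represented by its timestamp in seconds;
# counts are grouped into 5-second tumbling windows.

def windowed_counts(events, size=5):
    # Windowed aggregation: each window's result is emitted exactly
    # once, as an insert, and the window's state is then discarded.
    state = {}
    for ts in events:
        start = ts // size * size
        state[start] = state.get(start, 0) + 1
    return [("+I", start, count) for start, count in sorted(state.items())]

def updating_counts(events, size=5):
    # Plain aggregation keyed by window_start: every new row retracts
    # the previous count (-U) and emits an updated one (+U), and the
    # per-key state is retained indefinitely.
    changelog, state = [], {}
    for ts in events:
        key = ts // size * size
        if key in state:
            changelog.append(("-U", key, state[key]))
            state[key] += 1
            changelog.append(("+U", key, state[key]))
        else:
            state[key] = 1
            changelog.append(("+I", key, 1))
    return changelog

orders = [1, 2, 3, 6, 7, 12]
print(windowed_counts(orders))  # three +I rows, one per window
print(updating_counts(orders))  # +I followed by -U/+U pairs per key
```

This mirrors what "Show changelog" reveals: the windowed version produces an insert-only stream, while the GROUP BY window_start version produces an updating stream.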

Using the window_time column

The window_time column is the time attribute for the table emitted by the window operator; in other words, this timestamp column has watermarking defined on it. This makes it possible to perform temporal operations requiring watermarks on the output of these windows, including another layer of windowing, an OVER window, a temporal join, or MATCH_RECOGNIZE.

Here's an example that uses a tumbling window to count orders per second, and then applies an OVER window to the output of the tumbling window. It uses this OVER window to compute the difference of each tumbling window's counter compared to the counter for the previous window.

WITH orders_per_second AS (
  SELECT window_time, COUNT(*) AS order_count
  FROM TABLE(
    TUMBLE(DATA => TABLE `examples`.`marketplace`.`orders`, 
           TIMECOL => DESCRIPTOR($rowtime),
           SIZE => INTERVAL '1' SECOND))
  GROUP BY window_start, window_end, window_time
) 
SELECT
  window_time,
  order_count, 
  LAG(order_count, 1) OVER w AS previous_count, 
  order_count - LAG(order_count, 1) OVER w AS diff
FROM orders_per_second 
WINDOW w AS (
  ORDER BY window_time
);

The next section will explain how this works.

OVER windows

The example below uses the same logic as the OVER window in the previous section, but it's a bit more straightforward. These two examples illustrate a common pattern: detecting changes to a value over time. In the case below, the query reports on changes to product prices.

To accomplish this, we need Flink to sort the input stream by time, partition it by product_id, and find a way to get hold of the previous price. Both OVER windows and MATCH_RECOGNIZE can do these things, but in this case, an OVER window yields a simpler solution.

Note: the examples in this section are using WHERE product_id = '1000' to make it easier to understand the output of these queries.

SELECT
  $rowtime,
  product_id, 
  price, 
  LAG(price, 1) OVER w AS previous_price, 
  price - LAG(price, 1) OVER w AS diff
FROM `examples`.`marketplace`.`orders` 
WHERE product_id = '1000'
WINDOW w AS (
  PARTITION BY product_id 
  ORDER BY $rowtime
);

The LAG function plays the key role here. LAG(expression, n) returns the value of the expression at the nth row before the current row in the window, or in this case, the previous value of the price for the product in the current row.
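If it helps to see the semantics spelled out, here is a minimal Python sketch of what LAG computes over one partition, assuming the rows are already in event-time order:

```python
def lag(values, n=1, default=None):
    # For each row, return the value n rows earlier in the partition,
    # or `default` when no such row exists (SQL's LAG returns NULL).
    return [values[i - n] if i >= n else default for i in range(len(values))]

prices = [10.0, 12.5, 11.0]
previous = lag(prices, 1)   # [None, 10.0, 12.5]
diffs = [p - q for p, q in zip(prices[1:], previous[1:])]  # [2.5, -1.5]
```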

If you'd rather use 0 as the default price for the case when there is no previous price (instead of NULL), you can use COALESCE to do this:

SELECT
  $rowtime,
  product_id, 
  price, 
  LAG(price, 1) OVER w AS previous_price, 
  price - COALESCE(LAG(price, 1) OVER w, 0) AS diff
FROM `examples`.`marketplace`.`orders` 
WHERE product_id = '1000'
WINDOW w AS (
  PARTITION BY product_id 
  ORDER BY $rowtime
);

Deduplication

The Flink SQL planner recognizes specific patterns of OVER windows as special cases that it then executes with special implementations, rather than the generic OVER aggregate implementation. An example of this is deduplication; other examples include windowed deduplication, top-n, and windowed top-n queries.

The general form of a deduplication query looks like this:

SELECT [column_list]
FROM (
  SELECT [column_list],
    ROW_NUMBER() OVER (PARTITION BY key 
      ORDER BY time_attribute ASC|DESC) AS row_num
  FROM table)
WHERE row_num = 1;
  • ROW_NUMBER() assigns a unique, sequential number to each row, starting with one.
  • PARTITION BY column1[, column2...] specifies the columns used as the deduplication key.
  • ORDER BY time_attribute [ASC|DESC] specifies the ordering column, which must be a time attribute. ASC means keep the first row for each key; DESC means keep the most recent row.
  • WHERE row_num = 1 is required for Flink SQL to recognize that the query is doing deduplication.
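Conceptually, this pattern reduces to keeping one row per key. Here is a small Python sketch of that idea (an illustration only; Flink's special deduplication operator also handles state and changelog emission), assuming rows arrive in time order:

```python
def deduplicate(rows, key, keep="first"):
    # keep="first" corresponds to ORDER BY time ASC (keep the earliest
    # row per key); keep="last" corresponds to ORDER BY time DESC
    # (later rows replace earlier ones).
    result = {}
    for row in rows:
        k = row[key]
        if keep == "first":
            result.setdefault(k, row)
        else:
            result[k] = row
    return list(result.values())

orders = [
    {"customer_id": 3001, "product": "a"},
    {"customer_id": 3001, "product": "b"},
    {"customer_id": 3002, "product": "c"},
]
```

With keep="first", each key's result is emitted once and never changes, so the output is append-only; with keep="last", the result per key keeps being replaced, which is why the DESC variant in Flink produces an updating stream.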
For example, this query keeps the first order seen for each customer:

SELECT *
FROM (
  SELECT *, $rowtime as order_time,
    ROW_NUMBER() OVER (PARTITION BY customer_id 
      ORDER BY $rowtime ASC) AS row_num
  FROM `examples`.`marketplace`.`orders`
  WHERE customer_id < 3005)
WHERE row_num = 1;

Use the "[M] Show changelog" option to view the results in changelog mode, and then do the same with this variant that uses DESC rather than ASC:

SELECT *
FROM (
  SELECT *, $rowtime as order_time,
    ROW_NUMBER() OVER (PARTITION BY customer_id 
      ORDER BY $rowtime DESC) AS row_num
  FROM `examples`.`marketplace`.`orders`
  WHERE customer_id < 3005)
WHERE row_num = 1;
