David Anderson

Software Practice Lead

OVER aggregations

For windows relative to the time-of-day clock (rather than each event), see the module on window aggregations.

For a guided exploration into how both window and over aggregations work, see the accompanying hands-on exercise.

The big idea, as illustrated with an example

The idea behind OVER windows in Flink SQL is that each window ends with the current row and stretches backwards through the history of the stream for a specific interval, measured either in time or as a number of rows.

For example, this query computes a running average of the price for each product, averaged over the prices paid for the last 3 orders for that product. The count of orders in each window is also included so you can see what happens during the warm-up phase, when there haven't yet been 3 orders: num_prices is 1 for a product's first order, 2 for its second, and stays at 3 from the third order onward.

SELECT 
  product_id,
  price,
  COUNT(price) OVER w AS num_prices,
  AVG(price) OVER w AS avg_price
FROM `examples`.`marketplace`.`orders`
WINDOW w AS (
  PARTITION BY product_id
  ORDER BY $rowtime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
);

If you want to experiment with this query, I suggest narrowing the scope to only a few products, so that it's easier to observe the results:

SELECT 
  product_id,
  price,
  COUNT(price) OVER w AS num_prices,
  AVG(price) OVER w AS avg_price
FROM `examples`.`marketplace`.`orders`
WHERE product_id < 1010
WINDOW w AS (
  PARTITION BY product_id
  ORDER BY $rowtime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
);

The results are updated for every input row. This means that the product_id and price in the output refer to the values of those columns from the latest order, while the num_prices and avg_price columns hold the values aggregated over the window w as it is defined (relative to the latest order).

In this case the windows are partitioned by product_id, since we want to compute the running average separately for each product. You could leave out the PARTITION BY clause if you wanted to compute global aggregates, instead of breaking down the results on a per-product basis.
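As a minimal sketch of the global variant described above, the same query can be written without the PARTITION BY clause. The window then covers the last 3 orders across all products rather than the last 3 orders of each product. This assumes the same example table as the earlier queries.

```sql
-- Sketch: running average over the last 3 orders of ANY product.
-- Same table and columns as the queries above; only PARTITION BY is removed.
SELECT
  product_id,
  price,
  COUNT(price) OVER w AS num_prices,
  AVG(price) OVER w AS avg_price
FROM `examples`.`marketplace`.`orders`
WINDOW w AS (
  ORDER BY $rowtime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
);
```

Without partitioning, every row has to pass through a single ordered sequence, so a query like this is unlikely to scale the way the partitioned version does.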

Sorting the input with ORDER BY $rowtime is necessary so that the window's interval is well-defined. Otherwise, if the input stream were somewhat out-of-order, it wouldn't be possible to produce consistent, meaningful results. OVER windows only support sorting on time attributes, and only in ascending order.
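The queries so far measure the window as a number of rows. For the time-based form, a sketch using a RANGE interval might look like the following. The one-minute interval is an arbitrary choice for illustration, not something prescribed by this module.

```sql
-- Sketch: running average over the orders for each product whose
-- $rowtime falls within the minute leading up to the current row.
-- The 1-minute interval is an illustrative assumption.
SELECT
  product_id,
  price,
  COUNT(price) OVER w AS num_prices,
  AVG(price) OVER w AS avg_price
FROM `examples`.`marketplace`.`orders`
WINDOW w AS (
  PARTITION BY product_id
  ORDER BY $rowtime
  RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
);
```

Here num_prices is no longer capped at 3: it reports however many orders for that product arrived within the interval, which makes it a convenient way to see how the window's contents vary over time.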

Important special cases

Flink SQL also supports deduplication, top-n, and windowed top-n queries as special cases of OVER windows. For more on this, see the exercises and the documentation.
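To give a rough idea of what these special cases look like, here is a sketch of a top-n query that keeps the 3 highest-priced orders for each product. It follows the ROW_NUMBER() pattern described in the Flink documentation; the choice of n = 3 and the alias row_num are assumptions made for illustration.

```sql
-- Sketch of a top-n query: the 3 highest prices seen so far per product.
-- ROW_NUMBER() ranks the rows within each partition, and the outer
-- filter on row_num is what marks this as a top-n query.
SELECT product_id, price, row_num
FROM (
  SELECT
    product_id,
    price,
    ROW_NUMBER() OVER (
      PARTITION BY product_id
      ORDER BY price DESC
    ) AS row_num
  FROM `examples`.`marketplace`.`orders`
)
WHERE row_num <= 3;
```

Unlike the running-average queries, this pattern orders by price rather than by a time attribute, and its results are revised whenever a new order displaces one of the current top 3.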
