David Anderson

Software Practice Lead

The Flink SQL Runtime

Understanding some basic organizing principles that underlie the Flink SQL runtime will make the details presented elsewhere in this course easier to follow. This will also give you a leg up in troubleshooting some of the error messages you may see.

Streaming or batch, the results are the same

If you find yourself wondering what Flink's streaming SQL runtime is going to do with a query, it helps to keep in mind that at any point in time, the result of a streaming SQL query will be the same as the result from that same query running in a traditional batch-oriented database over the same data.

Flink's stream-oriented runtime will keep around whatever state it needs so that as each additional row of input data is processed, the result can be incrementally updated.

The SQL API is organized around minimizing state

When SQL queries are executed by Flink's streaming runtime, they fall into one of two categories: stateless and stateful.

Stateless queries are those that perform filtering and per-row transformations, such as

SELECT * FROM orders WHERE price < 20;

On the other hand, queries that use GROUP BY and JOIN require state. For example,

SELECT customer_id, COUNT(*)
FROM orders
GROUP BY customer_id;

will need to store a counter for each customer_id, and those counters will need to be maintained indefinitely.

That is, unless we somehow knew that certain customers would never place an order again. Then we could safely remove their counters from the state store.

This is the guiding principle behind a special family of temporal operations provided by the Flink SQL API. Examples of these temporal operations include time-based windowing and temporal joins. These operations are stateful, but they exploit time-based constraints to trigger the deletion of state that the runtime knows won't be needed again.
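As a sketch of how this works, here is the earlier counting query rewritten as a windowed aggregation using Flink's TUMBLE window table-valued function. This assumes the orders table has a time attribute column (called order_time here) with watermarks defined on it; once the watermark passes the end of an hour-long window, that window's counters can be emitted and the associated state dropped:

```sql
SELECT window_start, window_end, customer_id, COUNT(*) AS order_count
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, customer_id;
```

Unlike the unwindowed version, this query's state does not grow indefinitely: each per-customer counter lives only as long as its window.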

The video in the module on watermarks illustrates in some detail how time and state are connected.

Stateless queries are easier to scale

The Flink runtime can scale SQL queries in one of two ways.

Stateless queries are embarrassingly parallel. The only real limit to their scalability is the number of Kafka partitions in the input topic(s).

Stateful queries (including queries using temporal operations) scale by re-partitioning the streams on the key(s) used for aggregating or joining. This ensures that for any given key, all of the events for that key are processed by the same compute node. In many cases this effectively distributes the load across a large cluster, but if your dataset has only a few distinct keys, or if the data is unevenly distributed across those keys, this can severely limit how far your queries can scale.

Updating streams/tables are second-class citizens

The various operations provided by Flink SQL, such as windowed aggregations, deduplication, regular joins, temporal joins, etc., fall into one of two categories with respect to the kind of stream they produce as their output:

  • operations that always produce an insert-only stream (such as windowed aggregations and temporal joins)
  • operations that produce an updating stream that can include changes to previously emitted results (such as regular aggregations and regular joins)
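To make the second category concrete, the earlier unwindowed COUNT(*) query produces an updating stream. A sketch of its changelog, assuming two orders arrive for the same hypothetical customer_id 42:

```sql
-- SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id;
--
-- first order for customer 42 arrives:
--   +I (42, 1)     -- insert a new result row
-- second order for customer 42 arrives:
--   -U (42, 1)     -- retract the previously emitted result
--   +U (42, 2)     -- emit the updated result
```

A windowed aggregation, by contrast, only emits each window's result once, after the window closes, so its output is insert-only.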

Similarly, Flink’s SQL operations also fall into one of two categories with respect to the kinds of streams they can consume as inputs:

  • operations that can only consume insert-only streams
  • operations that can be used with any stream, updating or not

This means that some combinations of operations cannot be chained together.

If you see an error message along these lines

XXX doesn't support consuming update and delete changes which is produced by node YYY

this is telling you that XXX is an operation that can’t accept an updating stream as its input, and YYY is an operation that produces an updating stream. One way to work around this sort of issue is to change YYY to a time-based version of the same operation: e.g., replace a regular join with a temporal join, or a deduplication with a windowed deduplication.
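For example, one insert-only alternative to a regular join is an interval join, which bounds how far apart in time matching rows can be. A sketch, assuming hypothetical orders and shipments tables that both have time attributes (order_time and ship_time):

```sql
-- Interval join: only matches shipments within 4 hours of the order.
-- Because of the time bound, the runtime can drop each order from its
-- join state once the watermark passes order_time + 4 hours, and the
-- output is an insert-only stream.
SELECT o.order_id, s.shipment_id
FROM orders o, shipments s
WHERE o.order_id = s.order_id
  AND s.ship_time BETWEEN o.order_time
                      AND o.order_time + INTERVAL '4' HOUR;
```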

Lack of watermarks also inhibits composability

All of the temporal operators require watermarks: interval and temporal joins, window and OVER aggregations, and MATCH_RECOGNIZE. Meanwhile, some other operators, such as regular joins, can't produce watermarks, because their output streams cannot be guaranteed to exhibit bounded out-of-orderness with respect to time.

For example, if you try to apply a window table-valued function to the output of a regular join, you'll get an error something like this:

The window function requires the timecol is a time attribute type, but is a TIMESTAMP(3).

This is telling you that the timestamp column you have provided as the basis for windowing doesn't have watermarks associated with it.
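Watermarks are typically established in the table's DDL. A minimal sketch, with hypothetical column names and the connector configuration omitted, showing how a TIMESTAMP(3) column becomes a time attribute:

```sql
CREATE TABLE orders (
  order_id STRING,
  customer_id BIGINT,
  price DECIMAL(10, 2),
  order_time TIMESTAMP(3),
  -- declaring a watermark strategy turns order_time into a time
  -- attribute, here tolerating up to 5 seconds of out-of-orderness
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  -- connector options omitted
);
```

With this declaration in place, order_time can be used as the time column for windowing and other temporal operations.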

