Course: Apache Flink® SQL

How streaming SQL uses watermarks

David Anderson

Software Practice Lead

What are watermarks?

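Watermarks are special records that are inserted into your data streams to mark the passage of time. Each watermark carries a timestamp, t, and asserts that the stream is now complete up to time t: no more events with timestamps at or before t are expected. Flink SQL relies on watermarks to tell time-based operations when they can produce their results, and to know when data that the runtime is storing is no longer useful.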
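To make this concrete, here is a minimal Python simulation (not Flink code) of one common watermark strategy, bounded out-of-orderness, which is roughly what a Flink SQL declaration such as `WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND` expresses: the watermark trails the largest timestamp seen so far by a fixed delay. The function name, the column name `order_time`, and the integer-second timestamps are illustrative assumptions. Flink typically emits watermarks periodically; this sketch emits one whenever the watermark advances, for simplicity.

```python
from typing import Iterable, Iterator, Optional, Tuple


def with_watermarks(events: Iterable[Tuple[int, str]], max_delay: int) -> Iterator[tuple]:
    """Interleave watermarks into a stream of (timestamp, payload) events.

    Emits ("event", ts, payload) for every event and ("watermark", t) whenever
    the watermark advances. The watermark trails the largest timestamp seen so
    far by `max_delay`, so events up to `max_delay` seconds out of order are
    still on time. Watermarks never move backwards.
    """
    current_wm: Optional[int] = None
    max_ts: Optional[int] = None
    for ts, payload in events:
        yield ("event", ts, payload)
        max_ts = ts if max_ts is None else max(max_ts, ts)
        candidate = max_ts - max_delay
        if current_wm is None or candidate > current_wm:
            current_wm = candidate
            yield ("watermark", current_wm)


for element in with_watermarks([(10, "a"), (12, "b"), (11, "c"), (15, "d")], max_delay=2):
    print(element)
```

Here event `c` (timestamp 11) arrives after the watermark for time 10, but it is still on time because 11 > 10; an event with timestamp 10 or earlier arriving at that point would be late.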

When are watermarks necessary?

Processing time vs. event time

Watermarks aren't needed if you are using processing time (also known as wall clock time) as the source of timing information, rather than timestamps stored with the events. However, using processing time is not recommended, because it introduces non-determinism: the results depend on when events happen to be processed rather than on when they occurred, which can make those results difficult to interpret. Confluent Cloud does not support processing time.
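The non-determinism of processing time can be illustrated with a small Python sketch (a simulation, not Flink code). The same three events are counted in hypothetical 10-second tumbling windows, once by their event timestamps and once by the wall-clock time at which they happened to be processed. The processing times are made-up values standing in for two runs of the same job, where a hiccup in the second run delays two of the events. Watermarks are deliberately left out here; the point is only which window each event lands in.

```python
from collections import Counter
from typing import List, Tuple

WINDOW_SIZE = 10  # seconds; a hypothetical tumbling-window size


def window_start(ts: int, size: int = WINDOW_SIZE) -> int:
    """Start of the tumbling window that contains timestamp `ts`."""
    return ts - ts % size


def count_by_event_time(events: List[Tuple[int, int]]) -> Counter:
    """events are (event_time, processing_time) pairs; window by event_time."""
    return Counter(window_start(event_time) for event_time, _ in events)


def count_by_processing_time(events: List[Tuple[int, int]]) -> Counter:
    """Same events, but windowed by when each one happened to be processed."""
    return Counter(window_start(processing_time) for _, processing_time in events)


# Two runs over the same three events; in run 2 two of them are processed late.
run1 = [(3, 4), (8, 9), (12, 13)]
run2 = [(3, 4), (8, 15), (12, 16)]

print(count_by_event_time(run1) == count_by_event_time(run2))            # True
print(count_by_processing_time(run1) == count_by_processing_time(run2))  # False
```

Windowing by event time gives the same answer on both runs, while windowing by processing time moves the event with timestamp 8 into a different window in the second run.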

Watermarks enable streaming operations that manage state

Watermarks aren't necessary for batch processing. For streaming workloads, only those streaming operations that pay attention to the passage of time need watermarks. These temporal operations include:

  • window aggregations
  • interval and temporal joins
  • sorting (by time), and any operation that involves time-based sorting, including
    • ORDER BY
    • OVER aggregations, including deduplication and Top-N
    • pattern matching with MATCH_RECOGNIZE

Queries that only involve projections and filters (i.e., they only use SELECT and WHERE clauses) don't need watermarks. The temporal operations listed above do need them because all of these operations are stateful, and they rely on watermarks to know when they can safely discard some of that state.
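As an illustration of how a watermark lets a stateful operator discard state, here is a minimal Python sketch of sorting by time, as an ORDER BY on the time attribute would require. It is a simulation under stated assumptions, not Flink's sort operator: the class name is hypothetical, timestamps are integer seconds, and events that arrive at or behind the current watermark are simply dropped.

```python
import heapq
from typing import List, Optional, Tuple


class TimeSortBuffer:
    """Sketch of an operator that emits events in timestamp order.

    Events are buffered (this buffer is the operator's state) until a watermark
    guarantees that no earlier event is still expected; they are then emitted
    in timestamp order and removed from the buffer.
    """

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []  # (timestamp, seq, payload)
        self._seq = 0  # tie-breaker: keeps arrival order for equal timestamps
        self._watermark: Optional[int] = None

    def on_event(self, ts: int, payload: str) -> None:
        if self._watermark is not None and ts <= self._watermark:
            return  # late event: this sketch simply drops it
        heapq.heappush(self._heap, (ts, self._seq, payload))
        self._seq += 1

    def on_watermark(self, wm: int) -> List[Tuple[int, str]]:
        """Advance the watermark and emit every buffered event it covers, in order."""
        self._watermark = wm if self._watermark is None else max(self._watermark, wm)
        out: List[Tuple[int, str]] = []
        while self._heap and self._heap[0][0] <= self._watermark:
            ts, _, payload = heapq.heappop(self._heap)
            out.append((ts, payload))
        return out

    def state_size(self) -> int:
        return len(self._heap)


buf = TimeSortBuffer()
for ts, payload in [(12, "b"), (10, "a"), (15, "d"), (11, "c")]:
    buf.on_event(ts, payload)
print(buf.on_watermark(12))  # [(10, 'a'), (11, 'c'), (12, 'b')]
print(buf.state_size())      # 1 (only event d is still buffered)
buf.on_event(9, "late")      # dropped: the watermark has already passed 9
print(buf.on_watermark(20))  # [(15, 'd')]
```

Without watermarks, the buffer could never know that it is safe to emit (and forget) anything.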

Regular aggregations (as opposed to window aggregations) and regular joins (as opposed to interval and temporal joins) don't use watermarks, but they should be used with caution: they are not just stateful, but can use an unbounded amount of state, because without watermarks they have no way of knowing when any of that state can be discarded.
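A minimal Python sketch of why regular aggregations need caution: a continuous `GROUP BY` count must keep one entry per distinct key, and with no watermark it never learns that a key is finished. The class name and the plain dict standing in for state are illustrative assumptions, not how Flink actually stores state; grouping by a unique order ID is a hypothetical worst case.

```python
from typing import Dict, Tuple


class RegularCount:
    """Sketch of the continuous query SELECT key, COUNT(*) FROM t GROUP BY key."""

    def __init__(self) -> None:
        self.state: Dict[str, int] = {}  # one counter per distinct key, kept forever

    def on_event(self, key: str) -> Tuple[str, int]:
        # Any key may reappear at any time, and there is no watermark to say
        # otherwise, so the operator never learns that an entry can be deleted.
        self.state[key] = self.state.get(key, 0) + 1
        return key, self.state[key]  # the updated result for this key


agg = RegularCount()
print(agg.on_event("alice"))  # ('alice', 1)
print(agg.on_event("alice"))  # ('alice', 2)
for i in range(1_000):
    agg.on_event(f"order-{i}")  # e.g., grouping by a unique order ID
print(len(agg.state))  # 1001
```

State grows with the number of distinct keys, so a key space that keeps growing means state that keeps growing.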

What is a time attribute?

The temporal operations mentioned above are all performed in conjunction with a time attribute, which is a timestamp column with watermarking defined on it. For example, if your query is doing time-based windowing, the time attribute's timestamp will be used to determine which window each event should be assigned to, and the watermarks will determine when each window is complete.
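The division of labor between the time attribute and the watermark can be sketched in Python as a tumbling-window count. This is a simulation under stated assumptions (integer-second timestamps, a hypothetical class name, late events silently dropped), not Flink's window operator: the timestamp picks the window, and the watermark decides when that window is complete, emitted, and removed from state.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


class TumblingWindowCount:
    """Sketch of a tumbling-window COUNT(*) driven by event time."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.watermark: Optional[int] = None
        self.state: Dict[int, int] = defaultdict(int)  # window start -> count

    def on_event(self, ts: int) -> None:
        if self.watermark is not None and ts <= self.watermark:
            return  # late: its window may already be emitted; dropped in this sketch
        start = ts - ts % self.size  # the time attribute decides the window
        self.state[start] += 1

    def on_watermark(self, wm: int) -> List[Tuple[int, int, int]]:
        """Emit (window_start, window_end, count) for every window now complete."""
        self.watermark = wm if self.watermark is None else max(self.watermark, wm)
        results: List[Tuple[int, int, int]] = []
        for start in sorted(self.state):  # sorted() copies the keys, so popping is safe
            end = start + self.size
            # [start, end) holds timestamps start..end-1, so the window is
            # complete once the watermark reaches end - 1.
            if self.watermark >= end - 1:
                results.append((start, end, self.state.pop(start)))
        return results


w = TumblingWindowCount(size=10)
for ts in [1, 4, 12, 9, 15]:
    w.on_event(ts)
print(w.on_watermark(8))   # [] (window [0, 10) is not complete until 9)
print(w.on_watermark(9))   # [(0, 10, 3)]
w.on_event(5)              # late for the window that was just emitted; dropped
print(w.on_watermark(19))  # [(10, 20, 2)]
```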

Of course, a Flink table can have many timestamp columns. For example, an Orders table might have timestamp columns indicating when an order was placed, and when the order was shipped. However, watermarking can only be applied to one of the timestamp columns, and that special column will be the time attribute. This in turn means that all of these temporal operations can only be applied to the timestamp column you've chosen to use for watermarking.

If this feels like a limitation, consider modeling your data differently. For example, moving the shipment timing information out of the Orders table and into its own Shipments table gives you more flexibility, since each table can then use a different timestamp column as its time attribute.
