Get Started Free
course: Apache Flink® SQL

Streaming JOINs in Flink SQL

20 min
David Anderson

David Anderson

Software Practice Lead

dan-weston

Dan Weston

Senior Curriculum Developer

Streaming JOINs in Flink SQL

Background

In traditional SQL databases there's a whole family of different kinds of join operations available -- inner and outer joins, left and right joins, and so on. Algebraically, these cover all of different ways in which you might combine information from two related tables.

While Flink supports all of these classic SQL joins, in most situations it's a mistake to use them in your Flink SQL applications. The reason for this has to do with what it means to execute a join in a streaming fashion.

Typically, two tables are related by having the primary key of one table be present as a foreign key in another table. For example, in the marketplace database used in this course, each record in the customers table has a customer_id column that is its primary key:

CREATE TABLE `customers` (
  `customer_id` INT NOT NULL,
  `name` STRING NOT NULL,
  `address` STRING NOT NULL,
  `postcode` STRING NOT NULL,
  `city` STRING NOT NULL,
  `email` STRING NOT NULL,
  PRIMARY KEY (`customer_id`) NOT ENFORCED
);

The effect of setting up the customers table in this way is that each new record in the customers stream replaces whatever was previously stored for that same customer.

Orders, on the other hand, are modeled differently. Orders are immutable, so the stream is insert-only rather than updating (and there's no primary key). And each order has a foreign key relationship to the customer who placed the order:

CREATE TABLE `orders` (
  `order_id` STRING NOT NULL,
  `customer_id` INT NOT NULL,
  `product_id` STRING NOT NULL,
  `price` DOUBLE NOT NULL
);

This then makes it possible to join orders to customers, which you might do to enrich each order with some information about the customer. E.g., this seems like a very natural thing to do:

SELECT orders.*, customers.postcode
FROM orders JOIN customers
ON orders.customer_id = customers.customer_id;

However, DO NOT DO THIS.

This video above explains why a traditional inner join is not what you want to use for stream enrichment.

Temporal JOINs

What you want instead is a temporal join:

SELECT orders.*, customers.postcode
FROM orders JOIN customers
FOR SYSTEM TIME AS OF orders.order_time
ON orders.customer_id = customers.customer_id;

What this query does is to relate each order with the most recent customer record as of the time of that order. This is almost always what you really want, and compared to a regular inner join, a temporal join has a number of other advantages:

  • the output stream is an insert-only stream (rather than an updating stream), so you are able to apply any sort of further processing to it, such as pattern matching with MATCH_RECOGNIZE
  • the output stream has watermarks, so again, you aren't restricted in how you can operate on it -- e.g., you can apply windowing to it
  • the runtime doesn't have to store the past orders

There are some restrictions, however:

  • the right-hand (versioned) table must be an updating table with a primary key
  • the primary key of the versioned table must be used in the equivalence condition of the temporal join

Interval JOINs

Flink SQL can also efficiently process joins that place a constraint on the time interval in which to look for related records. For example, this query finds orders placed by a customer within 5 minutes of clicking on something:

SELECT 
  order_id, 
  orders.`$rowtime` AS order_time,
  clicks.`$rowtime` AS click_time
FROM orders JOIN clicks ON `customer_id` = `user_id`
WHERE orders.`$rowtime` BETWEEN clicks.`$rowtime` AND clicks.`$rowtime` + INTERVAL '5' MINUTES;

Note that interval joins can only be used with insert-only tables. This is because the runtime is able to optimize this particular case.

Documentation

Join Queries

Use the promo codes FLINKSQL & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.