In traditional SQL databases there's a whole family of join operations available: inner and outer joins, left and right joins, and so on. Algebraically, these cover all of the different ways in which you might combine information from two related tables.
While Flink supports all of these classic SQL joins, in most situations it's a mistake to use them in your Flink SQL applications. The reason for this has to do with what it means to execute a join in a streaming fashion.
Typically, two tables are related by having the primary key of one table be present as a foreign key in another table. For example, in the marketplace database used in this course, each record in the customers table has a customer_id column that is its primary key:
CREATE TABLE `customers` (
`customer_id` INT NOT NULL,
`name` STRING NOT NULL,
`address` STRING NOT NULL,
`postcode` STRING NOT NULL,
`city` STRING NOT NULL,
`email` STRING NOT NULL,
PRIMARY KEY (`customer_id`) NOT ENFORCED
);
The effect of setting up the customers table in this way is that each new record in the customers stream replaces whatever was previously stored for that same customer.
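The "replaces whatever was previously stored" behavior can be pictured with a small plain-Python model. This is only a sketch of the semantics, not Flink code: the `customers` dictionary, the `upsert` helper, and the sample records are all made up for illustration.

```python
# Plain-Python model of a table with a primary key (illustrative only, not Flink code).
customers = {}  # customer_id -> latest record seen for that customer


def upsert(record):
    """Apply one record from the customers stream: the newest record per key wins."""
    customers[record["customer_id"]] = record


upsert({"customer_id": 1, "name": "Ada", "postcode": "10115"})
upsert({"customer_id": 2, "name": "Bo", "postcode": "94105"})
upsert({"customer_id": 1, "name": "Ada", "postcode": "20095"})  # customer 1 moved

print(len(customers))            # 2: three records arrived, but only two keys exist
print(customers[1]["postcode"])  # 20095: the later record replaced the earlier one
```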
Orders, on the other hand, are modeled differently. Orders are immutable, so the stream is insert-only rather than updating (and there's no primary key). And each order has a foreign key relationship to the customer who placed the order:
CREATE TABLE `orders` (
`order_id` STRING NOT NULL,
`customer_id` INT NOT NULL,
`product_id` STRING NOT NULL,
`price` DOUBLE NOT NULL
);
This then makes it possible to join orders to customers, which you might do to enrich each order with some information about the customer. E.g., this seems like a very natural thing to do:
SELECT orders.*, customers.postcode
FROM orders JOIN customers
ON orders.customer_id = customers.customer_id;
However, DO NOT DO THIS.
The video above explains why a traditional inner join is not what you want to use for stream enrichment.
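A rough way to picture the problem is with a simplified plain-Python model of a regular join running over streams. This is a sketch under stated assumptions, not how Flink is implemented: the handler names, the `"insert"`/`"retract"` labels, and the sample data are hypothetical. The model keeps every order forever, and a later change to a customer rewrites the results of that customer's earlier orders.

```python
from collections import defaultdict

# Simplified model of a regular (non-temporal) streaming join; illustrative only.
orders_state = defaultdict(list)  # customer_id -> every order seen so far (never expires)
customers_state = {}              # customer_id -> latest customer record
changelog = []                    # changes the join sends downstream


def on_order(order):
    """A new order is stored forever and joined with the current customer, if any."""
    orders_state[order["customer_id"]].append(order)
    customer = customers_state.get(order["customer_id"])
    if customer is not None:
        changelog.append(("insert", order["order_id"], customer["postcode"]))


def on_customer(customer):
    """A customer update re-joins all of that customer's past orders."""
    cid = customer["customer_id"]
    old = customers_state.get(cid)
    customers_state[cid] = customer
    for order in orders_state[cid]:
        if old is not None:
            # Withdraw the result that was produced with the old customer record.
            changelog.append(("retract", order["order_id"], old["postcode"]))
        changelog.append(("insert", order["order_id"], customer["postcode"]))


on_customer({"customer_id": 1, "postcode": "10115"})
on_order({"order_id": "o-1", "customer_id": 1})
on_customer({"customer_id": 1, "postcode": "20095"})  # customer moves after ordering

for change in changelog:
    print(change)
```

In this model the already-delivered order `o-1` is retracted and re-emitted with the new postcode, which is rarely what you want when enriching immutable orders.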
What you want instead is a temporal join:
SELECT orders.*, customers.postcode
FROM orders JOIN customers
FOR SYSTEM_TIME AS OF orders.`$rowtime`
ON orders.customer_id = customers.customer_id;
This query relates each order to the most recent customer record as of the time of that order. This is almost always what you really want, and compared to a regular inner join, a temporal join has a number of other advantages:
There are some restrictions, however:
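To make the "as of the time of that order" lookup concrete, here is a minimal plain-Python sketch. It is not Flink code, and it rests on assumptions: the `versions` history, the integer timestamps, and the `postcode_as_of` helper are invented for illustration, and a version is assumed to take effect at exactly its own timestamp.

```python
from bisect import bisect_right

# customer_id -> list of (valid_from, postcode), kept sorted by valid_from.
# Hypothetical sample data: customer 1 changed postcode at time 200.
versions = {1: [(100, "10115"), (200, "20095")]}


def postcode_as_of(customer_id, order_time):
    """Return the postcode that was current for the customer at order_time."""
    history = versions.get(customer_id, [])
    times = [valid_from for valid_from, _ in history]
    # Index of the last version whose valid_from <= order_time.
    i = bisect_right(times, order_time) - 1
    return history[i][1] if i >= 0 else None


print(postcode_as_of(1, 150))  # 10115: the order predates the move
print(postcode_as_of(1, 250))  # 20095: the order came after the move
print(postcode_as_of(1, 50))   # None: no customer record existed yet
```

Each order is matched against exactly one customer version, so a later change to the customer never alters the result for an earlier order. In this sketch an order with no matching version yields `None`; an inner join would produce no row for it.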
Flink SQL can also efficiently process joins that place a constraint on the time interval in which to look for related records. For example, this query finds orders placed by a customer within 5 minutes of clicking on something:
SELECT
order_id,
orders.`$rowtime` AS order_time,
clicks.`$rowtime` AS click_time
FROM orders JOIN clicks ON `customer_id` = `user_id`
WHERE orders.`$rowtime` BETWEEN clicks.`$rowtime` AND clicks.`$rowtime` + INTERVAL '5' MINUTES;
Note that interval joins can only be used with insert-only tables. That restriction, together with the time bound in the join condition, is what allows the runtime to optimize this particular case.
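The matching rule of the interval join can be sketched in plain Python as follows. This is an illustrative model rather than Flink's implementation: timestamps are plain seconds in a hypothetical `ts` field, and `interval_join` and the sample rows are made up for the example.

```python
FIVE_MINUTES = 5 * 60  # seconds


def interval_join(orders, clicks):
    """Pair each order with clicks by the same customer in the preceding 5 minutes."""
    results = []
    for order in orders:
        for click in clicks:
            same_customer = order["customer_id"] == click["user_id"]
            # Mirrors BETWEEN, which includes both ends of the range.
            in_window = click["ts"] <= order["ts"] <= click["ts"] + FIVE_MINUTES
            if same_customer and in_window:
                results.append((order["order_id"], order["ts"], click["ts"]))
    return results


clicks = [{"user_id": 1, "ts": 0}, {"user_id": 1, "ts": 1000}]
orders = [
    {"order_id": "o-1", "customer_id": 1, "ts": 120},  # 2 minutes after a click
    {"order_id": "o-2", "customer_id": 1, "ts": 400},  # more than 5 minutes after
    {"order_id": "o-3", "customer_id": 2, "ts": 10},   # different customer
]

print(interval_join(orders, clicks))  # [('o-1', 120, 0)]
```

Because rows are never updated and a click can only match orders from the following five minutes, a click that is old enough can be forgotten, which keeps the state small.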