One of the more powerful features of SQL is its ability to take data from multiple tables and join it in meaningful ways. Just like SQL, the Flink Table API supports joins.
However, when we join data in a streaming system, there are special considerations to take into account. Typically, you want to join tables over a limited period of time. Otherwise, you can be forced to read every event in both tables, which is usually inefficient. In addition, unbounded joins often produce a changelog, with values being inserted, updated, and deleted, while event streams typically work best when they are append-only.
In this exercise, we'll see how to implement an Interval Join. Interval joins operate on a window of events, making them more efficient. They also explicitly require that the source topics are append-only, eliminating the changelog concerns.
Stage the exercise by executing:
./exercise.sh stage 05
In addition to the usual orders, customers, products, etc., the business also keeps track of clicks on its website.
Whenever a customer clicks on a product page, that click is registered in a table, along with details about the click.
The business is very interested in understanding how often someone clicks on a product and makes a purchase within some window of time.
This requires data from two places. We need the data from the clicks table, but also the data from the orders table.
This is an excellent opportunity to make use of an Interval Join.
To start, let's create the destination table. You should see that a new ClickService class was created when you staged the exercise. Open it now.
Implement the createOrderPlacedAfterClickTable method using the following SQL:
CREATE TABLE IF NOT EXISTS `TABLE NAME` (
    `customer_id` INT NOT NULL,
    `clicked_url` VARCHAR(2147483647) NOT NULL,
    `time_of_click` TIMESTAMP_LTZ(3) NOT NULL,
    `purchased_product` VARCHAR(2147483647) NOT NULL,
    `time_of_order` TIMESTAMP_LTZ(3) NOT NULL
) WITH (
    'kafka.retention.time' = '1 h',
    'scan.startup.mode' = 'earliest-offset'
);
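If it helps to picture the method, here is a minimal sketch of how createOrderPlacedAfterClickTable might execute that SQL. It assumes the class holds a TableEnvironment field named env and a TABLE_NAME constant; the staged ClickService may structure this differently.

public void createOrderPlacedAfterClickTable() {
    // Execute the CREATE TABLE statement from above against the environment.
    env.executeSql(
        "CREATE TABLE IF NOT EXISTS `" + TABLE_NAME + "` (" +
        "  `customer_id` INT NOT NULL," +
        "  `clicked_url` VARCHAR(2147483647) NOT NULL," +
        "  `time_of_click` TIMESTAMP_LTZ(3) NOT NULL," +
        "  `purchased_product` VARCHAR(2147483647) NOT NULL," +
        "  `time_of_order` TIMESTAMP_LTZ(3) NOT NULL" +
        ") WITH (" +
        "  'kafka.retention.time' = '1 h'," +
        "  'scan.startup.mode' = 'earliest-offset'" +
        ")"
    );
}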
Where would you find the TABLE NAME?
Next, we need to start implementing the join. We'll do this in multiple parts.
In the streamOrderPlacedAfterClick method, start by selecting from the clicks table.
Select the fields you will need from the clicks table: the customer ID, the URL that was clicked, and the $rowtime of the click.
You will join multiple tables with a $rowtime field. This will create a naming collision in the joined table. Renaming the $rowtime field will resolve the collision.
Look at the CREATE TABLE statement above. When you rename the $rowtime field, you can save some effort by giving it the same name it will need in the destination.
The select statement will return a Table object that you can save to a variable.
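As a rough sketch, the selection might look something like the following. The source column names (customer_id and url) are assumptions about the clicks table, so check the actual schema in your environment. The $ and other expression helpers come from a static import of org.apache.flink.table.api.Expressions.

// Assumes: import static org.apache.flink.table.api.Expressions.*;
// The table path may need catalog/database qualifiers in your environment.
Table clicks = env.from("clicks")
    .select(
        $("customer_id"),
        $("url").as("clicked_url"),         // assumed source column name
        $("$rowtime").as("time_of_click")   // renamed to match the destination
    );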
Next, select from the orders table.
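A similar sketch works for the orders table, again with assumed column names. Renaming the orders customer_id here avoids a second collision when the two tables are joined:

Table orders = env.from("orders")
    .select(
        $("customer_id").as("order_customer_id"),  // renamed to avoid colliding with clicks
        $("product_id").as("purchased_product"),   // assumed source column name
        $("$rowtime").as("time_of_order")          // renamed to match the destination
    );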
Two tables can be combined using the join method. We'll need to provide a where clause that indicates how to perform the join. For an interval join, that where clause must include a temporal window over which to perform the join.
Join the results from the clicks table with those from the orders table.
In the where clause, use and to combine the conditions for the join: the customer IDs from the two tables must match, and the time of the order must fall within the allowed window after the time of the click.
You can use an and clause as follows:
.where(
    and(
        CONDITION 1,
        CONDITION 2,
        CONDITION 3,
        ETC
    )
)
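Putting that shape together with the earlier sketches, an interval join might look like the following. The conditions and the five-minute window are illustrative; in the exercise, the window comes from the parameter passed to streamOrderPlacedAfterClick.

// Illustrative only, using the field names assumed in the earlier sketches.
Table joined = clicks
    .join(orders)
    .where(
        and(
            $("customer_id").isEqual($("order_customer_id")),
            $("time_of_order").isGreaterOrEqual($("time_of_click")),
            $("time_of_order").isLess(
                $("time_of_click").plus(lit(5).minutes())  // window length is a parameter in the exercise
            )
        )
    );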
You should be able to look back at the previous exercises for examples of the types of conditions you will need.
Finally, we need to ensure that the results of our join match the required format for the destination table.
Have a look at the CREATE TABLE statement above. You should be able to determine what fields you need to select to match the output format. Remember, you can always look back at previous exercises for inspiration.
When you are confident in your results, insert them into the destination table you created above.
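As a sketch, assuming the names used above, the final projection and insert could look like this:

// Select the columns in the destination's order, then write to the table.
joined
    .select(
        $("customer_id"),
        $("clicked_url"),
        $("time_of_click"),
        $("purchased_product"),
        $("time_of_order")
    )
    .executeInsert(TABLE_NAME);  // assumes the same TABLE_NAME constant as above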
Don't forget to run the tests.
Now we need to run the pipeline.
Update Marketplace.java.
Create an instance of the ClickService using the following tables:
Call createOrderPlacedAfterClickTable.
Call streamOrderPlacedAfterClick, giving it a value of five minutes.
You won't have to wait five minutes; that is just the longest it will wait to join results. It will begin producing data as soon as it sees matches, which should only take a few seconds once the query starts running.
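As a sketch of the wiring (the actual constructor signature is defined in the staged code, so treat the arguments here as assumptions):

// Assumes: import java.time.Duration;
ClickService clickService = new ClickService(env, "clicks", "orders");  // assumed constructor
clickService.createOrderPlacedAfterClickTable();
clickService.streamOrderPlacedAfterClick(Duration.ofMinutes(5));        // assumed parameter type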
Run the Marketplace.
Let's verify that the stream is working.
Don't forget to check your running Flink statements and terminate any you don't want to keep running.
This brings us to the end of the exercises. At this point, you may want to delete your flink-table-api-java environment to free up resources and ensure you don't incur future costs.
If you are looking for more opportunities to try out the Flink Table API, specific examples of a variety of features can be found in the flink-table-api-java-examples GitHub repository.