course: Apache Flink® Table API: Processing Data Streams in Java

Exercise: Combining Data from Multiple Streams with the Flink Table API

20 min
Wade Waldron

Staff Software Practice Lead

Combining Data from Multiple Streams with the Flink Table API

One of the more powerful features of SQL is its ability to take data from multiple tables and join it in meaningful ways. Just like SQL, the Flink Table API supports joins.

However, joining data in a streaming system comes with special considerations. Typically, you want to join tables over a limited time period. Otherwise, you may be forced to read every event in both tables, which is usually inefficient. In addition, unbounded joins often produce a changelog, with values being inserted, updated, and deleted, whereas event streams work best when they are append-only.

In this exercise, we'll see how to implement an Interval Join. Interval joins operate on a window of events, making them more efficient. They also explicitly require that the source topics are append-only, eliminating the changelog concerns.

Stage the exercise

Stage the exercise by executing:

./exercise.sh stage 05

Order placed after click

In addition to the usual orders, customers, products, etc., the business also keeps track of clicks on its website.

Whenever a customer clicks on a product page, that click is registered in a table, along with details about the click.

The business is very interested in understanding how often someone clicks on a product and makes a purchase within some window of time.

This requires data from two places. We need the data from the clicks table, but also the data from the orders table.

This is an excellent opportunity to make use of an Interval Join.

Create the Table

To start, let's create the destination table. You should see that a new ClickService class was created when you staged the exercise. Open it now.

Implement the createOrderPlacedAfterClickTable method using the following SQL.

CREATE TABLE IF NOT EXISTS `TABLE NAME` (
    `customer_id` INT NOT NULL,
    `clicked_url` VARCHAR(2147483647) NOT NULL,
    `time_of_click` TIMESTAMP_LTZ(3) NOT NULL,
    `purchased_product` VARCHAR(2147483647) NOT NULL,
    `time_of_order` TIMESTAMP_LTZ(3) NOT NULL
) WITH (
    'kafka.retention.time' = '1 h',
    'scan.startup.mode' = 'earliest-offset'
);

QUESTION: Where would you find the TABLE NAME?
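Under the hood, creating the table is just a matter of executing this DDL through the TableEnvironment. A minimal sketch of what the method might look like (the field names `env` and `tableName` are assumptions; your staged class may differ):

```java
// Sketch only: field names are assumptions, not the staged code.
public void createOrderPlacedAfterClickTable() {
    env.executeSql(
        "CREATE TABLE IF NOT EXISTS `" + tableName + "` (" +
        "  `customer_id` INT NOT NULL," +
        "  `clicked_url` VARCHAR(2147483647) NOT NULL," +
        "  `time_of_click` TIMESTAMP_LTZ(3) NOT NULL," +
        "  `purchased_product` VARCHAR(2147483647) NOT NULL," +
        "  `time_of_order` TIMESTAMP_LTZ(3) NOT NULL" +
        ") WITH (" +
        "  'kafka.retention.time' = '1 h'," +
        "  'scan.startup.mode' = 'earliest-offset'" +
        ")"
    );
}
```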

Read the clicks table

Next, we need to start implementing the join. We'll do this in multiple parts.

In the streamOrderPlacedAfterClick method, start by selecting from the clicks table.

  • Select the following fields from the table:

    • user_id
    • url
    • $rowtime

    NOTE: You will join multiple tables with a $rowtime field. This will create a naming collision in the joined table. Renaming the $rowtime field will resolve the collision.

    Hint

Look at the CREATE TABLE statement above. When you rename $rowtime, you can save some effort by giving it the same name it will need in the destination table.

  • The select statement will return a Table object that you can save to a variable.
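Put together, the clicks selection might look something like this sketch (the variable names `env` and `clicksTableName` are assumptions):

```java
import org.apache.flink.table.api.Table;
import static org.apache.flink.table.api.Expressions.$;

// Sketch: assumes a TableEnvironment field `env` and a field holding
// the fully qualified clicks table name.
Table clicks = env.from(clicksTableName)
    .select(
        $("user_id"),
        $("url"),
        // Rename $rowtime to match the destination column and avoid a
        // naming collision with the orders table's $rowtime.
        $("$rowtime").as("time_of_click")
    );
```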

Read the orders table

Next, select from the orders table.

  • Select the following fields from the table:
    • customer_id
    • product_id
    • $rowtime
  • Save the result to a variable.
  • Consider renaming the $rowtime field.
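This selection mirrors the one for clicks. A sketch, again with assumed variable names:

```java
import org.apache.flink.table.api.Table;
import static org.apache.flink.table.api.Expressions.$;

// Sketch: `ordersTableName` is an assumed field holding the fully
// qualified orders table name.
Table orders = env.from(ordersTableName)
    .select(
        $("customer_id"),
        $("product_id"),
        // Rename $rowtime to avoid colliding with the clicks table's $rowtime.
        $("$rowtime").as("time_of_order")
    );
```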

Join the tables

Two tables can be combined using the join method. We'll need to provide a where clause which indicates how to perform the join. For an interval join, that where clause needs to include a temporal window over which to perform the join.

  • Join the results from the clicks table with those from the orders table.

  • In the where clause, use and to combine the following conditions.

    • Ensure the user_id from clicks is equal to the customer_id from orders.
      • Use isEqual.
    • Ensure the $rowtime from the order is greater than or equal to the $rowtime of the click.
      • Use isGreaterOrEqual.
    • Ensure the $rowtime from the order is less than the $rowtime of the click plus the withinTimePeriod parameter.
      • Use isLess, plus, and lit(...).seconds
    Hint

    You can use an and clause as follows

    .where(
        and(
            CONDITION 1,
            CONDITION 2,
            CONDITION 3,
            ETC
        )
    )

NOTE: You should be able to look back at the previous exercises for examples of the types of conditions you will need.
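The conditions above can be sketched as follows, assuming the renamed fields from the previous steps and that withinTimePeriod is the join window expressed in seconds:

```java
import org.apache.flink.table.api.Table;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;

// Sketch: an interval join over the two selections built earlier.
Table joined = clicks.join(orders)
    .where(
        and(
            // Match clicks to orders from the same customer.
            $("user_id").isEqual($("customer_id")),
            // The order must happen at, or after, the click...
            $("time_of_order").isGreaterOrEqual($("time_of_click")),
            // ...but within the configured time window.
            $("time_of_order").isLess(
                $("time_of_click").plus(lit(withinTimePeriod).seconds())
            )
        )
    );
```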

Format the results

Finally, we need to ensure that the results of our join match the required format for the destination table.

Have a look at the CREATE TABLE statement above. You should be able to determine what fields you need to select to match the output format. Remember, you can always look back at previous exercises for inspiration.

Insert the results

When you are confident in your results, insert them into the destination table you created above.
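Putting the last two steps together, the final select and insert might look like this sketch (the column mapping is inferred from the CREATE TABLE statement above; variable names are assumptions):

```java
import static org.apache.flink.table.api.Expressions.$;

// Sketch: map the joined fields onto the destination schema, then insert.
joined
    .select(
        $("customer_id"),
        $("url").as("clicked_url"),
        $("time_of_click"),
        $("product_id").as("purchased_product"),
        $("time_of_order")
    )
    .insertInto(tableName)
    .execute();
```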

Don't forget to run the tests.

Update the marketplace

Now we need to run the pipeline.

Update Marketplace.java.

  • Create an instance of the ClickService using the following tables:

    • `examples`.`marketplace`.`clicks`
    • `examples`.`marketplace`.`orders`
    • `flink-table-api-java`.`marketplace`.`order-placed-after-click`
  • Call createOrderPlacedAfterClickTable.

  • Call streamOrderPlacedAfterClick giving it a value of five minutes.

    NOTE: You won't have to wait five minutes. That is the longest it will wait to join results. However, it will begin producing data as soon as it sees matches. This should only take a few seconds once the query starts running.

  • Run the Marketplace.
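The wiring might look like this sketch (the exact constructor and method signatures come from the staged ClickService class; the shape here is an assumption):

```java
// Sketch: constructor and method signatures are assumptions.
ClickService clickService = new ClickService(
    env,
    "`examples`.`marketplace`.`clicks`",
    "`examples`.`marketplace`.`orders`",
    "`flink-table-api-java`.`marketplace`.`order-placed-after-click`"
);

clickService.createOrderPlacedAfterClickTable();
clickService.streamOrderPlacedAfterClick(5 * 60); // five minutes, if expressed in seconds
```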

Verify the results

Let's verify that the stream is working.

  • Open Confluent Cloud, navigate to the order-placed-after-click topic, and view the messages.

WARNING: Don't forget to check your running Flink statements and terminate any you don't want to keep running.

Finish

This brings us to the end of the exercises. At this point, you may want to delete your flink-table-api-java environment to free up resources and ensure you don't incur future costs.

If you are looking for more opportunities to try out the Flink Table API, specific examples of a variety of features can be found in the flink-table-api-java-examples GitHub repository.
