course: Apache Flink® Table API: Processing Data Streams In Java

Joining Flink Tables using the Apache Flink® Table API

6 min
Wade Waldron

Principal Software Practice Lead


The Apache Flink Table API lets us work with streaming data much as we would with relational database tables. This includes the ability to join tables together to produce more detailed views.

However, joining data streams introduces complexities that can cause Flink's internal state to grow significantly if you aren't careful.

As with many other Table API operations, joins in Flink work best when there is a natural time window within which the join should be applied. This is where tools like the interval join can help optimize how the internal state is maintained.

In this video, we'll look at some of the joins supported by the Flink Table API, including the basic inner join, left outer join, and full outer join, as well as more complex joins such as the interval join.

Topics

  • Why do we need to join data in Flink?
  • How does an inner join work in the Flink Table API?
  • How can we avoid ambiguous field names when joining Tables?
  • How can we limit the fields returned by the join?
  • Why can simple joins be resource intensive?
  • How can we adjust the time-to-live for the job state?
  • What is an interval join, and why is it more state efficient?

Resources

Code

Selecting from the customers table:

Table customersTable = env.from("customers").select(
		$("customer_id").as("c_customer_id"),
		$("name"),
		$("address"),
		$("phone_number")
	);

Selecting from the orders table:

Table ordersTable = env.from("orders").select(
		$("order_id").as("o_order_id"),
		$("customer_id").as("o_customer_id"),
		$("product_id"),
		$("price"),
		$("$rowtime").as("order_time")
	);

Selecting from the shipments table:

Table shipmentsTable = env.from("shipments").select(
		$("shipment_id"),
		$("order_id").as("s_order_id"),
		$("shipping_address"),
		$("$rowtime").as("shipment_time")
	);

Joining the orders and customers table:

Table productOrdersWithNames = customersTable.join(ordersTable)
	.where(
		$("c_customer_id").isEqual($("o_customer_id"))
	)
	.select(
		$("name"),
		$("product_id"),
		$("price")
	);
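
During development, a quick way to inspect the result is to execute the table and print it (standard Table API calls, not shown in the video):

// Runs the query and streams the result to stdout; handy for local testing.
productOrdersWithNames.execute().print();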

A left outer join:

customersTable.leftOuterJoin(ordersTable)
  ...

A full outer join:

customersTable.fullOuterJoin(ordersTable)
  ...
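
The outer join variants also accept the join predicate directly as a second argument. As a sketch (field names follow the tables defined above), a completed left outer join might look like this:

// Left outer join: keeps every customer, with nulls where no order matched.
Table customersWithOptionalOrders = customersTable.leftOuterJoin(
		ordersTable,
		$("c_customer_id").isEqual($("o_customer_id"))
	)
	.select($("name"), $("product_id"), $("price"));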

An interval join on the orders and shipments tables:

Table onTimeOrders = ordersTable.join(shipmentsTable)
	.where(
		and(
			$("o_order_id").isEqual($("s_order_id")),
			$("order_time").isGreaterOrEqual(
				$("shipment_time").minus(lit(24).hours())
			),
			$("order_time").isLess($("shipment_time"))
		)
	);
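
As with the earlier joins, the result can be narrowed with a select and executed; a sketch (the field choice here is illustrative):

// Keep the order id and both timestamps (illustrative selection).
onTimeOrders
	.select($("o_order_id"), $("order_time"), $("shipment_time"))
	.execute()
	.print();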

Configuring the state time-to-live:

EnvironmentSettings settings = ConfluentSettings
	.newBuilderFromResource("/cloud.properties")
	.setOption("sql.state-ttl", "24 hours")
	.build();
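
The resulting settings object is then used to create the TableEnvironment (the env referenced in the snippets above):

TableEnvironment env = TableEnvironment.create(settings);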


Joining Flink Tables using the Apache Flink® Table API

In a traditional system, data is placed in relational database tables, which are often highly normalized.

This results in tens, hundreds, or even thousands of tables.

The data from the tables is joined together to create detailed views.

Joining can be complicated and expensive, even in a relational database.

But how does this change when working with data streams?

The Apache Flink® Table API allows us to wrap data streams in table-like structures and query them like a database, including using joins.

Let's look at how joins work with the Table API, and what hidden costs might be associated with them.

Imagine an e-commerce system with a table containing customers, including a customer_id, name, address, and phone number.

We could query the table and extract the fields with a simple select statement.

Now, imagine another table that contains orders, including the order_id, customer_id, product_id, and price.

And, once again, we could query the table with a simple select statement.

What if we wanted to create a list of customer names, which products they ordered, and what the price was?

We'll call this new table product_orders_with_names.

This is where we could use a join.

However, we need to specify how rows in the customers table will map to rows in the orders table.

Looking at both tables side by side, we can see that some of the rows have a matching customer_id.

If we join the tables using the customer_id, we'll eliminate any rows that don't exist in both tables.

This is what is called an inner join.

It takes records from the first table and matches them to records from the second table, returning only the rows with a direct match.

An inner join can be implemented with the join and where clauses.

Except there is a problem.

The customers and orders tables both contain a customer_id field.

Joining the tables creates a naming collision because the result contains two fields with the same name.

It also makes the where clause ambiguous.
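
In code, the ambiguous version would look something like this sketch (assuming the tables were registered without the renames shown in the resources above):

// Both inputs expose a field named customer_id, so Flink cannot tell
// which side each reference means and rejects the plan as ambiguous.
Table ambiguous = customersTable.join(ordersTable)
	.where($("customer_id").isEqual($("customer_id")));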

To fix this, let's go back to our original table definitions.

We will rename the customer_id field in at least one of the tables to avoid the conflict.

For consistency, I'll do it in both.

Now, we can update the where clause to use the renamed fields.

This will prevent the naming collision.

Executing the join will combine the data from the tables, eliminating any unmatched rows.

Unfortunately, the result includes several fields we don't care about.

So let's clean up the results with a select statement.

By selecting the name, product_id, and price, we can reduce our results to match our desired output.

In addition to inner joins, Flink supports leftOuterJoin and fullOuterJoin.

Check out the documentation for details on how these work.

At the beginning of the video, I mentioned that performing joins came with complexities.

Let's go back to our original tables to understand this better.

I've highlighted the matching customer_ids.

But what happens to the records that didn't match?

I'd love to tell you that we can just ignore them because they aren't needed, but unfortunately, that's not the case.

Unmatched records still have to be stored somewhere.

They won't appear in the final results, but the job has to keep them in its internal state.

This is because although there may not be a match now, that doesn't mean there won't be one eventually.

And when one arrives, we need access to the old records to include them in the result.

Keeping these records means the state of a Flink job can grow quite large if we aren't careful.

Furthermore, sometimes we join streams that include upserts or retractions.

When we perform an update to a record with many corresponding entries in the results table, it can cause a cascading update to all of those old records.

This can be a complicated and expensive operation.

Because of these complexities, we must be careful when using these basic joins.

They are best used where the table sizes are limited and we don't have to worry about the state growing out of control.

One way to control the state is to set the state time-to-live for the Flink environment.

This configuration will cause Flink to delete the dormant state after a specified period.

This can reduce resource usage for the job.

However, if an event shows up outside of the time window, the state may have been deleted.

This could cause inconsistencies if your application requires that data.
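
As an aside, the sql.state-ttl option shown in the resources above is specific to Confluent Cloud; on open-source Flink the comparable setting is table.exec.state.ttl. A minimal sketch, assuming a standard Flink streaming setup:

// Open-source Flink equivalent (assumption): drop idle state after 24 hours.
TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
env.getConfig().set("table.exec.state.ttl", "24 h");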

The best-case scenario for joins is when there is already a natural time limit in the business logic.

Let's look at another example.

Returning to our original orders table, we'll extract the $rowtime field and name it order_time.

Now, imagine we had a second table named shipments containing details about when an order was shipped and its destination.

Suppose we also have a service level agreement stating that all orders should be shipped within 24 hours.

We could verify this agreement using what is called an interval join.

An interval join only considers records that appear within the specified time interval.

To create an interval join, we start with a join statement, just like before.

The difference comes when we implement the where clause.

Notice the inclusion of an and expression.

We still need a field to match on, in this case, the order_id.

However, we also need to add a time interval.

Here, we look at orders where the order_time is less than the shipment_time but greater than or equal to the shipment_time minus 24 hours.

Anything outside of that window will be ignored.

The advantage of the interval join is that it has a natural time window built in.

This allows Flink to discard any state that falls outside the window.

However, interval joins require append-only streams to avoid the cascading update problem I mentioned earlier.

They don't currently support retract or upsert streams.

Despite that, they can be a powerful tool in our Flink arsenal when the circumstances are right.

So, as you can see, the Flink Table API can perform joins on data streams just like a relational database.

These joins work best when there is a time limit attached to the queries to limit the growth of the job state.

I hope you enjoyed learning about joins in the Flink Table API.

Thanks for watching.
