Tim Berglund

VP Developer Relations

Robin Moffatt

Principal Developer Advocate (Author)

Joining Data Streams with ksqlDB

In previous modules, we set up the ingestion of data into Apache Kafka. We've got rating events from our customers telling us what they liked or didn't like about our products. In reality, these events would come directly from an application using the producer API, but we're simulating them with a data generator in Kafka Connect.

To find out more about who those customers are, we're pulling a mirror of the customer table from a database into Kafka using Kafka Connect. Now let's see how we can use that data from the database within Kafka to enrich the rating events as they arrive.

Streams and Tables

Kafka topics are streams of events, as are ksqlDB streams. They're unbounded, and each event gives us some information, usually tied to an event key such as a customer ID. We can easily turn any stream into a table (and vice versa). Technically, the events in the stream will be used to compute “state,” for example, through an aggregation such as SUM() or COUNT(), and this state is what’s called a table.
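As a rough sketch of turning a stream into a table through aggregation, the statement below assumes that the RATINGS stream used later in this module is already registered and has a USER_ID column. The table name RATINGS_PER_USER and the column alias RATING_COUNT are made up for illustration.

```sql
-- Assumption: a RATINGS stream with a USER_ID column is already registered.
-- RATINGS_PER_USER and RATING_COUNT are illustrative names, not part of the course.
CREATE TABLE RATINGS_PER_USER AS
  SELECT USER_ID,
         COUNT(*) AS RATING_COUNT
  FROM   RATINGS
  GROUP  BY USER_ID;
```

Each rating event that the query processes updates the count for its USER_ID, so the table holds the state computed from the stream rather than the individual events.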

Given a stream, any derived streams and tables of that stream are continuously kept up to date whenever a new event (or “record”) arrives in the original stream. Your applications and pipelines will therefore react automatically to data in motion. As each event arrives on the stream, the table is updated to hold the latest value (which can be multiple fields) for the event key. In this example, the key is the customer name, and the value is that customer’s current VIP status.

ksqldb-table-01

Allison is a customer and has just been moved to Bronze VIP status, so we have an event on the customer stream (which comes from the database, by the way). The table also now holds this information.

Another customer is created in the source database, and the event comes through on the customers stream in ksqlDB. Rick is a customer with Silver VIP status. A second entry is created in the table. At the moment, the table looks just like the stream: each event has a corresponding entry in the table.

ksqldb-table-02

Now the source database is updated, and a new event is written to the customers stream: Rick has been promoted to Platinum status! Lucky Rick!

Because the event key is the customer name and this is an event for an existing key, the table now diverges from the stream. Whilst the stream tells us what happened, the table tells us the current state. The current value for the customer key Rick is Platinum. That's what the table's telling us.

ksqldb-table-03

Fancy that! Allison got promoted too, to Silver. Because the underlying database changed, we get a new event on the customers stream. That new event on the stream drives a change in the table. For the key of Allison, the value (VIP status) is now Silver:

ksqldb-table-04

A new customer gets created in the database. Because this is a new key (name=Hugh), a new entry is created in the table:

ksqldb-table-05
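The walkthrough above can be replayed as a small, self-contained sketch. Everything here is hypothetical: the stream CUSTOMER_EVENTS, the table CUSTOMER_STATUS, and the topic demo_customer_events are illustrative names, and the table is derived with the LATEST_BY_OFFSET aggregate to make the stream-to-table step explicit, which is not necessarily how the course itself defines its customers table.

```sql
-- Read from the start of the topic so that rows inserted before a query
-- starts are still processed.
SET 'auto.offset.reset' = 'earliest';

-- Hypothetical stream of customer change events, keyed by customer name.
CREATE STREAM CUSTOMER_EVENTS (
  NAME       VARCHAR KEY,
  VIP_STATUS VARCHAR
) WITH (
  KAFKA_TOPIC  = 'demo_customer_events',
  VALUE_FORMAT = 'JSON',
  PARTITIONS   = 1
);

-- The table keeps only the latest VIP status seen for each name.
CREATE TABLE CUSTOMER_STATUS AS
  SELECT NAME,
         LATEST_BY_OFFSET(VIP_STATUS) AS VIP_STATUS
  FROM   CUSTOMER_EVENTS
  GROUP  BY NAME;

-- Replay the events from the walkthrough, in order.
INSERT INTO CUSTOMER_EVENTS (NAME, VIP_STATUS) VALUES ('Allison', 'Bronze');
INSERT INTO CUSTOMER_EVENTS (NAME, VIP_STATUS) VALUES ('Rick', 'Silver');
INSERT INTO CUSTOMER_EVENTS (NAME, VIP_STATUS) VALUES ('Rick', 'Platinum');
INSERT INTO CUSTOMER_EVENTS (NAME, VIP_STATUS) VALUES ('Allison', 'Silver');
-- Hugh's status is not given in the text; 'Bronze' is a placeholder.
INSERT INTO CUSTOMER_EVENTS (NAME, VIP_STATUS) VALUES ('Hugh', 'Bronze');

-- Pull query against the table: the current state for a single key.
SELECT NAME, VIP_STATUS FROM CUSTOMER_STATUS WHERE NAME = 'Rick';
```

Once the events have been processed, the pull query should show Platinum for Rick, while the stream still contains both of Rick's events. The stream records what happened; the table records where things stand now.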

Joining Streams and Tables in ksqlDB

Now that we've seen how ksqlDB can build up tables based on a stream, we can look at how to use this for what we need: enriching the inbound stream of rating events with information about the customers who left the ratings.

It's worth pointing out here that although we're talking about ksqlDB, all of these concepts also apply to Kafka Streams (on which ksqlDB is built). So even though I'm showing you ksqlDB streams and ksqlDB tables, you can take the same concepts and just think in terms of KStream and KTables, which are the Kafka Streams equivalents.

So, we've got a stream of events (ratings) and a table of state (customer information). We can use stream processing to join the two. The output of the join combines data from both objects wherever a match on the common key is found. ksqlDB builds up the table internally, and as each event arrives on the source stream, its key is looked up in the table. The enriched events are written out to a new stream.

ksqldb-joins-01

Here's our data. The source event is ratings information, with a foreign key of user_id. The customer data acts as a reference table (or lookup table). It's got a primary key of user_id and contains useful information about the customers, such as their name, VIP status, and so on.
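For orientation, here is one way the two objects might be declared. This is a sketch under assumptions, not the course's actual definitions: the topic names, the JSON value format, and every column other than USER_ID, NAME, and VIP_STATUS are illustrative. It also assumes both topics already exist and that the record key of the customers topic holds the user ID in ksqlDB's default key format.

```sql
-- Assumptions: topic names, formats, and the RATING_ID and STARS columns are
-- illustrative. Both topics are assumed to exist and to hold JSON values.

-- Stream of rating events; USER_ID is the foreign key to the customer.
CREATE STREAM RATINGS (
  RATING_ID BIGINT,
  USER_ID   INT,
  STARS     INT
) WITH (
  KAFKA_TOPIC  = 'ratings',
  VALUE_FORMAT = 'JSON'
);

-- Reference table of customers; USER_ID is the primary key.
CREATE TABLE CUSTOMERS (
  USER_ID    INT PRIMARY KEY,
  NAME       VARCHAR,
  VIP_STATUS VARCHAR
) WITH (
  KAFKA_TOPIC  = 'customers',
  VALUE_FORMAT = 'JSON'
);
```

Declaring CUSTOMERS as a table rather than a stream is what tells ksqlDB to keep only the latest row per USER_ID, which is the lookup behavior a stream-table join needs.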

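-- Enrich each rating event with the matching row from the CUSTOMERS table.
-- Ratings whose USER_ID has no match in CUSTOMERS are dropped (INNER JOIN).
CREATE STREAM RATINGS_WITH_CUSTOMER_DATA AS
  SELECT R.*, C.*
  FROM   RATINGS R
  INNER JOIN CUSTOMERS C
          ON R.USER_ID = C.USER_ID;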

In ksqlDB, we use standard SQL syntax to express a join. Here we're using an INNER JOIN, which means that only source rows that are successfully matched to the table get written to the output. If we wanted to write all source rows to the output regardless of whether a match is found in the table, we'd use a LEFT OUTER JOIN instead, and the customer columns would be null for unmatched rows.
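The LEFT OUTER JOIN variant might look like the following. It reuses the RATINGS and CUSTOMERS objects from the statement above; the output stream name RATINGS_WITH_OPTIONAL_CUSTOMER_DATA is made up for illustration.

```sql
-- Keep every rating event, whether or not a matching customer exists.
-- The output stream name is illustrative.
CREATE STREAM RATINGS_WITH_OPTIONAL_CUSTOMER_DATA AS
  SELECT R.*, C.*
  FROM   RATINGS R
  LEFT OUTER JOIN CUSTOMERS C
          ON R.USER_ID = C.USER_ID;
```

This form is useful when a rating should never be lost just because the customer record has not yet arrived from the database.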

The output of the JOIN query gets written to a new ksqlDB stream, which is backed by a Kafka topic to ensure durable, fault-tolerant storage.
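To check the result, something like the following could be run from the ksqlDB CLI. It assumes the RATINGS_WITH_CUSTOMER_DATA stream from the join statement has already been created.

```sql
-- Show the stream's columns plus runtime details, including the backing topic.
DESCRIBE RATINGS_WITH_CUSTOMER_DATA EXTENDED;

-- Push query: print enriched events as they arrive, stopping after five rows.
SELECT * FROM RATINGS_WITH_CUSTOMER_DATA EMIT CHANGES LIMIT 5;
```

Because the stream is backed by a Kafka topic, any other consumer can read the enriched events from that topic as well.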

ksqldb-joins-02
