Tim Berglund

VP Developer Relations

Robin Moffatt

Principal Developer Advocate (Author)

Joining Data Streams with ksqlDB

In previous modules, we built up the ingestion of data into Apache Kafka. We've got rating events from our customers telling us what they liked or didn't like about our products. In reality, this would come from a producer API directly, but we're simulating it with a data generator in Kafka Connect.

To find out more about who those customers are, we're pulling a mirror of the customer table from a database into Kafka using Kafka Connect. Now let's see how we can use that data from the database within Kafka to enrich the rating events as they arrive.

Streams and Tables

Kafka topics are streams of events, as are ksqlDB streams. They're unbounded, and each event gives us some information, usually tied to an event key such as a customer ID. We can easily turn any stream into a table (and vice versa). Technically, the events in the stream will be used to compute “state,” for example, through an aggregation such as SUM() or COUNT(), and this state is what’s called a table.

Given a stream, any derived streams and tables of that stream are continuously kept up to date whenever a new event (or “record”) arrives in the original stream. Your applications and pipelines will therefore react automatically to data in motion. As each event arrives on the stream, the table is updated to hold the latest value (which can be multiple fields) for the event key. In this example, the key is the customer name, and the value is that customer’s current VIP status.
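
As a rough sketch of what this looks like in ksqlDB: assuming the customer changes arrive on a stream called CUSTOMERS_STREAM with NAME and VIP_STATUS columns (hypothetical names for this walkthrough), an aggregation such as LATEST_BY_OFFSET keeps the most recent value per key:

-- A sketch: build a table holding the latest VIP status per customer.
-- CUSTOMERS_STREAM, NAME, and VIP_STATUS are assumed names for illustration.
CREATE TABLE CUSTOMERS_BY_NAME AS
  SELECT NAME,
         LATEST_BY_OFFSET(VIP_STATUS) AS VIP_STATUS
  FROM   CUSTOMERS_STREAM
  GROUP BY NAME
  EMIT CHANGES;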

ksqldb-table-01

Allison is a customer and has just been moved to Bronze VIP status, so we have an event on the customer stream (which comes from the database, by the way). The table also now holds this information.

Another customer is created in the source database. The event comes through on the customers topic and into the ksqlDB stream: Rick is a Silver VIP status customer. A second entry is created in the table. At the moment, the table looks just like the stream: each event has a corresponding entry in the table.

ksqldb-table-02

Now the system updates, and an event is written to the customers stream: Rick has been promoted to Platinum status! Lucky Rick!

Because the event key is the customer name and this is an event for an existing key, the table now diverges from the stream. Whilst the stream tells us what happened, the table tells us the current state. The current value for the customer key Rick is Platinum. That's what the table's telling us.

ksqldb-table-03

Fancy that! Allison got promoted too, to Silver. Because the underlying database changed, we get a new event on the customers stream. That new event on the stream drives a change in the table. For the key of Allison, the value (VIP status) is now Silver:

ksqldb-table-04

A new customer gets created in the database. Because this is a new key (name=Hugh), a new entry is created in the table:

ksqldb-table-05
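
To see that "current state" view directly, you could run a pull query against a materialized table like the hypothetical CUSTOMERS_BY_NAME sketched earlier, looking up a single key:

-- A sketch: look up Rick's current status with a pull query
-- against the hypothetical CUSTOMERS_BY_NAME table above.
SELECT NAME, VIP_STATUS
  FROM CUSTOMERS_BY_NAME
  WHERE NAME = 'Rick';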

Joining Streams and Tables in ksqlDB

Now that we've seen how ksqlDB can build up tables based on a stream, we can look at how to use this for what we need: enriching the inbound stream of rating events with information about the customers who left the ratings.

It's worth pointing out here that although we're talking about ksqlDB, all of these concepts also apply to Kafka Streams (on which ksqlDB is built). So even though I'm showing you ksqlDB streams and ksqlDB tables, you can take the same concepts and think in terms of KStream and KTable, the Kafka Streams equivalents.

So, we've got a stream of events (ratings) and a table of state (customer information). We can use stream processing to do a join between these two. The output of the join is data from the two objects wherever a match is found on the common key. ksqlDB builds up the table internally, and as each event arrives on the source stream, its key is looked up in the table. The enriched events are written out to a new stream.

ksqldb-joins-01

Here's our data. The source event is ratings information, with a foreign key of user_id. The customer data acts as a reference table (or lookup table). It's got a primary key of user_id and contains useful information about the customers, such as their name, VIP status, and so on.
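
Before we can run the join, both objects need to be declared in ksqlDB. Here's a minimal sketch, assuming topic names (ratings, customers), column names, and JSON serialization; your actual schemas and formats may differ:

-- A sketch: declare the ratings stream and customers table over their
-- Kafka topics. Topic names, columns, and formats are assumptions.
CREATE STREAM RATINGS (
    USER_ID INT KEY,
    STARS   INT,
    MESSAGE VARCHAR
  ) WITH (
    KAFKA_TOPIC  = 'ratings',
    VALUE_FORMAT = 'JSON'
  );

CREATE TABLE CUSTOMERS (
    USER_ID    INT PRIMARY KEY,
    NAME       VARCHAR,
    EMAIL      VARCHAR,
    VIP_STATUS VARCHAR
  ) WITH (
    KAFKA_TOPIC  = 'customers',
    VALUE_FORMAT = 'JSON'
  );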

CREATE STREAM RATINGS_WITH_CUSTOMER_DATA AS
  SELECT R.*, C.*
  FROM   RATINGS R
  INNER JOIN CUSTOMERS C
          ON R.USER_ID = C.USER_ID;

In ksqlDB, we use standard SQL syntax for expressing a join. Here we're using an INNER JOIN, which means that only source rows that successfully match a row in the table get written to the output. If we wanted to write all source rows to the output, regardless of whether a match is found, we'd use a LEFT OUTER JOIN.
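
As an illustration, here's the same query sketched with a LEFT OUTER JOIN; unmatched ratings would pass through with NULL values in the customer columns:

-- A sketch: the same enrichment, but keeping ratings that have
-- no matching customer (their customer columns will be NULL).
CREATE STREAM RATINGS_WITH_CUSTOMER_DATA_ALL AS
  SELECT R.*, C.*
  FROM   RATINGS R
  LEFT OUTER JOIN CUSTOMERS C
          ON R.USER_ID = C.USER_ID;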

The output of the JOIN query gets written to a new ksqlDB stream, which is backed by a Kafka topic to ensure durable, fault-tolerant storage.
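
To watch the enriched events as they're produced, you can run a push query against the new stream; the EMIT CHANGES clause tells ksqlDB to keep emitting rows as new ratings arrive:

-- Tail the enriched stream as new ratings arrive.
SELECT *
  FROM RATINGS_WITH_CUSTOMER_DATA
  EMIT CHANGES;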

ksqldb-joins-02


Joining Data Streams with ksqlDB

Hi, I'm Tim Berglund with Confluent. Welcome to Data Pipelines lesson five, Joining Streams of Data with ksqlDB. Previously in this course, we've done some ingesting of data into Kafka. We've got ratings events from our customers telling us what they liked or didn't about our products. We appreciate their opinions, as always. In reality, these would come from the Producer API, probably from some service with a REST endpoint, and some mobile app or web front end would be throwing little pieces of JSON at it. But we're simulating it using the data generator connector, a fully managed connector in Kafka Connect in Confluent Cloud.

To find out more about who those customers are (they have opinions; we want to know who they are), we're going to pull a mirror of the customer table from a database into Kafka using Kafka Connect. We have to get that customer data into Kafka to be able to do this join in real time. So let's see how we can make this happen.

Kafka topics are streams of events, as are ksqlDB streams. Streams are a kind of lightweight abstraction on top of topics, and it's appropriate that we sometimes say "stream" when we're talking in ksqlDB terms, because that's the abstraction in ksqlDB. But there's always a topic underneath it, so keep that in mind. Streams are unbounded, and each message gives us some information, usually tied to a key. In fact, events in Kafka, and certainly in ksqlDB streams, are a key and a value. Now, we can use streams to build up state, and we're going to call that state a table.

This is a little tricky the first time through, so if this is your first time, maybe watch this twice, or we'll just take our time here. As each event arrives on the input stream, we capture its value for that given key. The key here would likely be a customer ID; I'm showing you names on the slide so they're easy to see. On the left is the stream, which I'm going to fill in with some events; on the right is the table. Every time there's a new message in the stream, I'm going to update the state in the table. When the same key appears twice in the stream, those are two perfectly appropriate events in the stream, but watch what happens in the table. Keep your eye out for that.

Right now, Allison is a customer. I've heard she's a super cool lady, and she's just been moved to Bronze VIP status. Personally, I think she's being robbed; she strikes me as more of a platinum type of person. But here we have an event on the customer stream you see on the left. That comes from the database, by the way: Kafka Connect is reading things from a database and producing them into the stream. And now the table that we're creating also has Allison equals Bronze.

Another customer is created in the source database. Kafka Connect picks it up, maybe doing log-based CDC, and writes it to our stream, our topic. That event comes through on the customers topic and into the ksqlDB stream. Rick, it turns out, is a Silver VIP. Look at Rick, he's awfully fancy: a Silver VIP status customer. So we have a second event in the stream and a second entry in the table. Why in the table? Well, we haven't seen Rick before; he's a new key to us. So he becomes a new entry in the table. At the moment, the table looks just like the stream.
So you're maybe wondering, "Tim, you said this was tricky; I'm confused, what are you even doing?" Well, let's keep going. Now we get another update. There's probably literally a SQL UPDATE to the source database; Kafka Connect picks it up and writes it to the topic. It looks like Rick is Platinum now. I don't know who he knows, but there you go: he's Platinum. That message is now in the stream, because it's a new event, but notice what's happened to the table. The table hasn't gotten a new row, because we already had a Rick row; it's just been updated. So you can see the table tells us the current state of the keys in the stream. The stream is like a log of everything that's ever happened; the table is a snapshot of everybody's current value right now. So Rick became Platinum, and there you go.

Oh hey, it looks like Allison is moving up in the world too. I'm encouraged to see this. We get a new event: she's Silver. That's a total of four events in our stream, but still just two entries in our table, because we've only ever seen two keys. Allison and Rick are our whole world; really, everybody we know and do business with is just those two people. We're hoping for some growth later on in our simulated company, but this is what we have now.

Speaking of growth, here's Hugh. Hugh comes along, and he's Gold. That record was inserted into the source database; Kafka Connect, doing its log-based CDC, we imagine, picks up that mutation and produces it into the topic that is the customer stream you see on the left. But now we need a new row in the table, because we haven't seen that key yet. So now we have a total of three keys in the table.

All right, now we know how to make a table. We're talking about joins here, and we kind of slipped that table in on you because we need a table to join to. What we want to do is join a stream to a table, and that's actually pretty intuitive; this will probably click as we talk through this animation. But first, I want to make sure you realize we just took a topic, basically, and turned it into a table. And that makes all the sense in the world: streams can become tables, and tables can become streams. We took a stream and made it a table, and now we've got a thing where we can look up customer VIP statuses by name efficiently. That table is represented in the top left of the animation you see here.

Below that is our inbound stream of ratings events, with an ID (or a name) of the customer and the rating itself. We don't have any information like the customer's status. Is this somebody who's Platinum, so we should really think about what they have to say, or just a Bronze person we can disregard? Hopefully our customer service is a lot better than that, but we're doing the join, so let's talk through it. If you look at the ksqlDB query down at the bottom of our little iMac monitor there, it's describing a join, an inner join. If you know SQL, that's all entirely familiar, with the exception of the EMIT CHANGES clause. Don't worry about that for now; that's a ksqlDB detail. As the events on the blue ratings stream arrive, we look up the customer in the customers table. We've left the platinum and customers data behind in this animation, and we're looking at sensors and brand names instead, but we're able to join each event in the stream to what's really the entity data in the table.
The events that have matches change color to green and move on. Some of the events don't have matches, and you see they don't make it through, because it's an inner join: if they don't find a corresponding record in the table, they don't become a part of the output stream. So it acts like a database join at that point.

It's worth pointing out, just as an aside, that while we're talking about ksqlDB here, and ksqlDB does all these things very well and easily in SQL, the same concepts also apply to Kafka Streams, which is really the API under the covers on which ksqlDB is built. I'm showing you this with ksqlDB very intentionally: it works very easily in Confluent Cloud, and it's kind of the standalone stream processing language. But you can do this in Kafka Streams as well, with KStream and KTable objects, if you're writing Java applications. That's all doable.

Okay, having seen that animation, let's go back to our users and the model we were talking about. Here's our data. The source event is ratings information with a foreign key of user ID. You see that in the JSON: there's a message, "needs more peanuts." Okay, thanks, one star; sorry we didn't have the kind of peanuts you wanted. But user three, who might that be? Well, up in our customers table, user three is Joanna Smith. Joanna Smith is Platinum, and apparently she really likes peanuts. So we're able to join those two things with such a query into a combined record that has the stars of the rating, her message, her email, her name, her status, all the things you can see on that slide. Those two independent objects, one an event in a stream and one an entry in a table, are joined into a new object and produced to a new stream. A stream-table join produces a stream.

So that's it: that's joins in ksqlDB. This is a very common thing in a streaming pipeline, because you've got some source of input data, and there's always something you're going to want to enrich it with. Everything's a foreign key, right? You're going to need to go look something up, enrich that stream with the lookup table you've created as we walked through here, and produce that enriched result back to a destination topic. So this is a very important skill for you as you build your pipelines.