Senior Software Engineer (Presenter)
Integration Architect (Author)
Kafka Streams provides join operations for streams and tables, enabling you to augment one dataset with another.
Stream-stream joins combine two event streams into a new stream. The streams are joined based on a common key, so both streams must be keyed. You define a time window, and records on either side of the join must have timestamps that fall within that window of each other. Under the hood, Kafka Streams buffers records in a state store, so that when a record arrives, it can look in the other stream's state store and find records with the same key whose timestamps fall within the window.
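To make the buffering concrete, here is a toy, in-memory model of a windowed inner stream-stream join in plain Java. It is not the Kafka Streams API: the `ToyStreamStreamJoin` class, its nested `Rec` type, and the per-key lists standing in for state stores are all hypothetical. It also never evicts old records, whereas Kafka Streams eventually expires them once the window and grace period have passed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a windowed inner stream-stream join (hypothetical, not the Kafka Streams API).
// Each side buffers its records per key, standing in for the state store.
final class ToyStreamStreamJoin {
    // Hypothetical record type: key, value, and event timestamp in milliseconds.
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    private final long windowMs;
    private final Map<String, List<Rec>> leftStore = new HashMap<>();
    private final Map<String, List<Rec>> rightStore = new HashMap<>();

    ToyStreamStreamJoin(long windowMs) {
        this.windowMs = windowMs;
    }

    // A left-side record is buffered, then matched against buffered right-side records.
    List<String> onLeft(Rec r) {
        return process(r, leftStore, rightStore, true);
    }

    // A right-side record is buffered, then matched against buffered left-side records.
    List<String> onRight(Rec r) {
        return process(r, rightStore, leftStore, false);
    }

    private List<String> process(Rec r, Map<String, List<Rec>> own,
                                 Map<String, List<Rec>> other, boolean isLeft) {
        own.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r);
        List<String> out = new ArrayList<>();
        // Look up same-key records on the other side and keep those whose
        // timestamps are within windowMs of this record's timestamp.
        for (Rec o : other.getOrDefault(r.key, List.of())) {
            if (Math.abs(o.ts - r.ts) <= windowMs) {
                // Joiner: concatenate left value + right value.
                out.add(isLeft ? r.value + o.value : o.value + r.value);
            }
        }
        // Inner join: if nothing matched, nothing is emitted.
        return out;
    }
}
```

A left record at t=1 s followed by a right record with the same key at t=5 s produces one joined value; a right record at t=20 s falls outside a 10-second window and produces nothing.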
To perform a join, you use a ValueJoiner, an interface that takes the value from the left side (the primary side of the join) and the value from the right side (the secondary side). You use it to compute a new value, which can be of a new type (you decide what the return type is). You can also use the key, in a read-only fashion, to compute the new value.
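The sketch below shows a joiner whose output type differs from both inputs, plus a key-aware variant. So that it compiles without the kafka-streams dependency, it declares local stand-ins with the same shape as Kafka Streams' `ValueJoiner<V1, V2, VR>` and `ValueJoinerWithKey<K1, V1, V2, VR>`; in a real topology you would import those from `org.apache.kafka.streams.kstream` instead. The `EnrichedPurchase` type and the purchase/address values are hypothetical.

```java
// Local stand-ins mirroring the shape of Kafka Streams' ValueJoiner and
// ValueJoinerWithKey; in a real application, import the library interfaces.
interface ValueJoiner<V1, V2, VR> {
    VR apply(V1 value1, V2 value2);
}

interface ValueJoinerWithKey<K1, V1, V2, VR> {
    VR apply(K1 readOnlyKey, V1 value1, V2 value2);
}

// Hypothetical output type: different from both input value types.
final class EnrichedPurchase {
    final String customerId;
    final double amount;
    final String address;

    EnrichedPurchase(String customerId, double amount, String address) {
        this.customerId = customerId;
        this.amount = amount;
        this.address = address;
    }

    @Override
    public String toString() {
        return customerId + ": " + amount + " -> " + address;
    }
}

final class Joiners {
    // Left value: purchase amount; right value: shipping address.
    static final ValueJoiner<Double, String, String> SUMMARY =
            (amount, address) -> amount + " -> " + address;

    // The key is only read to build the new value; the joiner cannot change the record's key.
    static final ValueJoinerWithKey<String, Double, String, EnrichedPurchase> ENRICH =
            (customerId, amount, address) -> new EnrichedPurchase(customerId, amount, address);
}
```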
With an inner join, a result is emitted only if both sides have a record within the window. Thus, if the left side has a record but the right side doesn't, nothing is emitted.
With an outer join, both sides always produce an output record. Within the join window, if both the left side and the right side are available, a join of the two is emitted. If only the left side is available, the result has the value of the left side and a null for the right side. Conversely, if only the right side is available, the result includes the value of the right side and a null for the left side.
With a left-outer join, every left-side record produces an output. If both sides are available, the result combines both; otherwise, the left value is returned with a null for the right side. A right-side record with no matching left-side record produces nothing.
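The three join types can be compared side by side with a small batch model: given finite lists of left and right records, it returns the results each join type would eventually produce. This is a hypothetical sketch (`ToyJoinTypes`, `Rec`, and the `"left,right"` output format are made up for illustration) and it deliberately ignores arrival order and when Kafka Streams actually emits each result.

```java
import java.util.ArrayList;
import java.util.List;

// Batch model of windowed inner, left-outer, and outer joins (hypothetical, not the Kafka API).
final class ToyJoinTypes {
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    enum JoinType { INNER, LEFT, OUTER }

    // Output format is "left,right"; the string "null" stands in for a missing side.
    static List<String> join(List<Rec> left, List<Rec> right, long windowMs, JoinType type) {
        List<String> out = new ArrayList<>();
        boolean[] rightMatched = new boolean[right.size()];
        for (Rec l : left) {
            boolean matched = false;
            for (int i = 0; i < right.size(); i++) {
                Rec r = right.get(i);
                // Same key and timestamps within the window: the pair is joined.
                if (l.key.equals(r.key) && Math.abs(l.ts - r.ts) <= windowMs) {
                    out.add(l.value + "," + r.value);
                    matched = true;
                    rightMatched[i] = true;
                }
            }
            // Left-outer and outer joins still emit an unmatched left record.
            if (!matched && type != JoinType.INNER) {
                out.add(l.value + ",null");
            }
        }
        // Only the outer join emits unmatched right records.
        if (type == JoinType.OUTER) {
            for (int i = 0; i < right.size(); i++) {
                if (!rightMatched[i]) {
                    out.add("null," + right.get(i).value);
                }
            }
        }
        return out;
    }
}
```

With left records A (key k1) and B (key k2) and right records X (key k1, 3 seconds after A) and Y (key k3), the inner join yields only `A,X`, the left-outer join adds `B,null`, and the outer join further adds `null,Y`.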
You learned above that stream-stream joins are windowed joins. In contrast, stream-table joins are non-windowed. You can join a KStream with a KTable, and a KStream with a GlobalKTable.
In a stream-table join, the stream is always on the primary side; it drives the join. Only new records arriving on the stream produce output; new records arriving on the table do not (this is the case for both KStream-KTable and KStream-GlobalKTable joins).
With an inner join, output is produced only if the table has a value for the stream record's key.
With a left-outer join, the KStream always produces a record: either a combination of the left and right values, or the left value and a null representing the right side.
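The stream-driven behavior can be sketched with a map standing in for the table. This is a hypothetical toy (`ToyStreamTableJoin` is not a Kafka class): table updates only change the latest value for a key and emit nothing, while each stream record is joined with whatever the table holds at that moment. Treating a null table value as a delete mirrors the tombstone convention for KTables.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model of a non-windowed KStream-KTable join (hypothetical, not the Kafka Streams API).
final class ToyStreamTableJoin {
    private final Map<String, String> table = new HashMap<>();
    private final boolean leftJoin;

    ToyStreamTableJoin(boolean leftJoin) {
        this.leftJoin = leftJoin;
    }

    // A table update only replaces the latest value for its key; it never emits output.
    void onTableUpdate(String key, String value) {
        if (value == null) {
            table.remove(key); // null value acts as a tombstone (delete)
        } else {
            table.put(key, value);
        }
    }

    // A stream record drives the join: it is joined with the table's current value.
    Optional<String> onStreamRecord(String key, String value) {
        String tableValue = table.get(key);
        if (tableValue == null && !leftJoin) {
            return Optional.empty(); // inner join: no table value, no output
        }
        return Optional.of(value + "," + tableValue); // left join: tableValue may be null
    }
}
```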
With a GlobalKTable, you get full replication of the underlying topic across all instances, rather than having it partitioned across them. A GlobalKTable also provides a mechanism whereby, when you perform a join with a KStream, the key of the stream doesn't have to match the table's key. You supply a KeyValueMapper when you define the join, and you use it to derive the GlobalKTable lookup key from the stream record's key and/or value.
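The key-derivation step can be sketched with `java.util.function.BiFunction` standing in for Kafka Streams' `KeyValueMapper`; in the real API the mapper is passed as the second argument of `KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)`. The `ToyGlobalTableJoin` class, the `"userId:amount"` value format, and the user data below are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Toy model of a KStream-GlobalKTable inner join (hypothetical, not the Kafka Streams API).
// The lookup key is derived from the stream record instead of being the stream key itself.
final class ToyGlobalTableJoin {
    static Optional<String> join(Map<String, String> globalTable,
                                 String streamKey, String streamValue,
                                 BiFunction<String, String, String> keySelector) {
        // Derive the table key from the stream record's key and/or value.
        String lookupKey = keySelector.apply(streamKey, streamValue);
        String tableValue = globalTable.get(lookupKey);
        return tableValue == null
                ? Optional.empty()
                : Optional.of(streamValue + " @ " + tableValue);
    }
}
```

Here a transactions stream keyed by order ID is matched against user data by pulling the user ID out of the value, so the stream never has to be re-keyed first.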
Yet another difference between a KTable join and a GlobalKTable join is that a KTable join is timestamp-driven. With a GlobalKTable, when there is an update to the underlying topic, the update is simply applied as it arrives. It's completely divorced from the time mechanism within Kafka Streams. (In contrast, with a KTable, timestamps are part of your event stream processing.) This means that a GlobalKTable is best suited for mostly static lookup data. For example, you might use it to hold relatively static user information for matching against transactions.
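The timing difference can be illustrated with a deliberately simplified model. In it (hypothetical `ToyTimeSemantics`, left-join semantics), the KTable path processes stream and table records in timestamp order, so a stream record cannot see a table update with a later timestamp, while the GlobalKTable path loads every table record before any stream record is processed. Real Kafka Streams timestamp synchronization across partitions is best-effort (see the `max.task.idle.ms` configuration), so treat this as an illustration of the idea, not of the implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model contrasting KTable and GlobalKTable join timing (hypothetical).
final class ToyTimeSemantics {
    static final class Event {
        final boolean isTable;
        final String key;
        final String value;
        final long ts;

        Event(boolean isTable, String key, String value, long ts) {
            this.isTable = isTable;
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // KTable: all records are processed in timestamp order, so a stream record
    // only sees table updates with earlier timestamps. Output: "streamValue,tableValue".
    static List<String> kTableJoin(List<Event> events) {
        List<Event> ordered = new ArrayList<>(events);
        ordered.sort(Comparator.comparingLong(e -> e.ts));
        Map<String, String> table = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : ordered) {
            if (e.isTable) {
                table.put(e.key, e.value);
            } else {
                out.add(e.value + "," + table.get(e.key));
            }
        }
        return out;
    }

    // GlobalKTable: the table is bootstrapped first, regardless of timestamps.
    static List<String> globalKTableJoin(List<Event> events) {
        Map<String, String> table = new HashMap<>();
        for (Event e : events) {
            if (e.isTable) {
                table.put(e.key, e.value);
            }
        }
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (!e.isTable) {
                out.add(e.value + "," + table.get(e.key));
            }
        }
        return out;
    }
}
```

For a transaction at t=5 and a user record for the same key at t=10, the KTable path joins the transaction with null, while the GlobalKTable path already has the user record.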
You can also join a KTable with a KTable. Note that you can only join a GlobalKTable with a KStream.
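A KTable-KTable join produces another table, and unlike a stream-table join, an update on either side can change the result. The toy below (hypothetical `ToyTableTableJoin`, inner-join semantics, not the Kafka Streams API) recomputes the joined row for a key whenever either input table changes; a null value is treated as a delete and removes the joined row.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a KTable-KTable inner join whose output is itself a table (hypothetical).
final class ToyTableTableJoin {
    private final Map<String, String> left = new HashMap<>();
    private final Map<String, String> right = new HashMap<>();
    private final Map<String, String> result = new HashMap<>();

    void onLeft(String key, String value) {
        left.put(key, value); // null value acts as a delete
        recompute(key);
    }

    void onRight(String key, String value) {
        right.put(key, value); // null value acts as a delete
        recompute(key);
    }

    // Either side changing updates the latest joined value for that key.
    private void recompute(String key) {
        String l = left.get(key);
        String r = right.get(key);
        if (l != null && r != null) {
            result.put(key, l + "," + r);
        } else {
            result.remove(key);
        }
    }

    // Snapshot of the current joined table.
    Map<String, String> view() {
        return Map.copyOf(result);
    }
}
```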
A ValueJoiner takes left and right values and returns a new value:
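// Assumes the default key/value serdes are configured as String serdes
KStream<String, String> leftStream = builder.stream("topic-A");
KStream<String, String> rightStream = builder.stream("topic-B");

// Concatenate the left and right values to form the joined value
ValueJoiner<String, String, String> valueJoiner =
        (leftValue, rightValue) -> leftValue + rightValue;

// Inner join: same-key records whose timestamps are within 10 seconds of each
// other are joined. JoinWindows.of is deprecated since Kafka 3.0 in favor of
// JoinWindows.ofTimeDifferenceWithNoGrace / ofTimeDifferenceAndGrace.
KStream<String, String> joinedStream = leftStream.join(rightStream,
        valueJoiner,
        JoinWindows.of(Duration.ofSeconds(10)));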
You can return a completely new object type altogether: the return value type doesn't have to match the types of the two input values. Notice also the JoinWindows argument, which states that a record on the right side must have a timestamp no more than 10 seconds before or 10 seconds after the timestamp of the left-side record.
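The window check itself can be written as a bound on the right-side timestamp relative to the left-side one. The sketch below is a hypothetical helper, not the Kafka class: a symmetric window such as `JoinWindows.of(Duration.ofSeconds(10))` corresponds to equal `before` and `after` bounds, and the real `JoinWindows` also offers `before(Duration)` and `after(Duration)` for asymmetric windows, which this toy mimics with two separate durations.

```java
import java.time.Duration;

// Hypothetical helper mirroring how a join window bound is interpreted:
// a right record matches a left record when
//   leftTs - before <= rightTs <= leftTs + after   (bounds inclusive).
final class ToyJoinWindow {
    final Duration before;
    final Duration after;

    ToyJoinWindow(Duration before, Duration after) {
        this.before = before;
        this.after = after;
    }

    // Symmetric window, like a 10-second JoinWindows with no before/after adjustment.
    static ToyJoinWindow symmetric(Duration d) {
        return new ToyJoinWindow(d, d);
    }

    boolean matches(long leftTsMs, long rightTsMs) {
        return rightTsMs >= leftTsMs - before.toMillis()
                && rightTsMs <= leftTsMs + after.toMillis();
    }
}
```

With a symmetric 10-second window, a right record exactly 10 seconds before or after the left record still matches, while one 10.001 seconds later does not.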
Hi, I'm Sophie Blee-Goldman, with Confluent. And we're going to talk about joins. Kafka Streams offers join operations. Now these are things you might use to join two or more streams together into a third stream or table, as we'll get into. Kafka Streams joins require that the records being joined have the same key. Two events with unrelated keys are likely to be unrelated themselves and thus will not be joined. A join, on the other hand, might be something like customer purchases being matched with customer addresses, producing an output stream with the full set of information that you have about a customer. Now you wouldn't wanna join one customer's data with that of another customer; that's why joins operate on records with the same key. Now, Kafka Streams offers three types of joins. First, there is the stream-stream join. In this case, you would have two event streams which are being joined into a new event stream. Now a stream-stream join is what's called a windowed join. The records that arrive are joined with other records of that same key within a defined window of time. So if a record comes in from the left side, it will be joined only with records from the right side that are within this defined window of time, either in the forward direction or in the back. And all this data is stored in a local state store to keep track of what data has arrived during this time. Now it's possible to change the value type, or you can produce the same value type; the joined value could be anything that you want as long as the keys are the same. And the keys themselves are only gonna be available in read-only mode so that they can be used to compute the new value, but you can't modify them since they are, of course, essential for how you join this data. So the second type of join that Kafka Streams offers is the stream-table join. Now the stream-table join is not windowed like the stream-stream join was.
Which means that any time you get an event from the stream side, it's going to be joined with whatever the latest value in the table is, and that will produce the output record. And this makes sense because the table, of course, only represents the latest value for each key, whereas the stream is made up of actual individual events. And within the stream-table join, there are actually two different types of joins. There's the KStream-KTable join, which is just the normal join based on whatever data there is for that key in that partition, and there's also a KStream-GlobalKTable join. So as we covered earlier, the GlobalKTable is a little bit different. It's generally a static table of information, like a mapping of addresses to country codes or zip codes or something like that, something that doesn't really change. Now, you can enrich the data in your KStream with the information in the GlobalKTable, and that is when a KStream-GlobalKTable join is useful. Lastly, the third type of join that Kafka Streams offers is the table-table join. So again, the table-table join, like the stream-table join, is not windowed. And unlike the previous two join types, the table-table join is going to result in another table. So unlike the KStream-KStream or KStream-KTable joins, the table-table join is only ever dealing with table records, which are updates; they reflect the latest value of the record with that key. It makes sense, then, that the output of this table-table join would also be a table, which represents the latest value for these keys. So digging a little bit deeper, within each join type that Kafka Streams offers, there are a few different kinds of join. There is the inner join, the outer join, and the left outer join. These might be familiar from database operations. The inner join is available on the stream-stream join, and it basically means: only produce an output record if both sides had a record available within that defined window.
So when a record comes in on the left side of a stream-stream join, Kafka Streams looks in the state store for the right side to see whether there's already a record within that window. If so, they're joined together and result in an output, and if not, nothing is output. Now, likewise, the same would happen if a record came in on the right side. So that's an inner join. An outer join is really the opposite, where both sides will always produce an output record, no matter what is happening on the other side. So if something comes in on the left side and there is already a record on the right side, then these will be joined and output as usual, but if the left side produces a record and there is no matching record with that key on the right side, then you're still going to get an output. It's just going to be the left value plus null. And likewise, if you get an input value on the right side and there's no matching record on the left, then you get null plus the right value in your output record. Now lastly, there's the left outer join. This is kind of in between the inner and the outer join, where only the left side will always produce an output record. Now which side is the left is arbitrary. So you can decide which stream goes on the left and which stream goes on the right; that's just a matter of semantics that you define. But the join itself is only going to produce an output if the left side has a value. So if a left record comes in, regardless of whether or not there is a matching value on the right side, an output will be produced. Whereas if an event comes in on the right side, there is only gonna be an output produced if something matches on the left side. Now, next we have the stream-table join. The stream-table join only defines an inner and a left outer join; there's no full outer join on the stream-table join. It's also non-windowed, as we discussed before. And this means that only the stream side really drives the join.
So only when new records come in on the stream side do we get output. And this is why there's only a left outer or an inner join. It just does not make sense for a new record to be output if you only get a new record from the right side. Now all this applies to both the KTable and the GlobalKTable joins, but the GlobalKTable join provides a mechanism for determining the join key from the stream-side key and/or value. So the GlobalKTable is useful because you don't actually have to make sure that the keys are matched ahead of time; you can look up any key that you want in the GlobalKTable and join that with the stream-side event. And one final key difference between the normal KTable and the GlobalKTable join is that GlobalKTables are bootstrapped, which means that all of the events in the topic are read into the GlobalKTable up front, and new events are applied as soon as they arrive, whereas with the KTable, the events and the join itself are timestamp-driven. So events in the KTable with a higher timestamp than events in the KStream are not going to be joined with those earlier stream events. They really are only applied to the KTable once time has advanced to that point. Whereas with the GlobalKTable, everything is considered to be just static information, and thus all of the information will be bootstrapped into the GlobalKTable ahead of time. So the join semantics are a little bit different there. That's what a join is. Now let's go through some examples. So to define a stream-stream join, you need to have both a left stream and a right stream. So you would use your builder from before to create a stream from topic A and a stream from topic B. Now, before you join them, you need to create what's called a ValueJoiner. And as the name suggests, this just tells Kafka Streams how to combine the left value and the right value into the value of the output.
So you can create this just like any other lambda; in this case, you would just take the two strings and append the left and the right values together, and that is your ValueJoiner, which defines what the join itself produces. Then, to do the actual join, you call join on one of the streams, the left one, whichever that one is chosen to be, and you pass in the other stream, the ValueJoiner, and lastly a JoinWindows. So what are the JoinWindows? This is just how you define the window duration for the join. It's the amount of time by which the timestamps of an event on the left side and an event on the right side can differ for them to still be matched in this join. In this case, this would be a duration of 10 seconds. So that's joins in Kafka Streams. Now let's check out an example in this exercise.