Course: Kafka Streams 101

Joins

9 min
Sophie Blee-Goldman, Software Engineer II (Course Presenter)
Bill Bejeck, Integration Architect (Course Author)

Kafka Streams provides join operations for streams and tables, enabling you to augment one dataset with another.

Stream-Stream

Stream-stream joins combine two event streams into a new stream. The streams are joined on a common key, so every record must have a key. You define a time window, and the records on the two sides of the join must have timestamps that fall within that window of each other. Kafka Streams uses a state store under the hood to buffer records, so that when a record arrives, it can look in the other stream's state store and find records with the same key whose timestamps fit into the time window.
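The buffering and window check described above can be sketched in plain Java. This is a simplified model for illustration, not the Kafka Streams API: the class name WindowedJoinSketch, its methods, and the "left+right" output format are all hypothetical, and two in-memory maps stand in for the state stores.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a windowed inner join (hypothetical; not the Kafka Streams API).
class WindowedJoinSketch {
    // A buffered record value together with its event timestamp.
    static final class Event {
        final String value;
        final long timestampMs;

        Event(String value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    private final long windowMs;
    // One buffer per side, keyed by record key, standing in for the state stores.
    private final Map<String, List<Event>> leftBuffer = new HashMap<>();
    private final Map<String, List<Event>> rightBuffer = new HashMap<>();

    WindowedJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // A record arrives on the left stream; returns the joined values it produces.
    List<String> onLeft(String key, String value, long timestampMs) {
        return process(key, new Event(value, timestampMs), leftBuffer, rightBuffer, true);
    }

    // A record arrives on the right stream; returns the joined values it produces.
    List<String> onRight(String key, String value, long timestampMs) {
        return process(key, new Event(value, timestampMs), rightBuffer, leftBuffer, false);
    }

    private List<String> process(String key, Event arrived,
                                 Map<String, List<Event>> ownBuffer,
                                 Map<String, List<Event>> otherBuffer,
                                 boolean arrivedOnLeft) {
        // Buffer the new record so later records on the other side can find it.
        ownBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(arrived);

        List<String> joined = new ArrayList<>();
        List<Event> candidates = otherBuffer.getOrDefault(key, Collections.<Event>emptyList());
        for (Event candidate : candidates) {
            // Same key, and timestamps no further apart than the window size.
            if (Math.abs(arrived.timestampMs - candidate.timestampMs) <= windowMs) {
                joined.add(arrivedOnLeft
                        ? arrived.value + "+" + candidate.value
                        : candidate.value + "+" + arrived.value);
            }
        }
        return joined;
    }
}
```

With a 10-second window, a left record at 1,000 ms and a right record with the same key at 5,000 ms produce one joined value, while a right record at 20,000 ms produces nothing. The sketch never expires buffered records, which a real state store would have to do.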

To perform a join, you supply a ValueJoiner, an interface that takes the value from the left side (the primary side of the join) and the value from the right side (the secondary side). You use it to compute a new value, which can be of a new type (you decide what the return type will be). You can also use the key in a read-only fashion to compute the new value.

Inner Joins

If both sides are available during the window, a join is emitted. Thus, if the left side has a record but the right side doesn't, nothing is emitted.

Outer Joins

With an outer join, both sides always produce an output record. Within the join window, if both the left side and the right side are available, a join of the two is returned. If only the left side is available, the join has the value of the left side and a null for the right side. The converse also holds: if only the right side is available, the join has the value of the right side and a null for the left side.

Left-Outer Joins

Left-outer joins also always produce output records. If both sides are available, the join consists of both sides. Otherwise, the left side will be returned with a null for the right side.
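As a quick way to compare the three join types, the following plain-Java sketch maps "which sides are available within the window" to "what is emitted". It models only the outcomes described above; the names JoinTypeSketch and result, and the comma-separated output, are hypothetical and not part of Kafka Streams.

```java
import java.util.Optional;

// Plain-Java model of inner, left-outer, and outer join results (hypothetical names).
class JoinTypeSketch {
    enum JoinType { INNER, LEFT, OUTER }

    // A null argument means that side has no record within the join window.
    // Returns the joined value, or empty if this join type emits nothing.
    static Optional<String> result(JoinType type, String left, String right) {
        boolean hasLeft = left != null;
        boolean hasRight = right != null;

        if (hasLeft && hasRight) {
            // Every join type emits when both sides are available.
            return Optional.of(left + "," + right);
        }
        if (hasLeft && type != JoinType.INNER) {
            // Left-outer and outer joins emit the left value with a null right side.
            return Optional.of(left + ",null");
        }
        if (hasRight && type == JoinType.OUTER) {
            // Only the outer join emits the right value with a null left side.
            return Optional.of("null," + right);
        }
        return Optional.empty();
    }
}
```

For example, result(JoinType.LEFT, "a", null) yields "a,null", whereas result(JoinType.INNER, "a", null) yields an empty Optional.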

Stream-Table

You learned above that stream-stream joins are windowed joins. In contrast, stream-table joins are non-windowed. You can join a KStream with a KTable or with a GlobalKTable.

In a stream-table join, the stream is always on the primary side; it drives the join. Only new records arriving to the stream result in an output, and new records arriving to the table do not (this is the case for both KStream-KTable and KStream-GlobalKTable joins).

Inner

An inner join only fires if both sides are available.

Left Outer

The KStream always produces a record, either a combination of the left and right values, or the left value and a null representing the right side.
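The stream-driven behavior can be modeled in a few lines of plain Java. This is an illustrative sketch rather than Kafka Streams code: StreamTableJoinSketch, its method names, and the "streamValue|tableValue" output are hypothetical, and a HashMap stands in for the table's latest value per key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of a non-windowed stream-table join (hypothetical names).
class StreamTableJoinSketch {
    // Latest value per key, standing in for the table.
    private final Map<String, String> table = new HashMap<>();

    // A table update only changes state; it never emits a join result.
    void onTableRecord(String key, String value) {
        table.put(key, value);
    }

    // A stream record drives the join by looking up the current table value.
    Optional<String> onStreamRecord(String key, String value, boolean leftJoin) {
        String tableValue = table.get(key);
        if (tableValue == null && !leftJoin) {
            // Inner join: no matching table entry, so nothing is emitted.
            return Optional.empty();
        }
        // Left join: tableValue may be null here and is rendered as "null".
        return Optional.of(value + "|" + tableValue);
    }
}
```

A stream record that arrives before the matching table entry yields nothing for an inner join and "click|null" for a left join; once the table holds a value for that key, later stream records with the same key are joined against it.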

GlobalKTable-Stream Join Properties

With a GlobalKTable, the underlying topic is fully replicated to every application instance, as opposed to being sharded across them. When you join a GlobalKTable with a KStream, the key of the stream doesn't have to match the key of the table. You supply a KeyValueMapper when you define the join, and it derives the key to look up in the GlobalKTable from the stream key and/or value.
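The key-derivation step can be illustrated with a plain-Java sketch in which a BiFunction plays the role of the KeyValueMapper. Everything here is an assumption made for the example: the class GlobalTableLookupSketch, the helper userIdOf, and the "userId:amount" value format are hypothetical and do not come from Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Plain-Java model of a stream joined to a fully replicated lookup table (hypothetical names).
class GlobalTableLookupSketch {
    // Stands in for the GlobalKTable: every instance holds the complete data set.
    private final Map<String, String> globalTable = new HashMap<>();
    // Stands in for the KeyValueMapper: (streamKey, streamValue) -> table key.
    private final BiFunction<String, String, String> keyMapper;

    GlobalTableLookupSketch(BiFunction<String, String, String> keyMapper) {
        this.keyMapper = keyMapper;
    }

    void onTableRecord(String key, String value) {
        globalTable.put(key, value);
    }

    // Inner-join behavior: emit only when the derived key is present in the table.
    Optional<String> onStreamRecord(String streamKey, String streamValue) {
        String lookupKey = keyMapper.apply(streamKey, streamValue);
        String tableValue = globalTable.get(lookupKey);
        if (tableValue == null) {
            return Optional.empty();
        }
        return Optional.of(streamValue + " by " + tableValue);
    }

    // Assumes a hypothetical stream value format of "userId:amount".
    static String userIdOf(String streamKey, String streamValue) {
        return streamValue.substring(0, streamValue.indexOf(':'));
    }
}
```

Here the stream is keyed by a transaction ID, but the lookup uses the user ID extracted from the value, so a record with key "tx-1" and value "u1:25" matches the table entry "u1".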

Another difference between a KTable join and a GlobalKTable join is that a KTable uses timestamps. With a GlobalKTable, an update to the underlying topic is simply applied automatically, completely independent of the time mechanism within Kafka Streams. (In contrast, with a KTable, timestamps are part of your event stream processing.) This means that a GlobalKTable is suited for mostly static lookup data. For example, you might use it to hold relatively static user information for matching against transactions.

Table-Table

You can also join a KTable with another KTable. Note that a GlobalKTable can only be joined with a KStream.

A Code Example: Joining Two Streams

A ValueJoiner takes left and right values and returns a new value:

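import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueJoiner;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> leftStream = builder.stream("topic-A");
KStream<String, String> rightStream = builder.stream("topic-B");

// Combine the left and right values into the value of the joined record.
ValueJoiner<String, String, String> valueJoiner =
    (leftValue, rightValue) -> leftValue + rightValue;

// Inner join: records match when their timestamps are at most 10 seconds apart.
KStream<String, String> joined =
    leftStream.join(rightStream, valueJoiner, JoinWindows.of(Duration.ofSeconds(10)));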

You can return a completely new object type; the return value type doesn't have to be the same as either of the two values coming in. Notice also the JoinWindows argument, which states that a record on the right side must have a timestamp no more than 10 seconds before or 10 seconds after the timestamp of the left-side record.
