Get Started Free
Tutorial

How to join a KStream and a KStream in Kafka Streams

How to join a KStream and a KStream in Kafka Streams

Suppose you have a stream of movies that have been released and a stream of ratings from moviegoers about how entertaining they are. In this tutorial, we'll write a program that joins each rating with content about the movie.

First you'll create a KStream for the recently released movies:

 KStream<Long, Movie> movieStream = builder.stream(MOVIE_INPUT_TOPIC,
                Consumed.with(Serdes.Long(), movieSerde))
        .peek((key, value) -> LOG.info("Incoming movies key[{}] value[{}]", key, value));

Here you've started with a KStream with a peek statement to view the incoming records. We assume that the underlying topic is keyed on the movie ID.

Then you'll create your KStream of ratings:

 KStream<Long, Rating> ratings = builder.stream(RATING_INPUT_TOPIC,
                        Consumed.with(Serdes.Long(), ratingSerde))
                .map((key, rating) -> new KeyValue<>(rating.id(), rating));

We need to have the same ID as the movie stream, so we'll use a KStream.map operator to set the rating ID as the key. The Rating class ID uses the same ID as the movie.

Now you use a ValueJoiner specifying how to construct the joined value of both streams:

public class MovieRatingJoiner implements ValueJoiner<Rating, Movie, RatedMovie> {

  public RatedMovie apply(Rating rating, Movie movie) {
    return new RatedMovie(movie.id(), movie.title(), movie.releaseYear(), rating.rating());
  }
}

You'll also create a JoinWindows instance which specifies the maximum time difference between records to complete the join.

 JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10));

Here, you're using the JoinWindows.ofTimeDifferenceWithNoGrace method which means Kafka Streams will drop any out-of-order records after the window period passes and they won't be available for joining.

Now, you'll put all this together using a KStream.join operation:

 ratings.join(movieStream, joiner, joinWindows, StreamJoined.with(Serdes.Long(),ratingSerde, movieSerde))
        .to(RATED_MOVIES_OUTPUT,
            Produced.with(Serdes.Long(), ratedMovieSerde));

Notice that you're supplying the Serde for the key, the stream value and the value of the other stream via the StreamJoined configuration object.