Get Started Free
‹ Back to courses
course: Kafka Streams 101

Hands On: Joins

3 min
Screen Shot 2021-07-23 at 4.13.50 PM

Sophie Blee-Goldman

Senior Software Engineer (Presenter)

bill-bejeck

Bill Bejeck

Integration Architect (Author)

Hands On: Joins

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code.

This module’s code can be found in the source file java/io/confluent/developer/joins/StreamsJoin.java.

This exercise teaches you how to join two streams into a third stream, and then join that third stream with a table.

  1. Use a static helper method to get SerDes for your Avro records (in subsequent exercises, you'll abstract this into a static utility method, in the StreamsUtils class of the course repo):

    static <T extends SpecificRecord> SpecificAvroSerde<T> getSpecificAvroSerde(Map<String, Object> serdeConfig) {
        SpecificAvroSerde<T> specificAvroSerde = new SpecificAvroSerde<>();
        specificAvroSerde.configure(serdeConfig, false);
        return specificAvroSerde;
    }
  2. Use a utility method to load the properties (you can refer to the StreamsUtils class within the exercise source code):

    Properties streamsProps = StreamsUtils.loadProperties();
  3. Get the input topic names and the output topic name from the properties:

    String streamOneInput = streamsProps.getProperty("stream_one.input.topic");
    String streamTwoInput = streamsProps.getProperty("stream_two.input.topic");
    String tableInput = streamsProps.getProperty("table.input.topic");
    String outputTopic = streamsProps.getProperty("joins.output.topic");
  4. Create a HashMap of the configurations:

    Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
  5. Then create the required SerDes for all streams and for the table:

    SpecificAvroSerde<ApplianceOrder> applianceSerde = getSpecificAvroSerde(configMap);
    SpecificAvroSerde<ElectronicOrder> electronicSerde = getSpecificAvroSerde(configMap);
    SpecificAvroSerde<CombinedOrder> combinedSerde = getSpecificAvroSerde(configMap);
    SpecificAvroSerde<User> userSerde = getSpecificAvroSerde(configMap);
  6. Create the ValueJoiner for the stream-table join:

    ValueJoiner<ApplianceOrder, ElectronicOrder, CombinedOrder> orderJoiner =
        (applianceOrder, electronicOrder) -> CombinedOrder.newBuilder()
            .setApplianceOrderId(applianceOrder.getOrderId())
            .setApplianceId(applianceOrder.getApplianceId())
            .setElectronicOrderId(electronicOrder.getOrderId())
            .setTime(Instant.now().toEpochMilli())
            .build();

    The stream is a result of the preceding stream-stream join, but it's a left outer join, because the right-side record might not exist.

  7. Create the ApplianceOrder stream as well as the ElectronicOrder stream:

    KStream<String, ApplianceOrder> applianceStream =
        builder.stream(streamOneInput, Consumed.with(Serdes.String(), applianceSerde))
            .peek((key, value) -> System.out.println("Appliance stream incoming record key " + key + " value " + value));
    
    KStream<String, ElectronicOrder> electronicStream =
        builder.stream(streamTwoInput, Consumed.with(Serdes.String(), electronicSerde))
            .peek((key, value) -> System.out.println("Electronic stream incoming record " + key + " value " + value));
  8. From here, create the User table:

    KTable<String, User> userTable = builder.table(tableInput, Materialized.with(Serdes.String(), userSerde));
  9. Now create the stream-stream join and call the join method on the applianceStream, the left side (or primary) stream in the join. Add the electronicStream as the right side (or secondary) stream in the join, and add the orderJoiner created before:

    KStream<String, CombinedOrder> combinedStream =
          applianceStream.join(
                electronicStream,
                orderJoiner,
  10. Specify a JoinWindows configuration of 30 minutes (a right-side record must have timestamps within 30 minutes before or after the timestamp of the left side for a join result to occur):

    JoinWindows.of(Duration.ofMinutes(30)),
  11. Add the StreamJoined configuration with SerDes for the key, left-side, and right-side objects, for the joined state stores:

    StreamJoined.with(Serdes.String(), applianceSerde, electronicSerde))
  12. Add a peek operator to view the results of the join:

    .peek((key, value) -> System.out.println("Stream-Stream Join record key " + key + " value " + value));
  13. Call the join method on the KStream that results from the join in previous steps, adding the userTable as the right side in the stream-table join. Then add enrichmentJoiner to add user information, if available. Add the Joined object with SerDes for the values of both sides of the join, add the peek operator to view the stream-table join results, and write the final join results to a topic:

    combinedStream.leftJoin(
                    userTable,
                    enrichmentJoiner,
                    Joined.with(Serdes.String(), combinedSerde, userSerde))
        .peek((key, value) -> System.out.println("Stream-Table Join record key " + key + " value " + value))
        .to(outputTopic, Produced.with(Serdes.String(), combinedSerde));
  14. Create the KafkaStreams object, and again use the TopicLoader helper class to create topics and produce exercise data:

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
    TopicLoader.runProducer();
  15. Finally, start the Kafka Streams application:

    kafkaStreams.start();

You run the joins example with this command:

./gradlew runStreams -Pargs=joins

The output for the exercise should like this:

Appliance stream incoming record key 10261998 value {"order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_id": "10261998", "time": 1622148573134}
Electronic stream incoming record 10261999 value {"order_id": "remodel-2", "electronic_id": "laptop-5333", "user_id": "10261999", "price": 0.0, "time": 1622148573146}
Electronic stream incoming record 10261998 value {"order_id": "remodel-1", "electronic_id": "television-2333", "user_id": "10261998", "price": 0.0, "time": 1622148573136}
Stream-Stream Join record key 10261998 value {"electronic_order_id": "remodel-1", "appliance_order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_name": "", "time": 1622148582747}
Stream-Table Join record key 10261998 value {"electronic_order_id": "remodel-1", "appliance_order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_name": "Elizabeth Jones", "time": 1622148582747}
Appliance stream incoming record key 10261999 value {"order_id": "remodel-2", "appliance_id": "stove-2333", "user_id": "10261999", "time": 1622148573134}
Stream-Stream Join record key 10261999 value {"electronic_order_id": "remodel-2", "appliance_order_id": "remodel-2", "appliance_id": "stove-2333", "user_name": "", "time": 1622148582853}
Stream-Table Join record key 10261999 value {"electronic_order_id": "remodel-2", "appliance_order_id": "remodel-2", "appliance_id": "stove-2333", "user_name": "", "time": 1622148582853}

Use the promo code STREAMS101 & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.

Hands On: Joins

In this exercise, we'll use a static helper method for getting SerDes for the Avro records. In subsequent exercises, we'll abstract this to a static utility method in the Udall's class of the course repo. Here, we're using a utility method that is used to load the properties. And you can refer to the Udall's class within the exercise source code. Next, get the input topic names and the output topic name from the properties. After creating a hash map with the configs, let's create the required SerDeys for all streams and the table. Now, create the value joiner for the stream stream join by taking the left side and right side of the join to create a combined order object. Next, create the value joiner for the stream table join. The stream is a result of the preceding stream stream join, but it's a left out join because the right side record might not exist. Then create the appliance order stream as well as the electronic order stream. From here, create the user table. Now you'll create the stream stream join once you've done that call the join method on the appliance stream, the left side or primary stream in the join. Add the electronic stream as the right side or secondary stream in the join. And then add the order joiner created before. Now, specify a join windows of 30 minutes and note that a right-side record must have timestamps within 30 minutes before or after the timestamp of the left side with the same key for a join result to occur. After that, add the stream joined config with SerDes for the key left side and right side objects for join state stores. Now add a peak operator to view the results of the join. The next step is to call the joint method on the key stream, resulting from the joint in previous steps. Next, add the user key table as the right side and the stream table join. Now add the enrichment joiner, which adds easier information if it is available. Now add the joined config object with the SerDes for the values of both sides of the join. Next, use the peak operator again to view the stream table join results. As the last step, right the final join results to a topic. Now create the coffer streams object, and again, use the topic loader helper class to create topics and produce exercise data. And there you have it. You can start at the streams application.