Sophie Blee-Goldman

Senior Software Engineer (Presenter)

Bill Bejeck

Integration Architect (Author)

Hands On: Joins

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code.

This module’s code can be found in the source file java/io/confluent/developer/joins/StreamsJoin.java.

This exercise teaches you how to join two streams into a third stream, and then join that third stream with a table.
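The two join stages described above can be sketched in plain Java with no Kafka dependency, which may help make the semantics concrete before you read the Kafka Streams code. This is a simplified model under stated assumptions: all records are available up front, the table is a fixed in-memory map, and `Order`, `Combined`, `windowedJoin`, and `leftJoinTable` are hypothetical stand-ins rather than the course's Avro classes or Kafka APIs. Kafka Streams itself evaluates joins incrementally as records arrive, so real results can also depend on arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free model of this exercise's two join stages.
// Order and Combined are simplified stand-ins, not the course's Avro classes.
class JoinPipelineSketch {

    static final class Order {
        final String key;
        final String orderId;
        final long timeMs;

        Order(String key, String orderId, long timeMs) {
            this.key = key;
            this.orderId = orderId;
            this.timeMs = timeMs;
        }
    }

    static final class Combined {
        final String key;
        final String applianceOrderId;
        final String electronicOrderId;
        String userName = ""; // stays empty when the table has no matching user

        Combined(String key, String applianceOrderId, String electronicOrderId) {
            this.key = key;
            this.applianceOrderId = applianceOrderId;
            this.electronicOrderId = electronicOrderId;
        }
    }

    // Stage 1 (stream-stream, inner): emit a result only for records with the same key
    // whose timestamps are at most windowMs apart, before or after.
    static List<Combined> windowedJoin(List<Order> left, List<Order> right, long windowMs) {
        List<Combined> out = new ArrayList<>();
        for (Order l : left) {
            for (Order r : right) {
                if (l.key.equals(r.key) && Math.abs(l.timeMs - r.timeMs) <= windowMs) {
                    out.add(new Combined(l.key, l.orderId, r.orderId));
                }
            }
        }
        return out;
    }

    // Stage 2 (stream-table, left): keep every stream record and add the user's
    // name only when the table contains the key.
    static List<Combined> leftJoinTable(List<Combined> stream, Map<String, String> userNames) {
        for (Combined c : stream) {
            String name = userNames.get(c.key);
            if (name != null) {
                c.userName = name;
            }
        }
        return stream;
    }

    public static void main(String[] args) {
        long thirtyMinutesMs = 30 * 60 * 1000L;
        List<Order> appliances = List.of(
                new Order("u1", "a-1", 0L), new Order("u2", "a-2", 0L), new Order("u3", "a-3", 0L));
        List<Order> electronics = List.of(
                new Order("u1", "e-1", 1_000L), new Order("u2", "e-2", 5_000L),
                new Order("u3", "e-3", thirtyMinutesMs + 1));

        List<Combined> joined = leftJoinTable(
                windowedJoin(appliances, electronics, thirtyMinutesMs), Map.of("u1", "Alice"));
        for (Combined c : joined) {
            System.out.println(c.key + " " + c.applianceOrderId + " " + c.electronicOrderId
                    + " user=" + c.userName);
        }
    }
}
```

Running `main` should print a joined record for `u1` with the (hypothetical) user name filled in and one for `u2` with an empty name; `u3` is dropped because its two timestamps are more than 30 minutes apart.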

  1. Use a static helper method to get SerDes for your Avro records (in subsequent exercises, you'll move this into a static utility method in the StreamsUtils class of the course repo):

    static <T extends SpecificRecord> SpecificAvroSerde<T> getSpecificAvroSerde(Map<String, Object> serdeConfig) {
        SpecificAvroSerde<T> specificAvroSerde = new SpecificAvroSerde<>();
        specificAvroSerde.configure(serdeConfig, false);
        return specificAvroSerde;
    }
  2. Use a utility method to load the properties (you can refer to the StreamsUtils class within the exercise source code):

    Properties streamsProps = StreamsUtils.loadProperties();
  3. Get the input topic names and the output topic name from the properties:

    String streamOneInput = streamsProps.getProperty("stream_one.input.topic");
    String streamTwoInput = streamsProps.getProperty("stream_two.input.topic");
    String tableInput = streamsProps.getProperty("table.input.topic");
    String outputTopic = streamsProps.getProperty("joins.output.topic");
  4. Create a HashMap of the configurations:

    Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
  5. Then create the required SerDes for all streams and for the table:

    SpecificAvroSerde<ApplianceOrder> applianceSerde = getSpecificAvroSerde(configMap);
    SpecificAvroSerde<ElectronicOrder> electronicSerde = getSpecificAvroSerde(configMap);
    SpecificAvroSerde<CombinedOrder> combinedSerde = getSpecificAvroSerde(configMap);
    SpecificAvroSerde<User> userSerde = getSpecificAvroSerde(configMap);
  6. Create the ValueJoiner for the stream-stream join, which combines an ApplianceOrder and an ElectronicOrder into a CombinedOrder:

    ValueJoiner<ApplianceOrder, ElectronicOrder, CombinedOrder> orderJoiner =
        (applianceOrder, electronicOrder) -> CombinedOrder.newBuilder()
            .setApplianceOrderId(applianceOrder.getOrderId())
            .setApplianceId(applianceOrder.getApplianceId())
            .setElectronicOrderId(electronicOrder.getOrderId())
            .setTime(Instant.now().toEpochMilli())
            .build();

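    Step 13 also uses a second ValueJoiner, enrichmentJoiner, for the stream-table join. Its left side is the stream produced by the preceding stream-stream join, and the stream-table join is a left outer join because the right-side User record might not exist, so enrichmentJoiner must handle a null User.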

  7. Create a StreamsBuilder, then use it to create the ApplianceOrder stream and the ElectronicOrder stream:

    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, ApplianceOrder> applianceStream =
        builder.stream(streamOneInput, Consumed.with(Serdes.String(), applianceSerde))
            .peek((key, value) -> System.out.println("Appliance stream incoming record key " + key + " value " + value));
    
    KStream<String, ElectronicOrder> electronicStream =
        builder.stream(streamTwoInput, Consumed.with(Serdes.String(), electronicSerde))
            .peek((key, value) -> System.out.println("Electronic stream incoming record " + key + " value " + value));
  8. From here, create the User table:

    KTable<String, User> userTable = builder.table(tableInput, Materialized.with(Serdes.String(), userSerde));
  9. Now create the stream-stream join by calling the join method on applianceStream, the left-side (or primary) stream in the join. Pass electronicStream as the right-side (or secondary) stream, along with the orderJoiner created earlier:

    KStream<String, CombinedOrder> combinedStream =
        applianceStream.join(
            electronicStream,
            orderJoiner,
  10. Specify a JoinWindows configuration of 30 minutes (for a join result to occur, a right-side record must have a timestamp within 30 minutes before or after the left-side record's timestamp):

    JoinWindows.of(Duration.ofMinutes(30)),
  11. Add a StreamJoined configuration with SerDes for the key and for the left-side and right-side values, which are used for the join's state stores:

    StreamJoined.with(Serdes.String(), applianceSerde, electronicSerde))
  12. Add a peek operator to view the results of the join:

    .peek((key, value) -> System.out.println("Stream-Stream Join record key " + key + " value " + value));
  13. Call leftJoin on the KStream produced by the stream-stream join, passing userTable as the right side of the stream-table join. Add enrichmentJoiner to attach user information when it's available, and a Joined object with SerDes for the key and for the values on both sides of the join. Finally, add a peek operator to view the stream-table join results, and write the final join results to a topic:

    combinedStream.leftJoin(
            userTable,
            enrichmentJoiner,
            Joined.with(Serdes.String(), combinedSerde, userSerde))
        .peek((key, value) -> System.out.println("Stream-Table Join record key " + key + " value " + value))
        .to(outputTopic, Produced.with(Serdes.String(), combinedSerde));
  14. Create the KafkaStreams object, and again use the TopicLoader helper class to create topics and produce exercise data:

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
    TopicLoader.runProducer();
  15. Finally, start the Kafka Streams application:

    kafkaStreams.start();
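Step 13 passes an enrichmentJoiner to leftJoin, but its definition doesn't appear in the steps above. The sketch below shows one plausible shape for it, not necessarily the course's exact code. To stay runnable without Kafka or Avro on the classpath, it declares stand-ins: a `ValueJoiner` interface mirroring the single `apply(value1, value2)` method of Kafka Streams' `org.apache.kafka.streams.kstream.ValueJoiner`, plus minimal `User` and `CombinedOrder` classes. The `getName()` accessor on `User` and the `setUserName(...)` setter on `CombinedOrder` (suggested by the `user_name` field in the output below) are assumptions about the Avro schemas. Because this is a left join, the `User` argument is `null` when no matching user exists, so the joiner checks for it.

```java
// Hypothetical sketch of an enrichmentJoiner for the stream-table left join.
// The nested types are stand-ins so the example runs without Kafka or Avro.
class EnrichmentJoinerSketch {

    // Stand-in mirroring the single apply method of Kafka Streams' ValueJoiner.
    @FunctionalInterface
    interface ValueJoiner<V1, V2, VR> {
        VR apply(V1 value1, V2 value2);
    }

    // Assumed shape of the User record: only the name field matters here.
    static final class User {
        private final String name;

        User(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Assumed mutable CombinedOrder whose user name starts out empty.
    static final class CombinedOrder {
        private String userName = "";

        String getUserName() {
            return userName;
        }

        void setUserName(String userName) {
            this.userName = userName;
        }
    }

    // In a left join the right-side value is null when no User matches the key,
    // so the combined order is returned unchanged in that case.
    static final ValueJoiner<CombinedOrder, User, CombinedOrder> enrichmentJoiner =
            (combined, user) -> {
                if (user != null) {
                    combined.setUserName(user.getName());
                }
                return combined;
            };

    public static void main(String[] args) {
        CombinedOrder matched = enrichmentJoiner.apply(new CombinedOrder(), new User("Elizabeth Jones"));
        CombinedOrder unmatched = enrichmentJoiner.apply(new CombinedOrder(), null);
        System.out.println("'" + matched.getUserName() + "'");   // prints 'Elizabeth Jones'
        System.out.println("'" + unmatched.getUserName() + "'"); // prints ''
    }
}
```

Returning the left-side value even when no user is found keeps every combined order in the output, which is what a left join is meant to preserve.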

Run the joins example with this command:

./gradlew runStreams -Pargs=joins

The output for the exercise should look like this:

Appliance stream incoming record key 10261998 value {"order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_id": "10261998", "time": 1622148573134}
Electronic stream incoming record 10261999 value {"order_id": "remodel-2", "electronic_id": "laptop-5333", "user_id": "10261999", "price": 0.0, "time": 1622148573146}
Electronic stream incoming record 10261998 value {"order_id": "remodel-1", "electronic_id": "television-2333", "user_id": "10261998", "price": 0.0, "time": 1622148573136}
Stream-Stream Join record key 10261998 value {"electronic_order_id": "remodel-1", "appliance_order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_name": "", "time": 1622148582747}
Stream-Table Join record key 10261998 value {"electronic_order_id": "remodel-1", "appliance_order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_name": "Elizabeth Jones", "time": 1622148582747}
Appliance stream incoming record key 10261999 value {"order_id": "remodel-2", "appliance_id": "stove-2333", "user_id": "10261999", "time": 1622148573134}
Stream-Stream Join record key 10261999 value {"electronic_order_id": "remodel-2", "appliance_order_id": "remodel-2", "appliance_id": "stove-2333", "user_name": "", "time": 1622148582853}
Stream-Table Join record key 10261999 value {"electronic_order_id": "remodel-2", "appliance_order_id": "remodel-2", "appliance_id": "stove-2333", "user_name": "", "time": 1622148582853}
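Both stream-stream join results above fall well inside the 30-minute window from step 10. The sketch below applies that rule to the timestamps printed in the output. It treats each value's `time` field as the record timestamp, which is an assumption: Kafka Streams joins on the record timestamp (by default the one attached when the record was produced), not on a field inside the value. `withinJoinWindow` is a hypothetical helper that assumes a symmetric window with inclusive bounds, i.e. a right-side timestamp in [left - 30 min, left + 30 min].

```java
import java.time.Duration;

// Checks the output's appliance/electronic timestamp pairs against a 30-minute
// join window. Treats each value's "time" field as the record timestamp (an assumption).
class JoinWindowCheck {
    static final Duration WINDOW = Duration.ofMinutes(30);

    // Hypothetical helper: true if rightTs lies in [leftTs - window, leftTs + window].
    static boolean withinJoinWindow(long leftTs, long rightTs, Duration window) {
        long w = window.toMillis();
        return rightTs >= leftTs - w && rightTs <= leftTs + w;
    }

    public static void main(String[] args) {
        // Key 10261998: appliance at ...573134, electronic at ...573136 (2 ms apart)
        System.out.println(withinJoinWindow(1622148573134L, 1622148573136L, WINDOW)); // true
        // Key 10261999: appliance at ...573134, electronic at ...573146 (12 ms apart)
        System.out.println(withinJoinWindow(1622148573134L, 1622148573146L, WINDOW)); // true
        // A hypothetical electronic order 31 minutes later would not join
        long late = 1622148573134L + Duration.ofMinutes(31).toMillis();
        System.out.println(withinJoinWindow(1622148573134L, late, WINDOW)); // false
    }
}
```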
