Senior Software Engineer (Presenter)
Integration Architect (Author)
If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code.
This module’s code can be found in the source file java/io/confluent/developer/joins/StreamsJoin.java.
This exercise teaches you how to join two streams into a third stream, and then join that third stream with a table.
Use a static helper method to get SerDes for your Avro records (in subsequent exercises, you'll abstract this into a static utility method in the StreamsUtils class of the course repo):
static <T extends SpecificRecord> SpecificAvroSerde<T> getSpecificAvroSerde(Map<String, Object> serdeConfig) {
    SpecificAvroSerde<T> specificAvroSerde = new SpecificAvroSerde<>();
    specificAvroSerde.configure(serdeConfig, false);
    return specificAvroSerde;
}
Use a utility method to load the properties (you can refer to the StreamsUtils class within the exercise source code):
Properties streamsProps = StreamsUtils.loadProperties();
Get the input topic names and the output topic name from the properties:
String streamOneInput = streamsProps.getProperty("stream_one.input.topic");
String streamTwoInput = streamsProps.getProperty("stream_two.input.topic");
String tableInput = streamsProps.getProperty("table.input.topic");
String outputTopic = streamsProps.getProperty("joins.output.topic");
Create a HashMap of the configurations:
Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
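The body of StreamsUtils.propertiesToMap lives in the course repository and is not shown here. As a rough sketch of what such a conversion could look like using only the JDK (the class name PropertiesToMapSketch and the sample topic value are hypothetical, not taken from the course code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for a Properties-to-Map helper; not the course's StreamsUtils.
class PropertiesToMapSketch {

    // Copies every string-keyed property into a Map<String, Object>,
    // the shape accepted by Serde#configure(Map<String, ?>, boolean).
    static Map<String, Object> propertiesToMap(Properties properties) {
        Map<String, Object> configs = new HashMap<>();
        for (String name : properties.stringPropertyNames()) {
            configs.put(name, properties.getProperty(name));
        }
        return configs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Sample value only; the real properties file ships with the exercise.
        props.setProperty("stream_one.input.topic", "example-appliance-topic");
        Map<String, Object> configs = propertiesToMap(props);
        System.out.println(configs.get("stream_one.input.topic"));
    }
}
```

A single map like this can then be handed to every SerDe, which is why the exercise builds it once and reuses it.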
Then create the required SerDes for all streams and for the table:
SpecificAvroSerde<ApplianceOrder> applianceSerde = getSpecificAvroSerde(configMap);
SpecificAvroSerde<ElectronicOrder> electronicSerde = getSpecificAvroSerde(configMap);
SpecificAvroSerde<CombinedOrder> combinedSerde = getSpecificAvroSerde(configMap);
SpecificAvroSerde<User> userSerde = getSpecificAvroSerde(configMap);
Create the ValueJoiner for the stream-stream join:
ValueJoiner<ApplianceOrder, ElectronicOrder, CombinedOrder> orderJoiner =
    (applianceOrder, electronicOrder) -> CombinedOrder.newBuilder()
        .setApplianceOrderId(applianceOrder.getOrderId())
        .setApplianceId(applianceOrder.getApplianceId())
        .setElectronicOrderId(electronicOrder.getOrderId())
        .setTime(Instant.now().toEpochMilli())
        .build();
The stream-table join later in this exercise uses a second ValueJoiner, enrichmentJoiner. Its left-side input is the stream that results from the stream-stream join, and it is applied in a left outer join, because the right-side record might not exist.
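The definition of enrichmentJoiner is not shown in this walkthrough. As a sketch of the null handling that a left-join value joiner needs, the following uses plain Java stand-ins: the CombinedOrderStub and UserStub classes and the BiFunction are assumptions made so the example runs without Kafka on the classpath. In the exercise, the same two-argument lambda shape would be typed as a ValueJoiner<CombinedOrder, User, CombinedOrder>, and the Avro-generated accessors may differ.

```java
import java.util.function.BiFunction;

// Hypothetical stand-ins for the Avro-generated CombinedOrder and User classes.
class CombinedOrderStub {
    final String applianceOrderId;
    String userName = ""; // stays empty until a matching user is found

    CombinedOrderStub(String applianceOrderId) {
        this.applianceOrderId = applianceOrderId;
    }
}

class UserStub {
    final String name;

    UserStub(String name) {
        this.name = name;
    }
}

class EnrichmentJoinerSketch {

    // In a left join the right-side value is null when the table has no
    // entry for the key, so the joiner must guard against it.
    static final BiFunction<CombinedOrderStub, UserStub, CombinedOrderStub> ENRICH =
        (order, user) -> {
            if (user != null) {
                order.userName = user.name;
            }
            return order;
        };

    public static void main(String[] args) {
        CombinedOrderStub matched =
            ENRICH.apply(new CombinedOrderStub("remodel-1"), new UserStub("Elizabeth Jones"));
        CombinedOrderStub unmatched =
            ENRICH.apply(new CombinedOrderStub("remodel-2"), null);
        System.out.println("matched user_name=" + matched.userName);
        System.out.println("unmatched user_name=" + unmatched.userName);
    }
}
```

This mirrors the sample output at the end of the exercise, where one joined record carries a user name and the other keeps an empty user_name.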
Create the ApplianceOrder stream as well as the ElectronicOrder stream (builder is a StreamsBuilder instance):
KStream<String, ApplianceOrder> applianceStream =
    builder.stream(streamOneInput, Consumed.with(Serdes.String(), applianceSerde))
        .peek((key, value) -> System.out.println("Appliance stream incoming record key " + key + " value " + value));
KStream<String, ElectronicOrder> electronicStream =
    builder.stream(streamTwoInput, Consumed.with(Serdes.String(), electronicSerde))
        .peek((key, value) -> System.out.println("Electronic stream incoming record " + key + " value " + value));
From here, create the User table:
KTable<String, User> userTable = builder.table(tableInput, Materialized.with(Serdes.String(), userSerde));
Now create the stream-stream join: call the join method on applianceStream, the left-side (or primary) stream in the join. Add electronicStream as the right-side (or secondary) stream, and add the orderJoiner created before:
KStream<String, CombinedOrder> combinedStream =
    applianceStream.join(
        electronicStream,
        orderJoiner,
Specify a JoinWindows configuration of 30 minutes (a right-side record must have a timestamp within 30 minutes before or after the timestamp of the left-side record for a join result to occur):
        JoinWindows.of(Duration.ofMinutes(30)),
Add the StreamJoined configuration with SerDes for the key, left-side, and right-side objects, for the joined state stores:
        StreamJoined.with(Serdes.String(), applianceSerde, electronicSerde))
Add a peek operator to view the results of the join:
    .peek((key, value) -> System.out.println("Stream-Stream Join record key " + key + " value " + value));
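The 30-minute window condition described above can be sketched as a plain timestamp comparison. This is a simplification for illustration only: the class and method names are hypothetical, the boundaries are treated as inclusive, the "time" fields from the sample output are used as stand-in record timestamps, and details that Kafka Streams handles internally (such as grace periods and state-store retention) are ignored.

```java
import java.time.Duration;

// Hypothetical helper; not part of the Kafka Streams API or the course code.
class JoinWindowSketch {

    // True when the two record timestamps (epoch millis) differ by at most
    // the window size, in either direction.
    static boolean withinWindow(long leftTimestampMs, long rightTimestampMs, Duration window) {
        return Math.abs(leftTimestampMs - rightTimestampMs) <= window.toMillis();
    }

    public static void main(String[] args) {
        Duration window = Duration.ofMinutes(30);
        long applianceTs = 1622148573134L;  // appliance order "time" from the sample output
        long electronicTs = 1622148573136L; // electronic order "time" from the sample output

        // Two milliseconds apart: inside the window.
        System.out.println(withinWindow(applianceTs, electronicTs, window));

        // Thirty-one minutes apart: outside the window, so no join result.
        long lateTs = applianceTs + Duration.ofMinutes(31).toMillis();
        System.out.println(withinWindow(applianceTs, lateTs, window));
    }
}
```

The check is symmetric, which matches the "before or after" wording: it does not matter whether the left-side or the right-side record arrives first.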
Call the leftJoin method on the KStream that results from the join in the previous steps, adding userTable as the right side in the stream-table join. Then add enrichmentJoiner to add user information, if available. Add the Joined object with SerDes for the key and for the values of both sides of the join, add the peek operator to view the stream-table join results, and write the final join results to a topic:
combinedStream.leftJoin(
        userTable,
        enrichmentJoiner,
        Joined.with(Serdes.String(), combinedSerde, userSerde))
    .peek((key, value) -> System.out.println("Stream-Table Join record key " + key + " value " + value))
    .to(outputTopic, Produced.with(Serdes.String(), combinedSerde));
Create the KafkaStreams object, and again use the TopicLoader helper class to create topics and produce exercise data:
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
TopicLoader.runProducer();
Finally, start the Kafka Streams application:
kafkaStreams.start();
You run the joins example with this command:
./gradlew runStreams -Pargs=joins
The output for the exercise should look like this:
Appliance stream incoming record key 10261998 value {"order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_id": "10261998", "time": 1622148573134}
Electronic stream incoming record 10261999 value {"order_id": "remodel-2", "electronic_id": "laptop-5333", "user_id": "10261999", "price": 0.0, "time": 1622148573146}
Electronic stream incoming record 10261998 value {"order_id": "remodel-1", "electronic_id": "television-2333", "user_id": "10261998", "price": 0.0, "time": 1622148573136}
Stream-Stream Join record key 10261998 value {"electronic_order_id": "remodel-1", "appliance_order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_name": "", "time": 1622148582747}
Stream-Table Join record key 10261998 value {"electronic_order_id": "remodel-1", "appliance_order_id": "remodel-1", "appliance_id": "dishwasher-1333", "user_name": "Elizabeth Jones", "time": 1622148582747}
Appliance stream incoming record key 10261999 value {"order_id": "remodel-2", "appliance_id": "stove-2333", "user_id": "10261999", "time": 1622148573134}
Stream-Stream Join record key 10261999 value {"electronic_order_id": "remodel-2", "appliance_order_id": "remodel-2", "appliance_id": "stove-2333", "user_name": "", "time": 1622148582853}
Stream-Table Join record key 10261999 value {"electronic_order_id": "remodel-2", "appliance_order_id": "remodel-2", "appliance_id": "stove-2333", "user_name": "", "time": 1622148582853}