Senior Software Engineer (Presenter)
Integration Architect (Author)
If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code.
This module’s code can be found in the source file java/io/confluent/developer/time/StreamsTimestampExtractor.java.
In this hands-on exercise, learn how to use a custom TimestampExtractor to drive the behavior of a Kafka Streams application, using timestamps embedded in the records themselves.
Use the following code as a starter for your time exercise code:
public class StreamsTimestampExtractor {
public static void main(String[] args) throws IOException {
Properties streamsProps = StreamsUtils.loadProperties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG,
"extractor-windowed-streams");
StreamsBuilder builder = new StreamsBuilder();
String inputTopic = streamsProps.getProperty("extractor.input.topic");
String outputTopic = streamsProps.getProperty("extractor.output.topic");
Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
SpecificAvroSerde<ElectronicOrder> electronicSerde =
StreamsUtils.getSpecificAvroSerde(configMap);
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
TopicLoader.runProducer();
kafkaStreams.start();
}
}
Above the main method, create an instance of a TimestampExtractor, implementing the extract method and retrieving the ElectronicOrder object from the ConsumerRecord value field; then extract and return the timestamp embedded in the `ElectronicOrder':
static class OrderTimestampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
ElectronicOrder order = (ElectronicOrder)record.value();
System.out.println("Extracting time of " + order.getTime() + " from " + order);
return order.getTime();
}
}
Just above your kafkaStreams instance, create a KStream, and make the familiar builder.stream call:
final KStream<String, ElectronicOrder> electronicStream =
builder.stream(inputTopic,
Add the Consumed configuration object with SerDes for deserialization, but with a twist: You're also providing a TimestampExtractor. (You could also specify the TimestampExtractor by configurations, but then it would be global for all streams in the application.)
Consumed.with(Serdes.String(), electronicSerde)
.withTimestampExtractor(new OrderTimestampExtractor()))
.peek((key, value) -> System.out.println("Incoming record - key " +key +" value " + value));
Create a tumbling window aggregation. Keep in mind that the timestamps from ElectronicOrder are what drive the window opening and closing.
electronicStream.groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1)))
Call the aggregate method, initializing the aggregate to "0.0" and adding the aggregator instance that sums all prices for the total spent over one hour, based on the timestamp of the record itself. Add SerDes for the state store via a Materialized, and convert the KTable from the aggregation into a KStream.
.aggregate(() -> 0.0,
(key, order, total) -> total + order.getPrice(),
Materialized.with(Serdes.String(), Serdes.Double()))
.toStream()
Use a map processor to unwrap the windowed key and return the underlying key of the aggregation, and use a peek processor to print the aggregation results to the console. Finally, write the results out to a topic.
.map((wk, value) -> KeyValue.pair(wk.key(),value))
.peek((key, value) -> System.out.println("Outgoing record - key " +key +" value " + value))
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));
As with the other aggregation applications, let this one run for at least 40 seconds.
To run this example use the following command:
./gradlew runStreams -Pargs=time
Your output will include statements from the TimestampExtractor and it should look something like this:
Extracting time of 1622155705696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622155705696}
Extracting time of 1622156605696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622156605696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622155705696}
Extracting time of 1622157505696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622157505696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622156605696}
Extracting time of 1622158405696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622158405696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622157505696}
Extracting time of 1622159485696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622159485696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622158405696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622159485696}
Outgoing record - key HDTV-2333 value 2000.0
Outgoing record - key HDTV-2333 value 9167.189999999999
All the required code has already been supplied. Here, we'll focus on the streams code and adding a timestamp extractor, pull the timestamp embedded in the record value payload. Let's kick things off by creating an instance of a timestamp extractor. Implement the extract method. Now get the electronic order object from the consumer record value field. Next, extract and return the timestamp embedded in the electronic order. Now, go ahead and create the KStream and go ahead and make the now familiar build.stream call. Next, add the consumed content object with the SerDes for DC serialization. But with the twist, we're also providing a timestamp extractor. We can also specify the timestamp extractor via configs, but then it's global for all streams in the application. With this approach, it's local to the stream only. Next, add an instance of the timestamp extractor that you've implemented before. Now, create the window aggregation with tumbling windows. Bear in mind that the timestamps from the electronic order are what drive the action in terms of the window opening and closing. Now calling the aggregate method and initialize the aggregation to zero and adding the aggregator instance, that sums all prices for the total spent over one hour based on the timestamp of the record itself. From here add SerDes for the state store, via a materialized and convert the KTable from the aggregation into a KStream. Then you'll want to use a map processor to unwrap the window key and return the underlying key of the aggregation. Next, add a peek processor to print the aggregation results to the console. Finally, write the results out to a topic. As with the other aggregation applications let this one run for at least 40 seconds.
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.