Senior Software Engineer (Presenter)
Integration Architect (Author)
If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code.
This module’s code can be found in the source file java/io/confluent/developer/time/StreamsTimestampExtractor.java.
In this hands-on exercise, learn how to use a custom TimestampExtractor to drive the behavior of a Kafka Streams application, using timestamps embedded in the records themselves.
Use the following code as a starter for your time exercise code:
public class StreamsTimestampExtractor {
    public static void main(String[] args) throws IOException {
        // Load the course's streams properties, including the topic names
        Properties streamsProps = StreamsUtils.loadProperties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "extractor-windowed-streams");
        StreamsBuilder builder = new StreamsBuilder();
        String inputTopic = streamsProps.getProperty("extractor.input.topic");
        String outputTopic = streamsProps.getProperty("extractor.output.topic");
        // Avro serde for the ElectronicOrder record values
        Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);
        SpecificAvroSerde<ElectronicOrder> electronicSerde =
                StreamsUtils.getSpecificAvroSerde(configMap);
        // Build the topology here, above the KafkaStreams instance
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
        // Course helper that populates the input topic with sample records
        TopicLoader.runProducer();
        kafkaStreams.start();
    }
}
Above the main method, create a static nested class that implements TimestampExtractor. In its extract method, retrieve the ElectronicOrder object from the ConsumerRecord value field, then extract and return the timestamp embedded in the ElectronicOrder:
static class OrderTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        // The record value is the deserialized Avro ElectronicOrder
        ElectronicOrder order = (ElectronicOrder) record.value();
        System.out.println("Extracting time of " + order.getTime() + " from " + order);
        // Use the event time embedded in the order instead of the record's own timestamp
        return order.getTime();
    }
}
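The extractor above casts every value to ElectronicOrder, so a record with a null value or a value of another type would throw an exception. The sketch below models one defensive variant in plain Java so that it runs without Kafka on the classpath: HypotheticalOrder is a hypothetical stand-in for ElectronicOrder, and the static extract method only mirrors the shape of TimestampExtractor.extract(record, partitionTime); it is not the Kafka API. It prefers the embedded event time and otherwise falls back to partitionTime, which Kafka Streams passes as the highest valid timestamp seen so far on the record's partition.

```java
public class FallbackExtractorSketch {

    // Hypothetical stand-in for the Avro-generated ElectronicOrder class
    static final class HypotheticalOrder {
        private final long time;

        HypotheticalOrder(long time) {
            this.time = time;
        }

        long getTime() {
            return time;
        }
    }

    // Mirrors the shape of TimestampExtractor.extract(record, partitionTime):
    // use the embedded time when the value is an order with a non-negative time,
    // otherwise fall back to the partition time.
    static long extract(Object value, long partitionTime) {
        if (value instanceof HypotheticalOrder) {
            long embedded = ((HypotheticalOrder) value).getTime();
            if (embedded >= 0) {
                return embedded;
            }
        }
        return partitionTime;
    }

    public static void main(String[] args) {
        System.out.println(extract(new HypotheticalOrder(1622155705696L), 0L));
        System.out.println(extract(null, 1622155705000L));          // e.g. a tombstone
        System.out.println(extract(new HypotheticalOrder(-1L), 1622155705000L));
    }
}
```

Falling back rather than throwing is a design choice: it keeps the application running on unexpected input, at the cost of silently assigning those records the partition's time.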
Just above your kafkaStreams instance, create a KStream and make the familiar builder.stream call:
final KStream<String, ElectronicOrder> electronicStream =
        builder.stream(inputTopic,
Add the Consumed configuration object with SerDes for deserialization, but with a twist: you also provide a TimestampExtractor. (You could instead set the TimestampExtractor through the default.timestamp.extractor configuration, but then it would apply globally to every stream in the application.)
                Consumed.with(Serdes.String(), electronicSerde)
                        .withTimestampExtractor(new OrderTimestampExtractor()))
        .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));
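The global alternative mentioned in the parenthetical uses the default.timestamp.extractor key (the value of StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG). This plain-Java sketch only builds the Properties entry, so it runs without Kafka on the classpath. The class name io.confluent.developer.time.StreamsTimestampExtractor$OrderTimestampExtractor is an assumption inferred from this module's source path, and Kafka Streams would need to be able to instantiate that class reflectively through a no-argument constructor.

```java
import java.util.Properties;

public class GlobalExtractorConfigSketch {

    // String key behind StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
    static final String DEFAULT_EXTRACTOR_KEY = "default.timestamp.extractor";

    // Returns a copy of the base properties with a global timestamp extractor set
    static Properties withGlobalExtractor(Properties base, String extractorClassName) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(DEFAULT_EXTRACTOR_KEY, extractorClassName);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "extractor-windowed-streams");
        // Assumed binary name of the nested extractor class (note the '$')
        Properties props = withGlobalExtractor(base,
                "io.confluent.developer.time.StreamsTimestampExtractor$OrderTimestampExtractor");
        System.out.println(props.getProperty(DEFAULT_EXTRACTOR_KEY));
    }
}
```

In the application itself you could pass the class object instead, for example streamsProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, OrderTimestampExtractor.class). An extractor supplied per source through Consumed.withTimestampExtractor still takes precedence over this default.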
Create a tumbling window aggregation. Keep in mind that the timestamps from ElectronicOrder are what drive the window opening and closing.
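Kafka Streams aligns tumbling windows to the epoch, with an inclusive start and an exclusive end. The following plain-Java sketch (the class and method names are hypothetical, not the Kafka API) computes which one-hour window each event time from this module's sample output falls into, under that alignment rule.

```java
import java.time.Duration;

public class TumblingWindowSketch {

    // Epoch-aligned tumbling window start: the largest multiple of the window
    // size that is <= the timestamp (valid for non-negative timestamps).
    static long windowStart(long timestampMs, Duration size) {
        long sizeMs = size.toMillis();
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        Duration hour = Duration.ofHours(1);
        // Event times copied from the sample output of this module
        long[] timestamps = {1622155705696L, 1622156605696L, 1622157505696L,
                             1622158405696L, 1622159485696L};
        for (long ts : timestamps) {
            long start = windowStart(ts, hour);
            // Window interval printed as [start, end)
            System.out.println(ts + " -> [" + start + ", " + (start + hour.toMillis()) + ")");
        }
    }
}
```

Only the first order lands in the window starting at 1622152800000; the other four share the window starting at 1622156400000, which is why the aggregation emits two totals.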
electronicStream.groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1)))
Call the aggregate method, initializing the aggregate to 0.0 and supplying an aggregator that sums the order prices, giving the total spent in each one-hour window based on each record's embedded timestamp. Provide SerDes for the state store via a Materialized, and convert the KTable produced by the aggregation into a KStream.
        .aggregate(() -> 0.0,
                (key, order, total) -> total + order.getPrice(),
                Materialized.with(Serdes.String(), Serdes.Double()))
        .toStream()
Use a map processor to unwrap the windowed key and return the underlying key of the aggregation, and use a peek processor to print the aggregation results to the console. Finally, write the results out to a topic.
        .map((wk, value) -> KeyValue.pair(wk.key(), value))
        .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
        .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));
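To make the aggregate and map steps concrete, here is a plain-Java model (hypothetical class and method names, not the Kafka API) of what the windowed sum computes for a single key, using the event times and prices from this module's sample output. It assumes records arrive in order and ignores grace periods, late records, and record caching, all of which affect when Kafka Streams actually emits updates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedSumSketch {

    static final long HOUR_MS = 3_600_000L;

    // Models windowedBy(1 hour).aggregate(() -> 0.0, total + price) for one key:
    // one running total per epoch-aligned window start.
    static Map<Long, Double> sumPerWindow(long[] timestamps, double[] prices) {
        Map<Long, Double> totals = new LinkedHashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long windowStart = timestamps[i] - (timestamps[i] % HOUR_MS);
            double total = totals.getOrDefault(windowStart, 0.0); // initializer
            totals.put(windowStart, total + prices[i]);           // aggregator
        }
        return totals;
    }

    // Models the map step: the window is dropped and only the key remains
    static List<String> unwrappedOutput(String key, Map<Long, Double> totals) {
        List<String> out = new ArrayList<>();
        for (double total : totals.values()) {
            out.add(key + " " + total);
        }
        return out;
    }

    public static void main(String[] args) {
        // Event times and prices copied from the sample output of this module
        long[] timestamps = {1622155705696L, 1622156605696L, 1622157505696L,
                             1622158405696L, 1622159485696L};
        double[] prices = {2000.0, 1999.23, 4500.0, 1333.98, 1333.98};
        Map<Long, Double> totals = sumPerWindow(timestamps, prices);
        totals.forEach((start, total) -> System.out.println("window " + start + " total " + total));
        unwrappedOutput("HDTV-2333", totals).forEach(System.out::println);
    }
}
```

Because the map step drops the window, both totals reach the output topic under the same key, HDTV-2333; a downstream consumer cannot tell from the key alone which hour each total belongs to.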
As with the other aggregation applications, let this one run for at least 40 seconds.
To run this example use the following command:
./gradlew runStreams -Pargs=time
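One likely reason for the 40-second guidance (an assumption; the course does not say): with record caching enabled, which is the default, Kafka Streams forwards aggregate updates downstream when the cache is flushed, and that happens at least on every commit, while commit.interval.ms defaults to 30000 ms under at-least-once processing. This plain-Java sketch (hypothetical class name) only builds a Properties copy with a shorter commit interval for local experiments; it does not touch Kafka itself.

```java
import java.util.Properties;

public class CommitIntervalSketch {

    // String key behind StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
    static final String COMMIT_INTERVAL_KEY = "commit.interval.ms";

    // Returns a copy of the base properties with the given commit interval,
    // so cached aggregate updates would be flushed downstream more often.
    static Properties withCommitInterval(Properties base, long intervalMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(COMMIT_INTERVAL_KEY, Long.toString(intervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "extractor-windowed-streams");
        Properties props = withCommitInterval(base, 5_000L);
        System.out.println(props.getProperty(COMMIT_INTERVAL_KEY));
    }
}
```

Shorter commits mean more frequent output and more commit overhead, so a value like this suits local experiments rather than production.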
Your output will include statements from the TimestampExtractor, and it should look something like this:
Extracting time of 1622155705696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622155705696}
Extracting time of 1622156605696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622156605696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622155705696}
Extracting time of 1622157505696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622157505696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622156605696}
Extracting time of 1622158405696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622158405696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622157505696}
Extracting time of 1622159485696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622159485696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622158405696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622159485696}
Outgoing record - key HDTV-2333 value 2000.0
Outgoing record - key HDTV-2333 value 9167.189999999999
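The second total prints as 9167.189999999999 rather than 9167.19 because the aggregate uses double, which cannot represent decimal prices such as 1999.23 exactly, so small rounding errors accumulate. The plain-Java sketch below (hypothetical class name, not part of the course code) sums the same four prices both as double and as java.math.BigDecimal; the decimal sum stays exactly 9167.19. Using BigDecimal in the stream itself would also require a matching serde, which this sketch does not cover.

```java
import java.math.BigDecimal;

public class PriceSumSketch {

    // Same arithmetic as the aggregate: start at 0.0 and add each double price
    static double sumAsDouble(double[] prices) {
        double total = 0.0;
        for (double p : prices) {
            total += p;
        }
        return total;
    }

    // Exact decimal arithmetic, parsing prices from strings to avoid double rounding
    static BigDecimal sumAsDecimal(String[] prices) {
        BigDecimal total = BigDecimal.ZERO;
        for (String p : prices) {
            total = total.add(new BigDecimal(p));
        }
        return total;
    }

    public static void main(String[] args) {
        // Prices of the four orders in the second one-hour window of the sample output
        double[] asDoubles = {1999.23, 4500.0, 1333.98, 1333.98};
        String[] asStrings = {"1999.23", "4500.0", "1333.98", "1333.98"};
        System.out.println("double:     " + sumAsDouble(asDoubles));
        System.out.println("BigDecimal: " + sumAsDecimal(asStrings));
    }
}
```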