Sophie Blee-Goldman

Senior Software Engineer (Presenter)

Bill Bejeck

Integration Architect (Author)

Hands On: Time Concepts

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code.

This module’s code can be found in the source file java/io/confluent/developer/time/StreamsTimestampExtractor.java.

In this hands-on exercise, you'll learn how to use a custom TimestampExtractor to drive the behavior of a Kafka Streams application with the event timestamps embedded in the record values, rather than the timestamp stored in each record's metadata.
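To make the extractor contract concrete before writing the Kafka version, here is a plain-Java model of it that runs without Kafka on the classpath. `Order` and `Rec` are hypothetical stand-ins for `ElectronicOrder` and `ConsumerRecord`. `partitionTime` is what Kafka Streams passes in as the highest valid timestamp extracted so far from the record's partition (or -1 if none is known yet). Falling back to it when a value carries no usable time is a defensive choice made only in this sketch; the course's extractor simply returns the embedded time.

```java
// Plain-Java model of the TimestampExtractor contract (no Kafka dependency).
// Order and Rec are hypothetical stand-ins for ElectronicOrder and ConsumerRecord.
public class ExtractorModel {

    // Stand-in for the Avro-generated ElectronicOrder: only the embedded event time matters here.
    static final class Order {
        private final long time;
        Order(long time) { this.time = time; }
        long getTime() { return time; }
    }

    // Stand-in for ConsumerRecord: the timestamp stored with the record, plus its value.
    static final class Rec {
        final long timestamp;
        final Object value;
        Rec(long timestamp, Object value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Same shape as TimestampExtractor.extract(record, partitionTime).
    interface Extractor {
        long extract(Rec rec, long partitionTime);
    }

    // Record-time behavior: use the timestamp already attached to the record.
    static final Extractor RECORD_TIME = (rec, partitionTime) -> rec.timestamp;

    // Event-time behavior: use the time embedded in the value; if the value is not an
    // Order or its time is negative, fall back to partitionTime (assumption of this sketch).
    static final Extractor EMBEDDED_TIME = (rec, partitionTime) -> {
        if (rec.value instanceof Order) {
            long eventTime = ((Order) rec.value).getTime();
            if (eventTime >= 0) {
                return eventTime;
            }
        }
        return partitionTime;
    };

    public static void main(String[] args) {
        // Hypothetical record: stored timestamp differs from the embedded event time.
        Rec rec = new Rec(1_700_000_000_000L, new Order(1622155705696L));
        System.out.println("record time:   " + RECORD_TIME.extract(rec, -1L));
        System.out.println("embedded time: " + EMBEDDED_TIME.extract(rec, -1L));
        System.out.println("fallback:      " + EMBEDDED_TIME.extract(new Rec(1L, null), 1622155705696L));
    }
}
```

The only thing that changes between the two extractors is which timestamp is returned; everything downstream, including window assignment, follows from that value.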

  1. Use the following code as the starting point for this exercise:

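    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
    import java.io.IOException;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    // StreamsUtils, TopicLoader, and the Avro-generated ElectronicOrder are helper classes from the course repository.

    public class StreamsTimestampExtractor {
        public static void main(String[] args) throws IOException {
            Properties streamsProps = StreamsUtils.loadProperties();
            streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "extractor-windowed-streams");

            StreamsBuilder builder = new StreamsBuilder();
            String inputTopic = streamsProps.getProperty("extractor.input.topic");
            String outputTopic = streamsProps.getProperty("extractor.output.topic");
            Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);

            // Avro SerDe for the ElectronicOrder values in the input topic
            SpecificAvroSerde<ElectronicOrder> electronicSerde =
                StreamsUtils.getSpecificAvroSerde(configMap);

            // The topology from the following steps goes here, above the KafkaStreams instance.

            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
            TopicLoader.runProducer();
            kafkaStreams.start();
        }
    }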
  2. Above the main method, create a static nested class that implements TimestampExtractor. In its extract method, cast the ConsumerRecord value to an ElectronicOrder, then return the timestamp embedded in the ElectronicOrder:

    static class OrderTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            // The value is an ElectronicOrder whose time field holds the event time in epoch milliseconds
            ElectronicOrder order = (ElectronicOrder) record.value();
            System.out.println("Extracting time of " + order.getTime() + " from " + order);
            return order.getTime();
        }
    }
  3. Just above your kafkaStreams instance, create a KStream with the familiar builder.stream call:

    final KStream<String, ElectronicOrder> electronicStream =
          builder.stream(inputTopic,
  4. Pass a Consumed configuration object with the SerDes for deserialization, but with a twist: it also supplies the TimestampExtractor. (You could instead set the extractor through the default.timestamp.extractor configuration, but then it would apply to every stream in the application.)

    Consumed.with(Serdes.String(), electronicSerde)
        .withTimestampExtractor(new OrderTimestampExtractor()))
        .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));
  5. Create a tumbling window aggregation with a one-hour window. Keep in mind that the timestamps embedded in ElectronicOrder, not the record metadata timestamps, determine when windows open and close.

    electronicStream.groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1)))
  6. Call the aggregate method, initializing the aggregate to 0.0 and supplying an aggregator that sums the prices, which gives the total spent in each one-hour window based on the timestamp embedded in each record. Provide SerDes for the state store via Materialized, and convert the KTable produced by the aggregation into a KStream.

    .aggregate(() -> 0.0,
                (key, order, total) -> total + order.getPrice(),
                Materialized.with(Serdes.String(), Serdes.Double()))
    .toStream()
  7. Use a map processor to unwrap the windowed key and return the underlying key of the aggregation, and use a peek processor to print the aggregation results to the console. Finally, write the results out to a topic.

    .map((wk, value) -> KeyValue.pair(wk.key(), value))
    .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
    .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));
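  8. As with the other aggregation applications, let this one run for at least 40 seconds. Kafka Streams caches aggregation results and typically forwards them downstream when it commits, which with default settings happens every 30 seconds.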
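Step 4 mentions that the extractor can be registered through configuration instead, which makes it the default for every source in the application. The sketch below shows that variant using only `java.util.Properties`, so it runs without Kafka. The key `default.timestamp.extractor` is the value of `StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG`; the class name assumes the step 2 extractor is nested inside `StreamsTimestampExtractor` in package `io.confluent.developer.time` (inferred from the source path). Because Kafka instantiates the configured class reflectively, it is safest to declare it `public static` with a public no-argument constructor.

```java
import java.util.Properties;

// Sketch: registering a timestamp extractor globally through configuration
// instead of per stream with Consumed.withTimestampExtractor(...).
public class GlobalExtractorConfig {

    // Literal value of StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
    static final String DEFAULT_TIMESTAMP_EXTRACTOR = "default.timestamp.extractor";

    // Assumed binary name of the nested extractor from step 2 ('$' marks a nested class).
    static final String EXTRACTOR_CLASS =
            "io.confluent.developer.time.StreamsTimestampExtractor$OrderTimestampExtractor";

    // Returns a copy of the base settings with the default extractor added.
    static Properties withGlobalExtractor(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(DEFAULT_TIMESTAMP_EXTRACTOR, EXTRACTOR_CLASS);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "extractor-windowed-streams"); // StreamsConfig.APPLICATION_ID_CONFIG
        Properties props = withGlobalExtractor(base);
        System.out.println(DEFAULT_TIMESTAMP_EXTRACTOR + " = " + props.getProperty(DEFAULT_TIMESTAMP_EXTRACTOR));
    }
}
```

In the application itself you would put the same setting into `streamsProps` (the config also accepts the `Class` object, for example `OrderTimestampExtractor.class`) and drop `.withTimestampExtractor(...)`. An extractor passed through `Consumed` still overrides the default for that one source.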

To run this example, use the following command:

./gradlew runStreams -Pargs=time

Your output will include the TimestampExtractor's log statements and should look something like this:

Extracting time of 1622155705696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622155705696}
Extracting time of 1622156605696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622156605696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622155705696}
Extracting time of 1622157505696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622157505696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622156605696}
Extracting time of 1622158405696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622158405696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622157505696}
Extracting time of 1622159485696 from {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622159485696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622158405696}
Incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622159485696}
Outgoing record - key HDTV-2333 value 2000.0
Outgoing record - key HDTV-2333 value 9167.189999999999
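The two outgoing totals follow directly from the embedded timestamps. The plain-Java sketch below (no Kafka) models steps 5 through 7 for the five sample orders: it assigns each order to a one-hour tumbling window whose start is the timestamp rounded down to a multiple of the window size, which is how Kafka Streams aligns tumbling windows to the epoch, then folds each window with initializer 0.0 and a summing aggregator. All five orders share the key HDTV-2333, so only the window grouping varies. It models the final value per window only, not the intermediate updates that caching keeps out of the console output.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (no Kafka) of the one-hour tumbling-window sum from steps 5-7.
public class TumblingWindowSum {

    // One hour in milliseconds, the size passed to TimeWindows.of(Duration.ofHours(1)).
    static final long WINDOW_MS = 60L * 60L * 1000L;

    // Embedded time and price of the five sample orders above, in arrival order.
    static final long[] TIMES = {
        1622155705696L, 1622156605696L, 1622157505696L, 1622158405696L, 1622159485696L
    };
    static final double[] PRICES = {2000.0, 1999.23, 4500.0, 1333.98, 1333.98};

    // Epoch-aligned tumbling window: start inclusive, start + WINDOW_MS exclusive.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Groups by window start; each window starts at 0.0 and adds prices in arrival order.
    static Map<Long, Double> sumByWindow(long[] times, double[] prices) {
        Map<Long, Double> totals = new TreeMap<>();
        for (int i = 0; i < times.length; i++) {
            long start = windowStart(times[i]);
            double total = totals.getOrDefault(start, 0.0);
            totals.put(start, total + prices[i]);
        }
        return totals;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, Double> e : sumByWindow(TIMES, PRICES).entrySet()) {
            long start = e.getKey();
            System.out.println("window [" + start + ", " + (start + WINDOW_MS) + ") total " + e.getValue());
        }
    }
}
```

The first order falls about 48 minutes into its hour, so the next order, 15 minutes later, opens a new window that also holds the remaining three; that yields one total of 2000.0 and one of roughly 9167.19. In Kafka 3.0 and later, TimeWindows.of is deprecated in favor of TimeWindows.ofSizeWithNoGrace or TimeWindows.ofSizeAndGrace, which produce the same window boundaries but make the grace period explicit.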
