Course: Kafka Streams 101

Hands On: Time Concepts

Sophie Blee-Goldman, Software Engineer II (Course Presenter)
Bill Bejeck, Integration Architect (Course Author)

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code.

This module’s code can be found in the source file java/io/confluent/developer/time/StreamsTimestampExtractor.java.

In this hands-on exercise, you'll learn how to use a custom TimestampExtractor to drive the behavior of a Kafka Streams application with timestamps embedded in the records themselves.
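
Before touching Kafka, here is a minimal, stdlib-only Java sketch that makes the distinction concrete. It does not use the Kafka API. SimpleRecord, SimpleExtractor, extractAll, METADATA_TIME and PAYLOAD_TIME are hypothetical stand-ins that only mirror the shape of TimestampExtractor.extract(record, partitionTime). Each record carries two times: a metadata timestamp, as a producer or broker would set it, and an event time embedded in its payload. Partition time is tracked as the highest timestamp extracted so far, starting at -1 for "unknown". The payload-based extractor falls back to partition time when the embedded value is negative. That fallback is one common way to handle invalid timestamps and is an assumption here; the course's extractor in step 2 has no such fallback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of timestamp extraction; not the Kafka API.
public class EventTimeSketch {

    // Hypothetical stand-in for a Kafka record: a metadata timestamp plus
    // an event time carried inside the value (like ElectronicOrder.getTime()).
    static final class SimpleRecord {
        final long metadataTimestamp; // e.g. when the producer sent the record
        final long payloadTime;       // when the order actually happened
        SimpleRecord(long metadataTimestamp, long payloadTime) {
            this.metadataTimestamp = metadataTimestamp;
            this.payloadTime = payloadTime;
        }
    }

    // Same shape as TimestampExtractor.extract(record, partitionTime).
    interface SimpleExtractor {
        long extract(SimpleRecord record, long partitionTime);
    }

    // Ignores the payload and uses the record's metadata timestamp.
    static final SimpleExtractor METADATA_TIME = (r, partitionTime) -> r.metadataTimestamp;

    // Uses the embedded event time; if it is negative (invalid), falls back
    // to the highest timestamp seen so far on the partition.
    static final SimpleExtractor PAYLOAD_TIME = (r, partitionTime) ->
            r.payloadTime >= 0 ? r.payloadTime : partitionTime;

    // Runs records through an extractor, tracking partition time as the
    // maximum extracted timestamp (-1 until the first record is seen).
    static List<Long> extractAll(List<SimpleRecord> records, SimpleExtractor extractor) {
        List<Long> out = new ArrayList<>();
        long partitionTime = -1L;
        for (SimpleRecord r : records) {
            long ts = extractor.extract(r, partitionTime);
            out.add(ts);
            partitionTime = Math.max(partitionTime, ts);
        }
        return out;
    }

    public static void main(String[] args) {
        // All three records were produced at 10_000 ms but describe orders
        // placed at different times; the last one has an invalid (-1) time.
        List<SimpleRecord> records = List.of(
                new SimpleRecord(10_000L, 3_000L),
                new SimpleRecord(10_000L, 7_000L),
                new SimpleRecord(10_000L, -1L));
        System.out.println(extractAll(records, METADATA_TIME)); // [10000, 10000, 10000]
        System.out.println(extractAll(records, PAYLOAD_TIME));  // [3000, 7000, 7000]
    }
}
```

With metadata time, every order looks as if it happened at the same instant. With the embedded time, each order is placed where it actually occurred. That difference is what drives the windowing later in this exercise.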

  1. Use the following code as a starter for your time exercise code:

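    // Imports for this and the following steps. StreamsUtils, TopicLoader and the
    // Avro-generated ElectronicOrder class come from the course repository.
    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.processor.TimestampExtractor;
    import java.io.IOException;
    import java.time.Duration;
    import java.util.Map;
    import java.util.Properties;

    public class StreamsTimestampExtractor {
        public static void main(String[] args) throws IOException {
            Properties streamsProps = StreamsUtils.loadProperties();
            streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "extractor-windowed-streams");

            StreamsBuilder builder = new StreamsBuilder();
            String inputTopic = streamsProps.getProperty("extractor.input.topic");
            String outputTopic = streamsProps.getProperty("extractor.output.topic");
            Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);

            SpecificAvroSerde<ElectronicOrder> electronicSerde =
                    StreamsUtils.getSpecificAvroSerde(configMap);

            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
            TopicLoader.runProducer();
            kafkaStreams.start();
        }
    }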
  2. Above the main method, create a static nested class that implements TimestampExtractor. Its extract method retrieves the ElectronicOrder object from the ConsumerRecord value field, then returns the timestamp embedded in the ElectronicOrder:

    static class OrderTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            // The record value is the deserialized Avro ElectronicOrder
            ElectronicOrder order = (ElectronicOrder) record.value();
            System.out.println("Extracting time of " + order.getTime() + " from " + order);
            // Use the event time embedded in the order, not the record's metadata timestamp
            return order.getTime();
        }
    }
  3. Just above your kafkaStreams instance, create a KStream and make the familiar builder.stream call:

    final KStream<String, ElectronicOrder> electronicStream =
          builder.stream(inputTopic,
  4. Add the Consumed configuration object with SerDes for deserialization, but with a twist: you're also providing a TimestampExtractor. (You could instead set the extractor through the default.timestamp.extractor configuration, StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, but then it would apply to every stream in the application.)

    Consumed.with(Serdes.String(), electronicSerde)
        .withTimestampExtractor(new OrderTimestampExtractor()))
        .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));
  5. Create a tumbling window aggregation. Keep in mind that the timestamps from ElectronicOrder are what drive the window opening and closing.

    electronicStream.groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1)))
  6. Call the aggregate method, initializing the aggregate to 0.0 and adding an aggregator that sums the prices of all orders. The result is the total spent per key over each one-hour window, based on the timestamp embedded in the record itself. Add SerDes for the state store via a Materialized, and convert the KTable produced by the aggregation into a KStream.

    .aggregate(() -> 0.0,
                (key, order, total) -> total + order.getPrice(),
                Materialized.with(Serdes.String(), Serdes.Double()))
    .toStream()
  7. Use a map processor to unwrap the windowed key and return the underlying key of the aggregation, and use a peek processor to print the aggregation results to the console. Finally, write the results out to a topic.

    .map((wk, value) -> KeyValue.pair(wk.key(), value))
    .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
    .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));
  8. As with the other aggregation applications, let this one run for at least 40 seconds.
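
The windowing in steps 5 through 7 can be modeled without a broker. Below is a minimal, stdlib-only Java sketch, not Kafka Streams code. HourlyWindowSketch, windowStart and aggregate are hypothetical names, and the in-memory map stands in for the windowed state store. The sketch assumes non-negative millisecond timestamps and epoch-aligned one-hour tumbling windows. It ignores grace periods and late-record handling, which the real windowed aggregation applies.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of
// groupByKey().windowedBy(one-hour tumbling window).aggregate(() -> 0.0, total + price)
public class HourlyWindowSketch {

    static final long WINDOW_SIZE_MS = 60 * 60 * 1000L; // one hour

    // Tumbling windows are aligned to the epoch: a record with timestamp ts
    // belongs to [start, start + size) where start = ts - ts % size.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Map key is "key@windowStart"; value is the running sum of prices.
    static Map<String, Double> aggregate(String[] keys, long[] eventTimes, double[] prices) {
        Map<String, Double> totals = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String windowedKey = keys[i] + "@" + windowStart(eventTimes[i]);
            // Initializer 0.0, then aggregator: total + price
            double total = totals.getOrDefault(windowedKey, 0.0);
            totals.put(windowedKey, total + prices[i]);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical orders for one key, listed in arrival order. The third
        // order arrives last but carries an earlier embedded event time.
        String[] keys = {"A", "A", "A"};
        long[] eventTimes = {
            0L,               // 00:00
            90 * 60 * 1000L,  // 01:30
            30 * 60 * 1000L   // 00:30, out of order
        };
        double[] prices = {100.0, 250.0, 50.0};
        System.out.println(aggregate(keys, eventTimes, prices));
        // {A@0=150.0, A@3600000=250.0}
    }
}
```

The window is chosen from the embedded event time rather than from arrival order. As a result, the third order lands in the first hour's window even though it arrives last.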
