Community contribution ✨

Emit a final result from a time window

Question:

How can you count the number of messages in a Kafka topic per key over a time window, with a final result that includes late arrivals?

Example use case:

Consider a topic with events that represent sensor warnings (pressure on robotic arms). One warning per time slot is fine, but you don't want to have too many warnings at the same time. In this tutorial, we'll write a program that counts messages from the same sensor and sends a result at the end of the window.

Hands-on code example:

Run it

Prerequisites

1

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line

Get Confluent Platform

2

To get started, make a new directory anywhere you’d like for this project:

mkdir window-final-result && cd window-final-result

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
    - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
    - broker
    ports:
    - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN

And launch it by running:

docker compose up -d

Initialize the project

3

Create the following Gradle build file, named build.gradle for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"
    id "application"
    id "com.github.davidmc24.gradle.plugin.avro" version "1.7.0"
}

sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1-SNAPSHOT"


mainClassName = "io.confluent.developer.WindowFinalResult"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

tasks.withType(JavaCompile) {
    options.compilerArgs = [
        '-Xlint:deprecation',
        '-Xlint:unchecked',
        '-Werror'
    ]
}

apply plugin: 'com.github.johnrengelman.shadow'

dependencies {
    implementation "org.apache.avro:avro:1.11.1"

    implementation group: 'com.typesafe', name: 'config', version: '1.4.2'
    implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.11'

    implementation group: 'org.apache.kafka', name: 'kafka-streams', version: '3.4.0'
    implementation group: 'io.confluent', name: 'kafka-streams-avro-serde', version: '7.4.0'

    testImplementation "junit:junit:4.13.2"
    testImplementation "org.apache.kafka:kafka-streams-test-utils:3.4.0"
    testImplementation "com.github.grantwest.eventually:hamcrest-eventually-matchers:0.0.3"

    // helpers
    implementation group: 'com.typesafe.akka', name: 'akka-stream-kafka_2.13', version: '2.0.7'
}

shadowJar {
    archiveBaseName = "kstreams-${rootProject.name}"
    archiveClassifier = ''
}


task createTopics(type: JavaExec) {
    mainClass = 'io.confluent.developer.helper.TopicCreation'
    classpath = sourceSets.main.runtimeClasspath
}

task publishSchemas(type: JavaExec) {
    mainClass = 'io.confluent.developer.helper.SchemaPublication'
    classpath = sourceSets.main.runtimeClasspath
}

task consumeResult(type: JavaExec) {
    mainClass = 'io.confluent.developer.helper.ResultConsumer'
    classpath = sourceSets.main.runtimeClasspath
}

run.dependsOn {
    [createTopics, publishSchemas]
}



test {
    testLogging {
        events "passed", "skipped", "failed"
        exceptionFormat "full"
    }
}

Note: In addition to our main class, this tutorial includes helper Java executions responsible for creating the topics and publishing the schemas. In a real-life application, these steps would typically live outside your project.

Create the following Gradle settings file, named settings.gradle for the project:


rootProject.name = 'window-final-result'

Run the following command to obtain the Gradle wrapper:

gradle wrapper

Create a directory for the project resources:

mkdir -p src/main/resources

Add the config file src/main/resources/application.conf to set up your application:

application.id: "final-results-tutorial"
application.id: ${?APP_ID}

bootstrap.servers: "localhost:29092"
bootstrap.servers: ${?BOOTSTRAP_SERVERS}

schema.registry.url: "http://localhost:8081"
schema.registry.url: ${?SCHEMA_REGISTRY_URL}

window {

  size: 10 seconds
  size: ${?WINDOW_SIZE}

  grace.period: 20 seconds
  grace.period: ${?GRACE_PERIOD}
}

# you may play with the pattern, but ALWAYS include the Zone Offset (Z)!
# It is used to create a java.time.ZonedDateTime by parsing the datetime in the message value
sensor.datetime.pattern: "yyyy-MM-dd'T'HH:mm:ss.Z"

# adapt this part to YOUR preferred language and locale; it is used to display the result
local.date {
  lang: "fr"
  pattern: "EEE d MMM yyyy" # date only
}

input.topic {

  name: "input-topic"
  name: ${?INPUT_TOPIC}
  partitions: 1
  partitions: ${?INPUT_TOPIC_PARTITIONS}
  replication.factor: 1
  replication.factor: ${?INPUT_TOPIC_REPLICATION}
}

output.topic {

  name: "output-topic"
  name: ${?OUTPUT_TOPIC}
  partitions: 1
  partitions: ${?OUTPUT_TOPIC_PARTITIONS}
  replication.factor: 1
  replication.factor: ${?OUTPUT_TOPIC_REPLICATION}
}

You may want to adapt the local.date.lang and local.date.pattern settings.

This file contains the default configuration. In production, these values can be overridden by environment variables.
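
The ${?VAR} lines are Typesafe Config's optional substitutions: the second assignment only takes effect when the corresponding environment variable is set. As a rough sketch of how the application reads these values (the class name below is just for illustration, not part of the tutorial):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.time.Duration;

public class ConfigCheck {

    public static void main(String[] args) {
        // Loads application.conf from the classpath and resolves the ${?VAR} substitutions
        Config config = ConfigFactory.load();

        // Falls back to "final-results-tutorial" unless the APP_ID environment variable is set
        String appId = config.getString("application.id");

        // HOCON duration strings such as "10 seconds" parse directly into java.time.Duration
        Duration windowSize = config.getDuration("window.size");
        Duration gracePeriod = config.getDuration("window.grace.period");

        System.out.printf("application.id=%s, window.size=%s, grace.period=%s%n",
                appId, windowSize, gracePeriod);
    }
}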

Add the logging configuration in the file: src/main/resources/logback.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="KSTREAMS" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>
                %yellow(%d{yyyy-MM-dd HH:mm:ss}) %cyan(${HOSTNAME}) %highlight([%p]) %green((%file:%line\)) - %msg%n
            </pattern>
        </encoder>
    </appender>

    <appender name="CONSUMER" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>
                %yellow(%d{yyyy-MM-dd HH:mm:ss}) %highlight([%p]) %magenta((%file:%line\)) - %msg%n
            </pattern>
        </encoder>
    </appender>

    <logger name="io.confluent.developer.helper.ResultConsumer" level="DEBUG" additivity="false">
        <appender-ref ref="CONSUMER" />
    </logger>

    <logger name="io.confluent.developer" level="DEBUG" additivity="false">
        <appender-ref ref="KSTREAMS" />
    </logger>

    <root level="WARN">
        <appender-ref ref="KSTREAMS" />
    </root>

</configuration>

Create a schema for the events

4

Create a directory for the pressure event schemas:

mkdir -p src/main/avro

Then create the following Avro schema file at src/main/avro/pressure-alert.avsc for the publication events:

{
  "type": "record",
  "name": "PressureAlert",
  "namespace": "io.confluent.developer.avro",
  "doc": "Object used for the recipe: Last Window Result",
  "fields": [
    {
      "name": "id",
      "type": "string",
      "doc": "Id of a robotic arm sensor"
    },
    {
      "name": "datetime",
      "type": "string",
      "doc": "Event time of a pressure alert"
    },
    {
      "name": "pressure",
      "type": "int",
      "doc": "Actual pressure level in Pascal (Pa), yeah metric system rules!"
    }
  ]
}

Because this Avro schema is used in the Java code, we need to compile it so that the corresponding Java classes are generated. Run the following:

./gradlew build
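
The Avro Gradle plugin generates the io.confluent.developer.avro.PressureAlert class from this schema during the build. As a purely illustrative sanity check (this class is not part of the tutorial), you can use the generated type like any other Java class:

import io.confluent.developer.avro.PressureAlert;

public class PressureAlertSmokeTest {

    public static void main(String[] args) {
        // The generated class exposes a constructor and accessors matching the schema fields
        PressureAlert alert = new PressureAlert("101", "2019-09-21T05:45:02.+0200", 30);

        System.out.println(alert.getId());        // 101
        System.out.println(alert.getDatetime());  // 2019-09-21T05:45:02.+0200
        System.out.println(alert.getPressure());  // 30
    }
}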

Add the helper gradle tasks

5

Topic creation and Avro schema registration are often part of an external process. For the sake of clarity in this tutorial, we won’t include these steps in the main application, but isolate them in a dedicated package.

Create a directory for the package helper:

mkdir -p src/main/java/io/confluent/developer/helper

Add the following class at src/main/java/io/confluent/developer/helper/TopicCreation.java:

package io.confluent.developer.helper;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicCreation {

    private static final Logger logger = LoggerFactory.getLogger(TopicCreation.class);

    public static void main(String[] args) {

        Config config = ConfigFactory.load();

        Properties properties = new Properties();

        properties.put("bootstrap.servers", config.getString("bootstrap.servers"));

        AdminClient client = AdminClient.create(properties);

        HashMap<String, NewTopic> topics = new HashMap<>();

        topics.put(
                config.getString("input.topic.name"),
                new NewTopic(
                        config.getString("input.topic.name"),
                        config.getNumber("input.topic.partitions").intValue(),
                        config.getNumber("input.topic.replication.factor").shortValue())
        );

        topics.put(
                config.getString("output.topic.name"),
                new NewTopic(
                        config.getString("output.topic.name"),
                        config.getNumber("output.topic.partitions").intValue(),
                        config.getNumber("output.topic.replication.factor").shortValue())
        );

        try {
            logger.info("Starting the topics creation");

            CreateTopicsResult result = client.createTopics(topics.values());

            result.values().forEach((topicName, future) -> {
                NewTopic topic = topics.get(topicName);
                future.whenComplete((aVoid, maybeError) ->
                    {
                        if (maybeError != null) {
                            logger.error("Topic creation didn't complete:", maybeError);
                        } else {
                            logger.info(
                                    String.format(
                                            "Topic %s, has been successfully created " +
                                                    "with %s partitions and replicated %s times",
                                            topic.name(),
                                            topic.numPartitions(),
                                            topic.replicationFactor() - 1
                                    )
                            );
                        }
                    }
                );
            });

            result.all().get();
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) e.printStackTrace();
        } finally {
            client.close();
        }
    }
}

Add the following class at src/main/java/io/confluent/developer/helper/SchemaPublication.java:

package io.confluent.developer.helper;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public class SchemaPublication {

    private static final Logger logger = LoggerFactory.getLogger(SchemaPublication.class);

    public static void main(String[] args) {

        Config config = ConfigFactory.load();

        String registryUrl = config.getString("schema.registry.url");

        CachedSchemaRegistryClient schemaRegistryClient  = new CachedSchemaRegistryClient(registryUrl, 10);

        try {
            logger.info(String.format("Schemas publication at: %s", registryUrl));

            schemaRegistryClient.register(
                String.format("%s-value", config.getString("input.topic.name")),
                new AvroSchema(PressureAlert.SCHEMA$)
            );
        } catch (IOException | RestClientException e) {
            e.printStackTrace();
        }
    }
}

Now the topics can be created separately with the following command.

./gradlew createTopics

Same thing for the schemas.

./gradlew publishSchemas

Check build.gradle again: you will find these tasks declared as JavaExec, each with a main class corresponding to one of the two classes above.

Create the timestamp extractor

6

There are multiple timing perspectives to consider, and each event may arrive from a different time zone.

  1. Event time: the time at the sensor, which differs depending on whether the event comes from Paris (UTC+02:00) or Tokyo (UTC+09:00)

  2. Processing time: the time of the Kafka Streams instances. Here the zone depends on your deployment (e.g., your fancy managed Kubernetes cluster deployed in us-west-b :p)

  3. Ingestion time: less relevant here, this is the time when the Kafka message was published to the broker
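
To see why everything gets normalized to epoch milliseconds, note that the same instant renders differently depending on the sensor's zone offset. A small standalone illustration (not part of the tutorial code):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class ZoneOffsetDemo {

    public static void main(String[] args) {
        // Two sensors report the very same instant, one from Paris, one from Tokyo
        ZonedDateTime paris = ZonedDateTime.of(2019, 9, 21, 5, 45, 2, 0, ZoneOffset.ofHours(2));
        ZonedDateTime tokyo = paris.withZoneSameInstant(ZoneOffset.ofHours(9));

        System.out.println(paris); // 2019-09-21T05:45:02+02:00
        System.out.println(tokyo); // 2019-09-21T12:45:02+09:00

        // Both map to the same epoch milliseconds, which is what the windows operate on
        Instant instant = paris.toInstant();
        System.out.println(instant.toEpochMilli() == tokyo.toInstant().toEpochMilli()); // true
    }
}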

Since our operations will be time-based, you need to ensure the right time is considered. In this example, the data producer does not set the message timestamp; it places the time of the alert in the message value, so we need to extract it from there. This can be done by implementing a TimestampExtractor. Add the following class at src/main/java/io/confluent/developer/PressureDatetimeExtractor.java.

package io.confluent.developer;

import com.typesafe.config.Config;
import io.confluent.developer.avro.PressureAlert;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;


public class PressureDatetimeExtractor implements TimestampExtractor {

    private final DateTimeFormatter formatter;
    private static final Logger logger = LoggerFactory.getLogger(TimestampExtractor.class);

    public PressureDatetimeExtractor(Config config) {
        this.formatter = DateTimeFormatter.ofPattern(config.getString("sensor.datetime.pattern"));
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        try {
            String dateTimeString = ((PressureAlert) record.value()).getDatetime();
            ZonedDateTime zonedDateTime = ZonedDateTime.parse(dateTimeString, this.formatter);
            return zonedDateTime.toInstant().toEpochMilli();
        } catch (ClassCastException cce) {
            logger.error("failed to cast the PressureAlert: ", cce);
        } catch (DateTimeParseException dtpe) {
            logger.error("fail to parse the event datetime due to: ", dtpe);
        }

        // Returning a negative number will cause records to be skipped
        return -1L;
    }
}

OK, let's translate this extract method from Java to English. We attempt the following sequence of operations, any of which may throw an exception:

  1. cast the value Object to PressureAlert and call its .getDatetime method

  2. parse the datetime string based on the configured pattern

  3. convert it to an Instant, in case the Kafka message suffers from jet lag

  4. get the epoch in milliseconds

If any of these steps fails, we log the error and return a negative timestamp, so the record will be silently skipped.
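
For reference, here is roughly what the happy path of the extract method does with the pattern from application.conf, outside of Kafka Streams (a standalone sketch, not tutorial code):

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class DatetimeParsingDemo {

    public static void main(String[] args) {
        // Same pattern as sensor.datetime.pattern in application.conf
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.Z");

        // A datetime string as it appears in the message value
        ZonedDateTime zoned = ZonedDateTime.parse("2019-09-21T05:45:02.+0200", formatter);

        // This is the value the extractor returns as the record timestamp
        long timestamp = zoned.toInstant().toEpochMilli();
        System.out.println(timestamp);
    }
}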

Create the Kafka Streams topology

7

In the main function we create time-based windows with a given size and an advance equal to that size, which results in non-overlapping windows called tumbling windows. We also add a grace period: even if messages arrive late, they can still join the window their datetime falls into. Finally, we pass this window definition, together with the configuration and the value serde, to a function that builds the Topology. Add the following class at src/main/java/io/confluent/developer/WindowFinalResult.java.

package io.confluent.developer;


import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde;

public class WindowFinalResult {

    private static final Logger logger = LoggerFactory.getLogger(WindowFinalResult.class);

    public static Properties buildProperties(Config config) {
        Properties properties = new Properties();

        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString("bootstrap.servers"));
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, config.getString("application.id"));
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);

        return properties;
    }

    public static Topology buildTopology(Config config,
                                         TimeWindows windows,
                                         SpecificAvroSerde<PressureAlert> pressureSerde) {

        StreamsBuilder builder = new StreamsBuilder();

        String inputTopic = config.getString("input.topic.name");
        String outputTopic = config.getString("output.topic.name");

        Produced<Windowed<String>, Long> producedCount = Produced
                .with(new TimeWindowedSerde<>(Serdes.String(), Long.MAX_VALUE), Serdes.Long());

        Consumed<String, PressureAlert> consumedPressure = Consumed
                .with(Serdes.String(), pressureSerde)
                .withTimestampExtractor(new PressureDatetimeExtractor(config));

        Grouped<String, PressureAlert> groupedPressure = Grouped.with(Serdes.String(), pressureSerde);

        builder

                .stream(inputTopic, consumedPressure)

                .selectKey((key, value) -> value.getId())

                .groupByKey(groupedPressure)

                .windowedBy(windows)

                .count()

                .suppress(Suppressed.untilWindowCloses(unbounded()))

                .toStream()

                .to(outputTopic, producedCount);

        return builder.build();
    }

    public static void main(String[] args) {

        final Config config = ConfigFactory.load();

        final Properties properties = buildProperties(config);

        Map<String, Object> serdeConfig =
                singletonMap(SCHEMA_REGISTRY_URL_CONFIG, config.getString("schema.registry.url"));

        SpecificAvroSerde<PressureAlert> pressureSerde = new SpecificAvroSerde<>();

        pressureSerde.configure(serdeConfig, false);

        TimeWindows windows = TimeWindows

                .ofSizeAndGrace(config.getDuration("window.size"), config.getDuration("window.grace.period"))

                .advanceBy(config.getDuration("window.size"));

        Topology topology = buildTopology(config, windows, pressureSerde);

        logger.debug(topology.describe().toString());

        final KafkaStreams streams = new KafkaStreams(topology, properties);

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.cleanUp();
        streams.start();
    }
}

Here are several notes about the WindowFinalResult#buildTopology function:

  • To consume events, we create a SpecificAvroSerde based on the generated Avro source code.

  • The serde used to produce the aggregated result is a windowed serde: it stores the key together with the window start time.

  • Our custom timestamp extractor is plugged in thanks to the Consumed#withTimestampExtractor method.

Then we stream, selectKey and groupByKey, and finally apply the suppress operator.

The suppress operator holds back every intermediate update and, once the grace period is over, emits only the final result for the window.

Note: even with the suppress operator applied, you will need a subsequent event to advance stream time before the final result is emitted.
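
The producer script later in this tutorial sends events from a dummy sensor (XXX) for exactly this reason. If you wanted to automate that, a hypothetical heartbeat producer could look like the sketch below (this class and its settings are an assumption, not part of the tutorial):

import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class HeartbeatProducer {

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");

        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.Z");

        try (KafkaProducer<String, PressureAlert> producer = new KafkaProducer<>(props)) {
            while (true) {
                // A dummy sensor id keeps the heartbeats out of the real per-sensor counts
                String now = ZonedDateTime.now().format(formatter);
                producer.send(new ProducerRecord<>("input-topic", new PressureAlert("XXX", now, 0)));
                Thread.sleep(5_000L); // advance stream time roughly every 5 seconds
            }
        }
    }
}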

Compile and run the Kafka Streams program

8

In a new terminal, use the run Gradle task to start the main class WindowFinalResult.

./gradlew run

Note: this will apply the topic creation step and schema publication step before running the app.

Alternatively, you may also build a jar archive and run it with a java command. If you do, don’t forget to create the topics first.

./gradlew shadowJar
java -cp build/libs/kstreams-window-final-result*.jar io.confluent.developer.helper.TopicCreation
java -cp build/libs/kstreams-window-final-result*.jar io.confluent.developer.helper.SchemaPublication
java -jar build/libs/kstreams-window-final-result*.jar #-Dconfig.file=./any-other-conf-file.properties
# OR
# APP_ID=LOCAL_DEV_APP_ID java -jar build/libs/kstreams-window-final-result*.jar

Produce events to the input topic

9

Now we want a more convenient way to send sensor events, so we can focus on the aggregation results.

In a new terminal, define a produce function that pipes a JSON record to the Avro console producer:

set +m
function produce () { echo $1 | docker exec -i schema-registry /usr/bin/kafka-avro-console-producer --bootstrap-server broker:9092 --topic input-topic --property value.schema="$(< src/main/avro/pressure-alert.avsc)" & }

Then call the function, passing a JSON payload that matches the schema:

produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
# {"id":"101","datetime":"2019-09-17T01:22:15.+0200","pressure":30}

Send multiple events

produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"102","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
sleep 10
produce '{"id":"101","datetime":"'$(date -v-10S +%FT%T.%z)'","pressure":30}' # late of 10 sec
produce '{"id":"101","datetime":"'$(date -v-15S +%FT%T.%z)'","pressure":30}' # late of 15 sec
produce '{"id":"101","datetime":"'$(date -v-60S +%FT%T.%z)'","pressure":30}' # late of 01 min
produce '{"id":"102","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
sleep 10
produce '{"id":"102","datetime":"'$(date -v-60S +%FT%T.%z)'","pressure":30}' # out of the grace period
export TZ=Asia/Tokyo
produce '{"id":"301","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"301","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
sleep 10
produce '{"id":"XXX","datetime":"'$(date +%FT%T.%z)'","pressure":30}'

You may also consume the input topic to see the datetimes reported by the sensors:

docker exec -it schema-registry /usr/bin/kafka-avro-console-consumer --topic input-topic --bootstrap-server broker:9092 --from-beginning

For example:

# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"102","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:07.+0200","pressure":30} # late
# {"id":"101","datetime":"2019-09-21T05:44:13.+0200","pressure":30} # out of time
# {"id":"102","datetime":"2019-09-21T05:45:13.+0200","pressure":30} # new window
# {"id":"102","datetime":"2019-09-21T05:43:23.+0200","pressure":30} # out of time
# {"id":"301","datetime":"2019-09-21T12:45:23.+0900","pressure":30} # different time zone
# {"id":"301","datetime":"2019-09-21T12:45:24.+0900","pressure":30} # different time zone
# {"id":"XXX","datetime":"2019-09-21T06:00:00.+0200","pressure":30}

Consume events from the output topic

10

Consuming the serialized windowed keys directly is a bit awkward, so the tutorial comes with a consumer that you can use as a black box to explore the output of the streaming application. In the helper package, add the class ResultConsumer:

package io.confluent.developer.helper;


import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.scaladsl.Sink;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Windowed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.BoxedUnit;

import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.UUID;

import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom;

public class ResultConsumer {

    private static final Logger logger = LoggerFactory.getLogger(ResultConsumer.class);

    public static void main(String[] args) {

        final Config config = ConfigFactory.load();
        final String outputTopic = config.getString("output.topic.name");

        final ActorSystem system = ActorSystem.create();
        final Materializer materializer = ActorMaterializer.create(system);

        final ConsumerSettings<Windowed<String>, Long> consumerSettings =
                ConsumerSettings
                        .create(
                                system,
                                timeWindowedSerdeFrom(
                                        String.class,
                                        config.getDuration("window.size").toMillis()
                                ).deserializer(),
                                Serdes.Long().deserializer()
                        )
                        .withGroupId(UUID.randomUUID().toString())
                        .withBootstrapServers(config.getString("bootstrap.servers"))
                        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Consumer.plainSource(
                consumerSettings,
                Subscriptions.topics(outputTopic))
                .to(Sink.foreach((record) -> {
                            logger.info(printWindowedKey(config, record));
                            return BoxedUnit.UNIT;
                        })
                ).run(materializer);

    }

    private static String printWindowedKey(Config config, ConsumerRecord<Windowed<String>, Long> windowedKeyValue) {

        return String.format("Count = %s for Key = %s, at window [%s-%s] %s (%s)",
                windowedKeyValue.value().toString(),
                windowedKeyValue.key().key(),
                DateTimeFormatter
                        .ofPattern("HH:mm:ss")
                        .withLocale(Locale.getDefault())
                        .withZone(ZoneId.systemDefault())
                        .format(windowedKeyValue.key().window().startTime()),
                DateTimeFormatter
                        .ofPattern("HH:mm:ss")
                        .withLocale(Locale.getDefault())
                        .withZone(ZoneId.systemDefault())
                        .format(windowedKeyValue.key().window().endTime()),
                DateTimeFormatter
                        .ofPattern(config.getString("local.date.pattern"))
                        .withLocale(Locale.forLanguageTag(config.getString("local.date.lang")))
                        .withZone(ZoneId.systemDefault())
                        .format(windowedKeyValue.key().window().startTime()),
                ZoneId.systemDefault().getId()
        );
    }
}

This consumer only formats and logs the messages it receives. It also has its own Gradle task:

./gradlew consumeResult

At the end, you should be able to see the output count for the key 101 and the key 102.

2019-09-21 05:46:03 [...] Count = 5 for Key = 101, at window [05:45:00-05:45:10] Sat 21 Sep 2019 (Europe/Paris)
2019-09-21 05:46:03 [...] Count = 1 for Key = 102, at window [05:45:00-05:45:10] Sam. 21 Sep 2019 (Europe/Paris)
2019-09-21 05:46:03 [...] Count = 1 for Key = 102, at window [05:45:10-05:45:20] Sam. 21 Sep 2019 (Europe/Paris)
2019-09-21 05:46:03 [...] Count = 2 for Key = 301, at window [05:45:20-05:45:30] Sam. 21 Sep 2019 (Europe/Paris)

Here the logging time matches the time of the latest result: 05:46:03. That latest event, from the dummy sensor XXX, advanced the stream time, and a final result was produced for every window whose grace period had elapsed. Hours are printed in the default system time zone, so for me it was between 05:45:20 and 05:45:30 when sensor 301 experienced 2 pressure alerts. To start investigating what happened, I would need the time zone of that sensor.


Test it

Create a test configuration file

1

First, create a directory for the test configuration:

mkdir -p src/test/resources

Then, create a test file configuration named test.properties at src/test/resources:

application.id=final-results-tutorial-test
bootstrap.servers=notused:9092
schema.registry.url=mock://final-results-tutorial-test:8081

window.size=10 seconds
window.grace.period=20 seconds

sensor.datetime.pattern=yyyy-MM-dd'T'HH:mm:ss.Z

local.date.lang=fr
local.date.pattern=EEE d MMM yyyy # date only

input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

Write a test

2

Then, create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Create the following test file at src/test/java/io/confluent/developer/WindowFinalResultTest.java:

package io.confluent.developer;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class WindowFinalResultTest {

  private TopologyTestDriver testDriver;
  private TestOutputTopic<Windowed<String>, Long> testOutputTopic;
  private SpecificAvroSerde<PressureAlert> pressureSerde;

  private final Config config = ConfigFactory.load("test.properties");

  private final String inputTopic = this.config.getString("input.topic.name");
  private final String outputTopic = this.config.getString("output.topic.name");

  private final Duration testWindowSize = config.getDuration("window.size");
  private final Duration testGracePeriodSize = config.getDuration("window.grace.period");
  private final Serde<Windowed<String>> keyResultSerde = timeWindowedSerdeFrom(String.class, testWindowSize.toMillis());

  private TimeWindows makeFixedTimeWindow() {
    return TimeWindows.ofSizeAndGrace(testWindowSize,testGracePeriodSize).advanceBy(testWindowSize);
  }

  private SpecificAvroSerde<PressureAlert> makePressureAlertSerde() {

    Map<String, String> schemaRegistryConfigMap = Collections.singletonMap(
        SCHEMA_REGISTRY_URL_CONFIG,
        config.getString(SCHEMA_REGISTRY_URL_CONFIG)
    );

    SpecificAvroSerde<PressureAlert> serde = new SpecificAvroSerde<>();
    serde.configure(schemaRegistryConfigMap, false);

    return serde;
  }

  private List<TestRecord<Windowed<String>, Long>> readAtLeastNOutputs(int size) {
    final List<TestRecord<Windowed<String>, Long>> testRecords = testOutputTopic.readRecordsToList();
    assertThat(testRecords.size(), equalTo(size));

    return testRecords;
  }

  @Before
  public void setUp() {
    this.pressureSerde = makePressureAlertSerde();
    Topology topology = WindowFinalResult.buildTopology(config, makeFixedTimeWindow(), this.pressureSerde);
    this.testDriver = new TopologyTestDriver(topology, WindowFinalResult.buildProperties(config));
    this.testOutputTopic =
        testDriver.createOutputTopic(outputTopic, this.keyResultSerde.deserializer(), Serdes.Long().deserializer());
  }

  @After
  public void tearDown() {
    testDriver.close();
  }

  @Test
  public void topologyShouldGroupOverDatetimeWindows() {
    final TestInputTopic<Bytes, PressureAlert>
        testDriverInputTopic =
        testDriver.createInputTopic(this.inputTopic, Serdes.Bytes().serializer(), this.pressureSerde.serializer());

    List<PressureAlert> inputs = Arrays.asList(
        new PressureAlert("101", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:45:01.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:45:03.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:55:10.+0200", Integer.MAX_VALUE),
        // ONE LAST EVENT TO TRIGGER TO MOVE THE STREAMING TIME
        new PressureAlert("XXX", "2019-09-21T05:55:40.+0200", Integer.MAX_VALUE)
    );

    inputs.forEach(pressureAlert ->
                       testDriverInputTopic.pipeInput(null, pressureAlert)
    );

    List<TestRecord<Windowed<String>, Long>> result = readAtLeastNOutputs(3);

    Optional<TestRecord<Windowed<String>, Long>> resultOne = result
        .stream().filter(Objects::nonNull).filter(r -> r.key().window().start() == 1569036600000L).findAny();
    Optional<TestRecord<Windowed<String>, Long>> resultTwo = result
        .stream().filter(Objects::nonNull).filter(r -> r.key().window().start() == 1569037500000L).findAny();
    Optional<TestRecord<Windowed<String>, Long>> resultThree = result
        .stream().filter(Objects::nonNull).filter(r -> r.key().window().start() == 1569038110000L).findAny();

    assertTrue(resultOne.isPresent());
    assertTrue(resultTwo.isPresent());
    assertTrue(resultThree.isPresent());

    assertEquals(3L, resultOne.get().value().longValue());
    assertEquals(2L, resultTwo.get().value().longValue());
    assertEquals(1L, resultThree.get().value().longValue());

    result.forEach((element) ->
                       assertEquals(
                           makeFixedTimeWindow().size(),
                           element.key().window().end() - element.key().window().start()
                       )
    );
  }

  @Test
  public void topologyShouldGroupById() {

    final TestInputTopic<Bytes, PressureAlert>
        testDriverInputTopic =
        testDriver.createInputTopic(this.inputTopic, Serdes.Bytes().serializer(), this.pressureSerde.serializer());

    List<PressureAlert> inputs = Arrays.asList(
        new PressureAlert("101", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
        new PressureAlert("102", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
        new PressureAlert("102", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
        new PressureAlert("102", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
        new PressureAlert("103", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
        new PressureAlert("103", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
        new PressureAlert("103", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
        // ONE LAST EVENT TO TRIGGER TO MOVE THE STREAMING TIME
        new PressureAlert("XXX", "2019-09-21T05:55:41.+0200", Integer.MAX_VALUE)
    );

    inputs.forEach(pressureAlert ->
                       testDriverInputTopic.pipeInput(null, pressureAlert)
    );

    List<TestRecord<Windowed<String>, Long>> result = readAtLeastNOutputs(3);

    Optional<TestRecord<Windowed<String>, Long>> resultOne =
        result.stream().filter(Objects::nonNull).filter(r -> r.key().key().equals("101")).findAny();
    Optional<TestRecord<Windowed<String>, Long>> resultTwo =
        result.stream().filter(Objects::nonNull).filter(r -> r.key().key().equals("102")).findAny();
    Optional<TestRecord<Windowed<String>, Long>> resultThree =
        result.stream().filter(Objects::nonNull).filter(r -> r.key().key().equals("103")).findAny();

    assertTrue(resultOne.isPresent());
    assertTrue(resultTwo.isPresent());
    assertTrue(resultThree.isPresent());

    assertEquals(3L, resultOne.get().value().longValue());
    assertEquals(3L, resultTwo.get().value().longValue());
    assertEquals(3L, resultThree.get().value().longValue());

    //Assert.assertNull(readNext());
  }
}

This class tests the following things:

  1. The topology groups elements over the datetime property

  2. The topology outputs a message for each window

  3. The topology outputs the correct count

  4. The duration between the window start and end corresponds to the window size passed as an argument

  5. The topology also uses the id property of the sensors to group events

  6. The topology only outputs one element per window
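
The magic numbers in the first test are simply the epoch milliseconds of the expected window start times. A quick, illustrative way to double-check them:

import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class WindowStartCheck {

    public static void main(String[] args) {
        // 05:30:00+02:00 starts the 10-second window containing 05:30:01..05:30:03
        System.out.println(ZonedDateTime.of(2019, 9, 21, 5, 30, 0, 0, ZoneOffset.ofHours(2))
                .toInstant().toEpochMilli()); // 1569036600000

        // 05:45:00+02:00 starts the window containing 05:45:01 and 05:45:03
        System.out.println(ZonedDateTime.of(2019, 9, 21, 5, 45, 0, 0, ZoneOffset.ofHours(2))
                .toInstant().toEpochMilli()); // 1569037500000

        // 05:55:10+02:00 starts the window containing 05:55:10
        System.out.println(ZonedDateTime.of(2019, 9, 21, 5, 55, 10, 0, ZoneOffset.ofHours(2))
                .toInstant().toEpochMilli()); // 1569038110000
    }
}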

Additionally, a separate test for the timestamp extractor can be created at src/test/java/io/confluent/developer/PressureDatetimeExtractorTest.java:

package io.confluent.developer;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static java.util.Collections.singletonMap;

public class PressureDatetimeExtractorTest {

    private TopologyTestDriver testDriver;
    private SpecificAvroSerde<PressureAlert> pressureSerde;

    private final Config config = ConfigFactory.load("test.properties");

    private final String inputTopic = this.config.getString("input.topic.name");
    private final String outputTopic = this.config.getString("output.topic.name");

    private final DateTimeFormatter formatter = new DateTimeFormatterBuilder()
            .appendPattern(this.config.getString("sensor.datetime.pattern"))
            .toFormatter();

    private final PressureDatetimeExtractor timestampExtractor = new PressureDatetimeExtractor(config);
    private TestOutputTopic<String, PressureAlert> testDriverOutputTopic;

    private SpecificAvroSerde<PressureAlert> makePressureAlertSerde() {

        Map<String, String> schemaRegistryConfigMap = singletonMap(
            SCHEMA_REGISTRY_URL_CONFIG,
            config.getString(SCHEMA_REGISTRY_URL_CONFIG)
        );

        SpecificAvroSerde<PressureAlert> serde = new SpecificAvroSerde<>();
        serde.configure(schemaRegistryConfigMap, false);
        return serde;
    }

    private List<TestRecord<String, PressureAlert>> readNOutputs(int size) {
        return testDriverOutputTopic.readRecordsToList();
    }

    @Before
    public void setUp() {
        this.pressureSerde = makePressureAlertSerde();

        Consumed<String, PressureAlert> consumedPressure =
            Consumed.with(Serdes.String(), pressureSerde)
                .withTimestampExtractor(timestampExtractor);

        Produced<String, PressureAlert> producedPressure =
            Produced.with(Serdes.String(), pressureSerde);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(this.inputTopic, consumedPressure).to(this.outputTopic, producedPressure);

        this.testDriver = new TopologyTestDriver(builder.build(), WindowFinalResult.buildProperties(config));
        this.testDriverOutputTopic =
            testDriver
                .createOutputTopic(this.outputTopic, Serdes.String().deserializer(), this.pressureSerde.deserializer());
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void extract() {

        final TestInputTopic<Bytes, PressureAlert>
            testDriverInputTopic =
            testDriver.createInputTopic(this.inputTopic, Serdes.Bytes().serializer(), this.pressureSerde.serializer());
        List<PressureAlert> inputs = Arrays.asList(
                new PressureAlert("101", "2019-09-21T05:25:01.+0200", Integer.MAX_VALUE),
                new PressureAlert("102", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
                new PressureAlert("103", "2019-09-21T05:45:03.+0200", Integer.MAX_VALUE),
                new PressureAlert("104", "DEFINITELY-NOT-PARSABLE!!", Integer.MAX_VALUE),
                new PressureAlert("105", "1500-06-24T09:11:03.+0200", Integer.MAX_VALUE)
        );

        inputs.forEach(pressureAlert ->
                           testDriverInputTopic.pipeInput(null, pressureAlert)
        );

        List<TestRecord<String, PressureAlert>> result = readNOutputs(5);

        Optional<TestRecord<String, PressureAlert>> resultOne =
                result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("101")).findFirst();
        Optional<TestRecord<String, PressureAlert>> resultTwo =
                result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("102")).findFirst();
        Optional<TestRecord<String, PressureAlert>> resultThree =
                result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("103")).findFirst();
        Optional<TestRecord<String, PressureAlert>> resultFour =
                result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("104")).findFirst();
        Optional<TestRecord<String, PressureAlert>> resultFive =
                result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("105")).findFirst();

        Assert.assertTrue(resultOne.isPresent());
        Assert.assertTrue(resultTwo.isPresent());
        Assert.assertTrue(resultThree.isPresent());
        Assert.assertFalse(resultFour.isPresent());
        Assert.assertFalse(resultFive.isPresent());

        Assert.assertEquals(
                formatter.parse("2019-09-21T05:25:01.+0200", Instant::from).toEpochMilli(),
                resultOne.get().timestamp().longValue()
        );

        Assert.assertEquals(
                formatter.parse("2019-09-21T05:30:02.+0200", Instant::from).toEpochMilli(),
                resultTwo.get().timestamp().longValue()
        );

        Assert.assertEquals(
                formatter.parse("2019-09-21T05:45:03.+0200", Instant::from).toEpochMilli(),
                resultThree.get().timestamp().longValue()
        );
    }
}

Invoke the tests

3

Now run the test, which is as simple as:

./gradlew test

Deploy on Confluent Cloud

Run your app with Confluent Cloud

1

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.

  1. Sign up for Confluent Cloud, a fully managed Apache Kafka service.

  2. After you log in to Confluent Cloud Console, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.

Confluent Cloud

Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.
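
Concretely, connecting this application to Confluent Cloud means supplying security settings that the tutorial's buildProperties method does not set. The sketch below is a hypothetical helper showing the typical extra properties (the placeholder values and the helper itself are assumptions, not tutorial code):

import java.util.Properties;

public class CloudProperties {

    // Placeholder values; copy the real ones from the Clients page in the Confluent Cloud Console
    public static Properties addCloudConfig(Properties props) {
        props.put("bootstrap.servers", "<BOOTSTRAP_SERVERS>");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";");

        // Schema Registry settings, also needed by the Avro serde configuration
        props.put("schema.registry.url", "<SCHEMA_REGISTRY_URL>");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "<SR_API_KEY>:<SR_API_SECRET>");
        return props;
    }
}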

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.