How to implement TTL-based cleanup to expire data in a KTable created via aggregate

Question:

How can you periodically expire KTable records in the middle of a topology created using `aggregate()` or other stateful functions?

Example use case:

You have a KStreams application which creates KTables in the middle of a topology, using the `aggregate()` function, and the state store grows over time. An example of this type of KTable usage is in a workflow application scenario, where `aggregate()` is used to handle a fixed set of updates to a certain workflow instance. Once the workflow is completed, this data is no longer needed in the state store. In other data aggregation use cases, you may simply want to remove older data to keep the state store size manageable. Although the Kafka Streams API does not natively include a notion of a TTL (Time To Live) for KTables, this tutorial shows you a way to manage the size of state stores that underlie KTables built on `aggregate()` steps.

In a previous tutorial, we showed how to purge KTables which are created directly from an input topic. See Expire data in a KTable with TTLs.

The example uses a fixed TTL per key, based on the last update for that key. This may not serve every need, but it is sufficient to illustrate the mechanism by which we can purge data from a KTable. The transformer uses a state store to record the last update time seen for each key and periodically (via a punctuator) scans its keys to check whether any of them have exceeded a configured cutoff period (the TTL). For each key that has, a signal (a wrapper object with a "deleted" flag) is sent onward in the pipeline to the downstream `aggregate()` function, which returns null for that key so that it is removed from the KTable's internal store. A wrapper object is used instead of a plain null value because the `groupBy()` API will swallow a null key or null value before it ever reaches `aggregate()`.
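
Before walking through the full classes, here is a rough sketch (in the style of the snippets below) of the delete-signal contract between the transformer and the aggregation. The stream name `ttlSignalledStream` is a placeholder; the real topology is built step by step in this tutorial.

// Sketch only: TTLKTableTombstoneEmitter, ValueWrapper, AggregateObject, and JSONSerde
// are all created later in this tutorial. ttlSignalledStream stands for the
// KStream<String, ValueWrapper> produced by the transformer.
KTable<String, AggregateObject> table = ttlSignalledStream
    .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<ValueWrapper>()))
    .aggregate(AggregateObject::new, (key, wrapper, aggregate) ->
        wrapper.isDeleted()
            ? null                                   // a null aggregate result deletes the key from the KTable
            : aggregate.add((String) wrapper.getValue()),
        Materialized.with(Serdes.String(), new JSONSerde<AggregateObject>()));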

For a refresher on scheduling logic with a punctuator, have a look at the Scheduling Operations tutorial.

Hands-on code example:

Run it

Prerequisites

1

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line

Initialize the project

2

To get started, make a new directory anywhere you’d like for this project:

mkdir schedule-ktable-ttl-aggregate && cd schedule-ktable-ttl-aggregate

Get Confluent Platform

3

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
    - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

And launch it by running:

docker compose up -d

Configure the project

4

Create the following Gradle build file, named build.gradle for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"

    id "idea"
    id "eclipse"
}

sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"

repositories {
    mavenCentral()
    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation "org.apache.kafka:kafka-streams:3.1.0"
    implementation "io.confluent:common-utils:7.3.3"
    implementation "org.apache.kafka:kafka-clients:3.1.0"
    implementation "com.fasterxml.jackson.core:jackson-core:2.14.2"
    implementation "com.fasterxml.jackson.core:jackson-annotations:2.14.2"
    implementation "com.fasterxml.jackson.core:jackson-databind:2.14.2"

    testImplementation "org.apache.kafka:kafka-streams-test-utils:3.1.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'

}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.KafkaStreamsKTableTTLExample"
    )
  }
}

shadowJar {
    archiveBaseName = "schedule-ktable-ttl-aggregate-standalone"
    archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration

Then create a development file at configuration/dev.properties:

application.id=schedule-ktable-ttl-aggregate
bootstrap.servers=localhost:29092
auto.offset.reset=earliest
state.dir=/tmp/confluent/schedule-ktable-ttl-aggregate

input.topic.name=inputTopicForStream
input.topic.partitions=1
input.topic.replication.factor=1

table.topic.name=inputTopicForTable
table.topic.partitions=1
table.topic.replication.factor=1
table.topic.ttl.store.name=table-purge-store
table.topic.ttl.minutes=1
table.topic.ttl.scan.seconds=5

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer
mkdir -p src/main/java/io/confluent/developer/serdes

Create the Kafka Streams topology

5

Now let’s take a look at the components and the approach of the solution.

For context, your application has a simple KStream-KTable join where the output of the join is a trivial concatenation of the left side and the right side, provided the associated key exists in the KTable. The goal is to purge data in the KTable for keys that have not received an update within a TTL of 1 minute.

The following sections are already included in the application file; we are just stepping through the code in detail before you create it.

Let’s look at the TTLKTableTombstoneEmitter transformer, which uses a punctuator to emit delete signals for keys whose last update is older than the TTL:

TTLKTableTombstoneEmitter transformer punctuator
  public TTLKTableTombstoneEmitter(final Duration maxAge, final Duration scanFrequency,
      final String stateStoreName) { (1)
    this.maxAge = maxAge;
    this.scanFrequency = scanFrequency;
    this.purgeStoreName = stateStoreName;
  }

  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.stateStore = (KeyValueStore<K, Long>) context.getStateStore(purgeStoreName);
    // This is where the delete signal is created. This HAS to use wall clock time,
    // as we may not receive any further updates on the table's input topic
    context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, timestamp -> { (2)
      final long cutoff = timestamp - maxAge.toMillis();

      // scan over all the keys in this partition's store
      // this can be optimized, but just keeping it simple.
      // this might take a while, so the Streams timeouts should take this into account
      try (final KeyValueIterator<K, Long> all = stateStore.all()) {
        while (all.hasNext()) {
          final KeyValue<K, Long> record = all.next();
          System.out.println("RECORD "+record.key+":"+record.value);

          if (record.value != null && record.value < cutoff) {

            System.out.println("Forwarding Null for key "+record.key);
            // if a record's last update was older than our cutoff, emit a tombstone.
            ValueWrapper vw = new ValueWrapper();
            vw.setDeleted(true);
            context.forward(record.key, vw); (3)
            stateStore.delete(record.key); (4)
          }
        }
      }
    });
  }

  @Override
  public R transform(K key, V value) { (5)

    if (value == null) { (6)
      System.out.println("CLEARING key="+key);
      stateStore.delete(key);
    } else {
      System.out.println("UPDATING key="+key);
      stateStore.put(key, context.timestamp());
    }
    return (R)(new KeyValue<K,ValueWrapper>(key,new ValueWrapper(value)));
  }
1 Initialize the transformer with maximum age, scan frequency, and state store name
2 Schedule an operation (based on wall clock time in this case) to scan all records and pick out the ones that have exceeded the TTL.
3 Forward a ValueWrapper object with the "deleted" flag set for keys that have not been updated within the maximum age, since null values would not be passed on to aggregate()
4 Remove this key from the state store
5 The transform() method handles each incoming record from the topic upstream of the KTable and forwards the value wrapped in a ValueWrapper
6 Handle tombstones coming from upstream or update the timestamp in the local purge state store

Next, we set up a simple topology in which the KTable is created from an aggregate() call (which simply collects values into a list) and is then joined with the KStream. Here, we have to attach the TTLKTableTombstoneEmitter before the groupByKey() and aggregate() steps.

Initializing two KStreams in the Kafka Streams application and a state store to manage key-timestamp pairs.
    // Read the input data.
    final KStream<String, String> stream =
        builder.stream(inputTopicForStream, Consumed.with(Serdes.String(), Serdes.String()));

    final KStream<String, String> stream2 =
        builder.stream(inputTopicForTable, Consumed.with(Serdes.String(), Serdes.String()));

   // adding a custom state store for the TTL transformer which has a key of type string, and a
    // value of type long
    // which represents the timestamp
    final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder( (1)
        Stores.persistentKeyValueStore(STATE_STORE_NAME), Serdes.String(), Serdes.Long());

    builder.addStateStore(storeBuilder);
Attaching the transformer to the input stream that is later used to create a KTable via aggregate()
    KTable<String, AggregateObject> table = stream2
        .transform(() -> new TTLKTableTombstoneEmitter<String, String, KeyValue<String, ValueWrapper>>(MAX_AGE, (1)
            SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
        .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<ValueWrapper>()))
        .aggregate(AggregateObject::new, (key, value, aggregate) -> {
          System.out.println("aggregate() - value=" + value);
          if (value.isDeleted())
            return null; // signal to tombstone this key in KTable
          else
            return aggregate.add((String) value.getValue());
        }, Materialized.<String, AggregateObject, KeyValueStore<Bytes, byte[]>>as("eventstore")
            .withKeySerde(Serdes.String()).withValueSerde(new JSONSerde<AggregateObject>()));

     final KStream<String, String> joined = stream.leftJoin(table, (left, right) -> {
      System.out.println("JOINING left=" + left + " right=" + right);
      if (right != null) {
        int size = right.getValues().size();
        return left + " " + right.getValues().get(size - 1); // concat the last value out of the aggregate
      }
      return left;
    });
1 Attach the TTLKTableTombstoneEmitter transformer to the stream before the groupBy

We need to create some supporting classes first.

Let’s create a ValueWrapper class used to signal a delete by copying the following file to src/main/java/io/confluent/developer/ValueWrapper.java:

package io.confluent.developer;

import io.confluent.developer.serdes.JSONSerdeCompatible;

public class ValueWrapper extends JSONSerdeCompatible {

  Object value;
  boolean deleted = false;

  public ValueWrapper() {}

  public ValueWrapper(Object value) {
    this.value = value;
  }

  public Object getValue() {
    return value;
  }

  public void setValue(Object value) {
    this.value = value;
  }

  public boolean isDeleted() {
    return deleted;
  }

  public void setDeleted(boolean deleted) {
    this.deleted = deleted;
  }



}

Create the AggregateObject class by copying the following file to src/main/java/io/confluent/developer/AggregateObject.java:

package io.confluent.developer;

import java.util.ArrayList;
import java.util.List;
import io.confluent.developer.serdes.JSONSerdeCompatible;

public class AggregateObject extends JSONSerdeCompatible {

  public List<String> values;

  public AggregateObject() {
    values = new ArrayList<>();
  }

  public AggregateObject add(String v) {
    values.add(v);
    return this;
  }

  public List<String> getValues() {
    return values;
  }

  @Override
  public String toString() {
    return "AggregateObject [values=" + values + "]";
  }


}

Create the JSONSerdeCompatible class by copying the following file to src/main/java/io/confluent/developer/serdes/JSONSerdeCompatible.java:

package io.confluent.developer.serdes;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.confluent.developer.AggregateObject;
import io.confluent.developer.ValueWrapper;

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(
    use = JsonTypeInfo.Id.NAME,
    include = JsonTypeInfo.As.PROPERTY,
    property = "type")
@JsonSubTypes({
  @JsonSubTypes.Type(value = AggregateObject.class, name = "aggregateObject"),
  @JsonSubTypes.Type(value = ValueWrapper.class, name = "valueWrapper")
})
public abstract class JSONSerdeCompatible {

}

Create the JSONSerde class by copying the following file to src/main/java/io/confluent/developer/serdes/JSONSerde.java:

package io.confluent.developer.serdes;

import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONSerde<T> implements Serializer<T>, Deserializer<T>, Serde<T>  {
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  @Override
  public Serializer<T> serializer() {
    return this;
  }

  @Override
  public Deserializer<T> deserializer() {
    return this;
  }

  @Override
  public T deserialize(String topic, byte[] data) {
    if(data == null) return null;
    T result;
    try {
      result = (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class);
      System.out.println("Serialized = "+result);
    } catch (IOException e) {
      throw new SerializationException(e);
    }
    return result;

  }

  @Override
  public byte[] serialize(String topic, T data) {
    if(data == null) return null;
    try {
      return OBJECT_MAPPER.writeValueAsBytes(data);
    } catch (Exception e) {
      throw new SerializationException("Error serializing JSON message", e);
    }
  }

  @Override
  public void close() {

  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {

  }

}
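
If you would like to sanity-check the polymorphic serde outside of the topology, a small scratch class along the following lines (a hypothetical SerdeSanityCheck, not part of the tutorial and safe to skip) round-trips a ValueWrapper through JSON and prints the payload:

package io.confluent.developer;

import io.confluent.developer.serdes.JSONSerde;

// Optional scratch class for local experimentation; not required by the tutorial.
public class SerdeSanityCheck {

  public static void main(String[] args) {
    try (JSONSerde<ValueWrapper> serde = new JSONSerde<>()) {
      ValueWrapper original = new ValueWrapper("hello");

      // The @JsonTypeInfo annotation on JSONSerdeCompatible adds a "type" discriminator,
      // so the payload looks something like {"type":"valueWrapper","value":"hello","deleted":false}
      byte[] bytes = serde.serialize("any-topic", original);
      System.out.println(new String(bytes));

      ValueWrapper roundTripped = serde.deserialize("any-topic", bytes);
      System.out.println(roundTripped.getValue() + " deleted=" + roundTripped.isDeleted());
    }
  }
}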

Create the TTLKTableTombstoneEmitter class by copying the following file to src/main/java/io/confluent/developer/TTLKTableTombstoneEmitter.java:

package io.confluent.developer;

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

/**
 * A simple transformer that maintains a purge store of keys and their
 * last update times, and emits tombstone signals for keys whose TTL
 * has been exceeded
 *
 * @author sarwar
 *
 * @param <K>
 * @param <V>
 * @param <R>
 */
public class TTLKTableTombstoneEmitter<K, V, R> implements Transformer<K, V, R> {

  private final Duration maxAge;
  private final Duration scanFrequency;
  private final String purgeStoreName;
  private ProcessorContext context;
  private KeyValueStore<K, Long> stateStore;


  public TTLKTableTombstoneEmitter(final Duration maxAge, final Duration scanFrequency,
      final String stateStoreName) {
    this.maxAge = maxAge;
    this.scanFrequency = scanFrequency;
    this.purgeStoreName = stateStoreName;
  }

  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.stateStore = (KeyValueStore<K, Long>) context.getStateStore(purgeStoreName);
    // This is where the delete signal is created. This HAS to use wall clock time,
    // as we may not receive any further updates on the table's input topic
    context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
      final long cutoff = timestamp - maxAge.toMillis();

      // scan over all the keys in this partition's store
      // this can be optimized, but just keeping it simple.
      // this might take a while, so the Streams timeouts should take this into account
      try (final KeyValueIterator<K, Long> all = stateStore.all()) {
        while (all.hasNext()) {
          final KeyValue<K, Long> record = all.next();
          System.out.println("RECORD "+record.key+":"+record.value);

          if (record.value != null && record.value < cutoff) {

            System.out.println("Forwarding Null for key "+record.key);
            // if a record's last update was older than our cutoff, emit a tombstone.
            ValueWrapper vw = new ValueWrapper();
            vw.setDeleted(true);
            context.forward(record.key, vw);
            stateStore.delete(record.key);
          }
        }
      }
    });
  }

  @Override
  public R transform(K key, V value) {

    // this gets invoked for each new record we consume. If it's a tombstone, delete the
    // key from our state store. Otherwise, record the timestamp and forward the value
    // wrapped in a ValueWrapper.
    if (value == null) {
      System.out.println("CLEARING key="+key);
      stateStore.delete(key);
    } else {
      System.out.println("UPDATING key="+key);
      stateStore.put(key, context.timestamp());
    }
    return (R)(new KeyValue<K,ValueWrapper>(key,new ValueWrapper(value)));
  }

  @Override
  public void close() {


  }

}

Create the KafkaStreamsKTableTTLExample class by copying the following file to src/main/java/io/confluent/developer/KafkaStreamsKTableTTLExample.java:

package io.confluent.developer;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import io.confluent.common.utils.TestUtils;
import io.confluent.developer.serdes.JSONSerde;

public class KafkaStreamsKTableTTLExample {

  /**
   * This is the main topology showing a very simple KStream-KTable join. The KTable here is
   * created in the middle of the topology from an aggregation, rather than directly from an input topic.
   *
   * @param envProp
   * @return
   */
  public Topology buildTopology(Properties envProp) {
    final StreamsBuilder builder = new StreamsBuilder();

    String inputTopicForStream = envProp.getProperty("input.topic.name");
    String inputTopicForTable = envProp.getProperty("table.topic.name");
    String outputTopic = envProp.getProperty("output.topic.name");

    // TTL part of the topology
    // This could be in a separate application
    // Emitting delete signals for records that have not been updated within a TTL of MAX_AGE
    final Duration MAX_AGE =
        Duration.ofMinutes(Integer.parseInt(envProp.getProperty("table.topic.ttl.minutes")));
    final Duration SCAN_FREQUENCY =
        Duration.ofSeconds(Integer.parseInt(envProp.getProperty("table.topic.ttl.scan.seconds")));
    final String STATE_STORE_NAME = envProp.getProperty("table.topic.ttl.store.name");

    // adding a custom state store for the TTL transformer which has a key of type string, and a
    // value of type long
    // which represents the timestamp
    final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(STATE_STORE_NAME), Serdes.String(), Serdes.Long());



    builder.addStateStore(storeBuilder);


    // Read the input data.
    final KStream<String, String> stream =
        builder.stream(inputTopicForStream, Consumed.with(Serdes.String(), Serdes.String()));

    final KStream<String, String> stream2 =
        builder.stream(inputTopicForTable, Consumed.with(Serdes.String(), Serdes.String()));

    KTable<String, AggregateObject> table = stream2
        .transform(() -> new TTLKTableTombstoneEmitter<String, String, KeyValue<String, ValueWrapper>>(MAX_AGE,
            SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
        .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<ValueWrapper>()))

        .aggregate(AggregateObject::new, (key, value, aggregate) -> {
          System.out.println("aggregate() - value=" + value);
          if (value.isDeleted())
            return null; // signal to tombstone this key in KTable
          else
            return aggregate.add((String) value.getValue());
        }, Materialized.<String, AggregateObject, KeyValueStore<Bytes, byte[]>>as("eventstore")
            .withKeySerde(Serdes.String()).withValueSerde(new JSONSerde<AggregateObject>()));



    // Perform the custom join operation.
    final KStream<String, String> joined = stream.leftJoin(table, (left, right) -> {
      System.out.println("JOINING left=" + left + " right=" + right);
      if (right != null) {
        int size = right.getValues().size();
        return left + " " + right.getValues().get(size - 1); // concat the last value out of the aggregate
      }
      return left;
    });
    // Write the join results back to Kafka.
    joined.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

    return builder.build();
  }

  public void createTopics(final Properties envProps) {
    final Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
    try (final AdminClient client = AdminClient.create(config)) {

      final List<NewTopic> topics = new ArrayList<>();

      topics.add(new NewTopic(envProps.getProperty("input.topic.name"),
          Integer.parseInt(envProps.getProperty("input.topic.partitions")),
          Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));

      topics.add(new NewTopic(envProps.getProperty("table.topic.name"),
          Integer.parseInt(envProps.getProperty("table.topic.partitions")),
          Short.parseShort(envProps.getProperty("table.topic.replication.factor"))));

      topics.add(new NewTopic(envProps.getProperty("output.topic.name"),
          Integer.parseInt(envProps.getProperty("output.topic.partitions")),
          Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));



      client.createTopics(topics);
    }
  }

  public Properties loadEnvProperties(String fileName) throws IOException {
    final Properties envProps = new Properties();
    final FileInputStream input = new FileInputStream(fileName);
    envProps.load(input);
    input.close();



    // These two settings are only required in this contrived example so that the
    // join results show up immediately instead of waiting on record caching and commit intervals:
    // streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
    // streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
    return envProps;
  }

  public static void main(String[] args) throws Exception {

    if (args.length < 1) {
      throw new IllegalArgumentException(
          "This program takes one argument: the path to an environment configuration file.");
    }

    final KafkaStreamsKTableTTLExample instance = new KafkaStreamsKTableTTLExample();
    final Properties envProps = instance.loadEnvProperties(args[0]);

    // Setup the input topic, table topic, and output topic
    instance.createTopics(envProps);

    // Normally the TTL portion and the join portion could run as separate applications, but
    // for the purposes of the demo we run the whole topology in a single Streams instance
    Topology topology = instance.buildTopology(envProps);

    try (final KafkaStreams streams =
        new KafkaStreams(topology, envProps)) {
      final CountDownLatch startLatch = new CountDownLatch(1);
      // Attach shutdown handler to catch Control-C.
      Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
          // streams.cleanUp();
          streams.close(Duration.ofSeconds(5));
          startLatch.countDown();
        }
      });

      // Start the topology.
      streams.start();

      try {
        startLatch.await();
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        System.exit(1);
      }
    }
    System.exit(0);
  }
}

Compile and run the Kafka Streams program

6

With the topology and supporting classes in place, let’s build your application by running:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.

java -jar build/libs/schedule-ktable-ttl-aggregate-standalone-0.0.1.jar configuration/dev.properties

Enter some data into KTable

7

Let us produce some records into the KTable before doing a join.

Open a terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForTable --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

The producer will start and wait for you to enter input. Each line represents one record; to send it, press the enter key. If you type multiple words and then hit enter, the entire line is treated as one record.

Try typing one line at a time, pressing enter after each, and then look for the output in the console consumer window (started in a later step). Or, you can paste in all the records and send them at once.

key1:what a lovely
foo:bar
fun:not quarantine

Enter some data into KStream

8

Let us produce some records into the KStream.

Open another terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForStream --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

Type or paste the data into the KStream:

key1:Bunch of coconuts
foo:baz
fun:post quarantine

Consume data from the output topic

9

Now that your Kafka Streams application is running, start a console-consumer to confirm the output:

docker exec -it broker kafka-console-consumer \
 --bootstrap-server broker:9092 \
 --topic output-topic \
 --property print.key=true \
 --value-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
 --property key.separator=" : "  \
 --from-beginning \
 --max-messages 3

Your results should look something like this:


key1 : Bunch of coconuts what a lovely
foo : baz bar
fun : post quarantine not quarantine

Wait 65 seconds and enter some data into KTable

10

Let us wait 65 seconds and then produce some more records into the KTable. This has the effect of moving wall clock time forward past the TTL so that the purging kicks in.

Open a terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForTable --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

Enter the following data:

key2: some new data

Now enter some KStream data with the old keys

11

Let us produce some records into the KStream using the old keys, which by now should have been purged on the KTable side.

Open another terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForStream --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

Type or paste the data into the KStream:

key1:Bunch of coconuts
foo:baz
fun:post quarantine

Consume data from the output topic 2nd time

12

Start a console-consumer to confirm the output (no join match occurred for the final 3 records, so only the KStream-side data should appear in those lines):

docker exec -it broker kafka-console-consumer \
 --bootstrap-server broker:9092 \
 --topic output-topic \
 --property print.key=true \
 --value-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
 --property key.separator=" : "  \
 --from-beginning \
 --max-messages 6

Your results should look something like this:


key1 : Bunch of coconuts what a lovely
foo : baz bar
fun : post quarantine not quarantine
key1 : Bunch of coconuts
foo : baz
fun : post quarantine

Test it

Create a test configuration file

1

First, create a test file at configuration/test.properties:

application.id=schedule-ktable-ttl-aggregate
bootstrap.servers=localhost:29092
auto.offset.reset=earliest
state.dir=schedule-ktable-ttl-aggregate

input.topic.name=inputTopicForStream
input.topic.partitions=1
input.topic.replication.factor=1

table.topic.name=inputTopicForTable
table.topic.partitions=1
table.topic.replication.factor=1
table.topic.ttl.store.name=test-table-purge-store
table.topic.ttl.minutes=1
table.topic.ttl.scan.seconds=5

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

Write a test

2

Create a directory for the tests to live in:

mkdir -p /tmp/confluent/test
mkdir -p src/test/java/io/confluent/developer

Testing a Kafka Streams application requires a bit of test harness code, but happily the org.apache.kafka.streams.TopologyTestDriver class makes this much more pleasant than it would otherwise be.

There is only one method in KafkaStreamsKTableTTLExampleTest annotated with @Test, and that is shouldTriggerStreamTableJoinFromTable(). This method actually runs our Streams topology using the TopologyTestDriver and some mocked data that is set up inside the test method.

This test is fairly vanilla, but there is one section we should look into a little more closely:

      tableTopic.pipeInput("alice", "a", 5);  (1)
      tableTopic.pipeInput("bobby", "b", 10);


      inputTopic.pipeInput("alice", "11", 10); (2)
      inputTopic.pipeInput("bobby", "21", 15);
      inputTopic.pipeInput("alice", "12", 30);

      testDriver.advanceWallClockTime(Duration.ofMinutes(1));  (3)
      // advance wall clock time to trigger the punctuator
      tableTopic.pipeInput("freddy", "f", 65000);

      inputTopic.pipeInput("alice", "13", 70006);  (4)
      inputTopic.pipeInput("bobby", "22", 70016);

      final List<KeyValue<String, String>> actualResults = outputTopic.readKeyValuesToList();

      assertThat(actualResults.size(), is(5));  (5)

      assertThat(actualResults.get(0).key, is("alice"));
      assertThat(actualResults.get(0).value, is("11 a"));

      assertThat(actualResults.get(1).key, is("bobby"));
      assertThat(actualResults.get(1).value, is("21 b"));

      assertThat(actualResults.get(2).key, is("alice"));
      assertThat(actualResults.get(2).value, is("12 a"));

      assertThat(actualResults.get(3).key, is("alice"));
      assertThat(actualResults.get(3).value, is("13")); // join didn't match on right side


      assertThat(actualResults.get(4).key, is("bobby"));
      assertThat(actualResults.get(4).value, is("22"));
1 Piping some data into the KTable initially
2 Pipe some data into the KStream to join
3 Move time forward by advancing the wall clock
4 Pipe some more KStream data that will fail to find a join match
5 Check each join to see where it matched and where it didn’t match (due to the TTLed KTable records)

Now create the following file at src/test/java/io/confluent/developer/KafkaStreamsKTableTTLExampleTest.java.

package io.confluent.developer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.Test;

public class KafkaStreamsKTableTTLExampleTest {

  private final static String TEST_CONFIG_FILE = "configuration/test.properties";

  @Test
  public void shouldTriggerStreamTableJoinFromTable() throws Exception {

    final KafkaStreamsKTableTTLExample instance = new KafkaStreamsKTableTTLExample();
    final Properties envProps = instance.loadEnvProperties(TEST_CONFIG_FILE);

    final Properties streamProps = envProps;
    final String inputTopicName = envProps.getProperty("input.topic.name");
    final String outputTopicName = envProps.getProperty("output.topic.name");
    final String tableTopicName = envProps.getProperty("table.topic.name");

    final Topology topology = instance.buildTopology(envProps);
    System.out.println(topology);
    try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProps)) {


      final Serializer<String> keySerializer = Serdes.String().serializer();
      final Serializer<String> valueSerializer = Serdes.String().serializer();

      final Deserializer<String> keyDeserializer = Serdes.String().deserializer();
      final Deserializer<String> valueDeserializer = Serdes.String().deserializer();


      final TestInputTopic<String, String>  inputTopic = testDriver.createInputTopic(inputTopicName,
                                                                                        keySerializer,
                                                                                        valueSerializer);
      final TestInputTopic<String, String>  tableTopic = testDriver.createInputTopic(tableTopicName,
          keySerializer,
          valueSerializer);

      final TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic(outputTopicName, keyDeserializer, valueDeserializer);
      final TestOutputTopic<String, String> outputTableTopic = testDriver.createOutputTopic(tableTopicName, keyDeserializer, valueDeserializer);


      tableTopic.pipeInput("alice", "a", 5);
      tableTopic.pipeInput("bobby", "b", 10);


      inputTopic.pipeInput("alice", "11", 10);
      inputTopic.pipeInput("bobby", "21", 15);
      inputTopic.pipeInput("alice", "12", 30);

      testDriver.advanceWallClockTime(Duration.ofMinutes(1));
      tableTopic.pipeInput("freddy", "f", 65000);  // punctuate gets called now

      inputTopic.pipeInput("alice", "13", 70006);
      inputTopic.pipeInput("bobby", "22", 70016);


      final List<KeyValue<String, String>> actualResults = outputTopic.readKeyValuesToList();

      assertThat(actualResults.size(), is(5));

      assertThat(actualResults.get(0).key, is("alice"));
      assertThat(actualResults.get(0).value, is("11 a"));

      assertThat(actualResults.get(1).key, is("bobby"));
      assertThat(actualResults.get(1).value, is("21 b"));

      assertThat(actualResults.get(2).key, is("alice"));
      assertThat(actualResults.get(2).value, is("12 a"));

      assertThat(actualResults.get(3).key, is("alice"));
      assertThat(actualResults.get(3).value, is("13")); // join didn't match on right side


      assertThat(actualResults.get(4).key, is("bobby"));
      assertThat(actualResults.get(4).value, is("22")); // join didn't match on the right side


    }

  }

}

Invoke the tests

3

Now run the test, which is as simple as:

./gradlew test

Deploy on Confluent Cloud

Run your app with Confluent Cloud

1

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.

  1. Sign up for Confluent Cloud, a fully managed Apache Kafka service.

  2. After you log in to Confluent Cloud Console, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.

Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application. In the case of this tutorial, add the following properties to the client application’s input properties file, substituting all curly braces with your Confluent Cloud values.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.