How to expire KTable rows based on TTL in Kafka Streams

If you have a Kafka Streams or ksqlDB application that builds KTables from a Kafka topic, you can purge older data by writing tombstones (records with a null value). Specifically, by maintaining a state store that tracks a TTL for each key, you can write tombstones to the topic underlying the KTable to expire rows that have not been updated recently.
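
To make the tombstone mechanism concrete, here is a minimal plain-Java sketch of how a table view applies a changelog: a record with a null value removes the key, and any other record upserts it. The `TableView` class and its methods are hypothetical names used only for illustration; they are not part of Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a KTable's materialized view; not a Kafka Streams class.
class TableView {
    private final Map<String, String> rows = new HashMap<>();

    // Apply one changelog record: a null value is a tombstone and deletes the row.
    void apply(String key, String value) {
        if (value == null) {
            rows.remove(key);
        } else {
            rows.put(key, value);
        }
    }

    boolean contains(String key) {
        return rows.containsKey(key);
    }

    int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        TableView table = new TableView();
        table.apply("alice", "v1");
        table.apply("bob", "v1");
        table.apply("alice", null); // tombstone expires alice's row
        System.out.println(table.contains("alice") + " " + table.size()); // prints "false 1"
    }
}
```

Expiring a row therefore comes down to deciding when to write a null value for its key, which is what the rest of this tutorial builds.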

To generate the tombstones, first use the Kafka Streams Processor API to track the stream time at which each key was last updated, and write the processor's output back to the table's input topic:

    table.toStream()
        .process(new TTLEmitter<String, String, String, String>(MAX_AGE,
                SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
        .to(INPUT_TOPIC_TABLE, Produced.with(Serdes.String(), Serdes.String()));

The TTLEmitter class implements the ProcessorSupplier interface, and the processor it supplies handles each record with the process method shown here. Because the processor also receives the tombstones that are written back to the topic, those tombstones are used to clean the corresponding TTL entries out of the state store:

    @Override
    public void process(Record<Kin, Vin> record) {
        // This is invoked for each new record we consume. If it's a tombstone, delete
        // the key from our state store. Otherwise, store the current stream time for the key.
        if (record.value() == null) {
            System.out.println("CLEANING key=" + record.key());
            stateStore.delete((Kout) record.key());
        } else {
            System.out.println("UPDATING key=" + record.key());
            stateStore.put((Kout) record.key(), context.currentStreamTimeMs());
        }
    }

The final piece of the puzzle is to use ProcessorContext.schedule to periodically pass over the state store and emit tombstones for any records that are older than a specified cutoff:

    context.schedule(scanFrequency, PunctuationType.STREAM_TIME, timestamp -> {
        final long cutoff = timestamp - maxAge.toMillis();

        try (final KeyValueIterator<Kout, Long> all = stateStore.all()) {
            while (all.hasNext()) {
                final KeyValue<Kout, Long> record = all.next();
                if (record.value != null && record.value < cutoff) {
                    // record's last update was older than the cutoff, so emit a tombstone.
                    context.forward(new Record<>(record.key, null, 0, null));
                }
            }
        }
    });
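
The cutoff arithmetic and the feedback loop between the scan and the process method can be checked without a Kafka cluster. The following is a minimal plain-Java sketch under stated assumptions: `TtlTracker`, `onRecord`, `scan`, and `trackedKeys` are hypothetical names that model the state store and the punctuator, and a `HashMap` stands in for the key-value store.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the TTL bookkeeping; not a Kafka Streams class.
class TtlTracker {
    private final Map<String, Long> lastSeen = new HashMap<>(); // key -> time of last update (ms)
    private final Duration maxAge;

    TtlTracker(Duration maxAge) {
        this.maxAge = maxAge;
    }

    // Mirrors process(): a tombstone clears the TTL entry, anything else refreshes it.
    void onRecord(String key, String value, long streamTimeMs) {
        if (value == null) {
            lastSeen.remove(key);
        } else {
            lastSeen.put(key, streamTimeMs);
        }
    }

    // Mirrors the punctuator: collect every key last updated strictly before the cutoff.
    List<String> scan(long nowMs) {
        final long cutoff = nowMs - maxAge.toMillis();
        final List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (entry.getValue() < cutoff) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }

    int trackedKeys() {
        return lastSeen.size();
    }

    public static void main(String[] args) {
        TtlTracker tracker = new TtlTracker(Duration.ofMinutes(1));
        tracker.onRecord("alice", "v1", 0L);
        tracker.onRecord("bob", "v1", 50_000L);

        // At t = 70,000 ms the cutoff is 10,000 ms, so only alice (last seen at 0) expires.
        List<String> expired = tracker.scan(70_000L);
        System.out.println(expired); // prints "[alice]"

        // The emitted tombstones come back through the input topic and clear the TTL entries.
        for (String key : expired) {
            tracker.onRecord(key, null, 70_000L);
        }
        System.out.println(tracker.trackedKeys()); // prints "1"
    }
}
```

Two details of the real processor are worth keeping in mind when reading this model. The comparison is strict, so a key last updated exactly at the cutoff survives until the next scan. And because the punctuator is scheduled with PunctuationType.STREAM_TIME, scans run only as incoming records advance stream time, so rows are not expired while the input topic is idle.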