If you have a Kafka Streams or ksqlDB application that creates a KTable in the middle of a topology by calling aggregate(), you can purge older data by making clever use of tombstones. Specifically, by maintaining a state store that tracks the last update time for each key, you can write tombstones to the topic underlying the KTable in order to expire stale rows.
The tombstones are generated by first using the Kafka Streams Processor API to maintain the latest stream processing time per key:
KTable<String, AggregateObject> table = stream2
    .process(new TTLEmitter<String, String, String, ValueWrapper>(MAX_AGE,
        SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
    .groupByKey(Grouped.with(Serdes.String(), vwSerde))
    .aggregate(AggregateObject::new, (key, value, aggregate) -> {
        System.out.println("aggregate() - value=" + value);
        if (value.deleted()) {
            return null; // signal to tombstone this key in the KTable
        } else {
            return aggregate.add(value.value().toString());
        }
    }, Materialized.<String, AggregateObject, KeyValueStore<Bytes, byte[]>>as("eventstore")
        .withKeySerde(Serdes.String())
        .withValueSerde(aggObjectSerde));
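The snippets above and below reference a ValueWrapper type that isn't shown. A minimal sketch of what it might look like, assuming a plain Object payload, is simply a value paired with a deleted flag:

// Hypothetical sketch of the wrapper forwarded by the TTLEmitter. It pairs the
// original value with a flag marking keys the punctuator wants to expire, since
// aggregate() never receives records whose value is null.
public class ValueWrapper {

    private final Object value;
    private final boolean deleted;

    public ValueWrapper(Object value, boolean deleted) {
        this.value = value;
        this.deleted = deleted;
    }

    public Object value() {
        return value;
    }

    public boolean deleted() {
        return deleted;
    }
}

The vwSerde and aggObjectSerde used in the topology would then be serdes capable of (de)serializing ValueWrapper and AggregateObject, for example JSON-based serdes.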
The TTLEmitter class implements the ProcessorSupplier interface, and its process() method, invoked for each record, looks like this. Note that because the processor also receives tombstones, we use them to clean the corresponding TTL entries out of the state store:
@Override
public void process(Record<Kin, Vin> record) {
    // This gets invoked for each record we consume. If it's a tombstone, remove
    // the key's entry from our state store. Otherwise, store the record timestamp.
    if (record.value() == null) {
        System.out.println("CLEARING key=" + record.key());
        stateStore.delete((Kout) record.key());
    } else {
        System.out.println("UPDATING key=" + record.key());
        stateStore.put((Kout) record.key(), record.timestamp());
    }
    // Forward the record downstream, wrapped so aggregate() can tell expirations apart.
    context.forward(new Record(record.key(),
        new ValueWrapper(record.value(), false), record.timestamp()));
}
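For orientation, here is a minimal sketch of how the surrounding TTLEmitter class could be structured with the Processor API, stitching together the process() method above and the punctuator shown next. The field names and overall layout are assumptions; only the constructor arguments and store usage come from the snippets:

import java.time.Duration;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical skeleton of the TTLEmitter: it tracks the latest record timestamp
// per key in a state store and periodically forwards "deleted" wrappers for keys
// that have not been updated within maxAge.
public class TTLEmitter<Kin, Vin, Kout, Vout>
        implements ProcessorSupplier<Kin, Vin, Kout, Vout> {

    private final Duration maxAge;
    private final Duration scanFrequency;
    private final String stateStoreName;

    public TTLEmitter(Duration maxAge, Duration scanFrequency, String stateStoreName) {
        this.maxAge = maxAge;
        this.scanFrequency = scanFrequency;
        this.stateStoreName = stateStoreName;
    }

    @Override
    public Processor<Kin, Vin, Kout, Vout> get() {
        return new Processor<Kin, Vin, Kout, Vout>() {
            private ProcessorContext<Kout, Vout> context;
            private KeyValueStore<Kout, Long> stateStore;

            @Override
            public void init(ProcessorContext<Kout, Vout> context) {
                this.context = context;
                // The store must be registered with the topology under stateStoreName.
                this.stateStore = context.getStateStore(stateStoreName);
                // Schedule the punctuator shown in the next snippet.
                context.schedule(scanFrequency, PunctuationType.STREAM_TIME, timestamp -> {
                    // ... punctuator body from the snippet below ...
                });
            }

            @Override
            public void process(Record<Kin, Vin> record) {
                // ... process() body from the snippet above ...
            }
        };
    }
}

Using PunctuationType.STREAM_TIME means the scan only advances as new records arrive; switching to WALL_CLOCK_TIME would expire keys even while the input topic is idle.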
The final piece of the puzzle is to use ProcessorContext.schedule() to periodically scan the state store and emit tombstones for any keys whose last update is older than a specified cutoff. Note that we forward a ValueWrapper carrying a deleted flag rather than a null value, because records with null values are dropped before they reach aggregate().
context.schedule(scanFrequency, PunctuationType.STREAM_TIME, timestamp -> {
    final long cutoff = timestamp - maxAge.toMillis();
    // Scan over all the keys in this partition's store.
    // This can be optimized, but we're keeping it simple here.
    // It might take a while, so the Streams timeouts should take this into account.
    try (final KeyValueIterator<Kout, Long> all = stateStore.all()) {
        while (all.hasNext()) {
            final KeyValue<Kout, Long> record = all.next();
            System.out.println("RECORD " + record.key + ":" + record.value);
            if (record.value != null && record.value < cutoff) {
                // The key's last update is older than our cutoff, so emit a "deleted"
                // wrapper; the payload is irrelevant because aggregate() only checks the flag.
                System.out.println("Forwarding Null for key " + record.key);
                ValueWrapper vw = new ValueWrapper(record.value, true);
                context.forward(new Record(record.key, vw, 0, null));
                stateStore.delete(record.key);
            }
        }
    }
});
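Finally, none of the snippets show how the TTL state store itself is created. A minimal sketch, assuming a persistent key-value store keyed by String that holds each key's last-seen timestamp as a Long, registered under the same STATE_STORE_NAME passed to process(), might look like this:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();

// Hypothetical registration of the TTL store referenced by STATE_STORE_NAME.
// It maps each key to the timestamp of its latest update, which is what the
// punctuator compares against the cutoff.
StoreBuilder<KeyValueStore<String, Long>> ttlStoreBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(STATE_STORE_NAME),
        Serdes.String(),
        Serdes.Long());

builder.addStateStore(ttlStoreBuilder);

Passing STATE_STORE_NAME as the second argument to process(), as in the first snippet, is what connects this store to the TTLEmitter so that context.getStateStore() can find it.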