Now let’s take a look at the components and the approach of the solution.
For context, your application has a simple KStream-KTable join where the output of the join is a trivial concatenation of the left side and the right side if the associated key exists in the KTable. The goal is to purge data in the KTable for keys that have not received an update within a TTL of 1 minute.
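For reference, the topology reads its TTL settings from the environment configuration file. A minimal sketch of the resulting values is shown below; only the 1-minute TTL is fixed by this example, while the scan frequency and store name are illustrative assumptions (the property names match those read later in buildTopology()):
// java.time.Duration is used for both duration settings
final Duration MAX_AGE = Duration.ofMinutes(1);        // table.topic.ttl.minutes=1
final Duration SCAN_FREQUENCY = Duration.ofSeconds(5); // table.topic.ttl.scan.seconds (value assumed)
final String STATE_STORE_NAME = "ttl-purge-store";     // table.topic.ttl.store.name (name assumed)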
The following sections are already included in the application file; we’re just taking a detailed walk through the code before you create it.
Let’s look at the TTL Emitter transformer that will schedule emitting tombstones past a certain time:
TTLKTableTombstoneEmitter transformer punctuator
public TTLKTableTombstoneEmitter(final Duration maxAge, final Duration scanFrequency,
final String stateStoreName) { (1)
this.maxAge = maxAge;
this.scanFrequency = scanFrequency;
this.purgeStoreName = stateStoreName;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
this.stateStore = (KeyValueStore<K, Long>) context.getStateStore(purgeStoreName);
// This is where the delete signal is created. This HAS to use wall-clock time,
// as we may not receive any further updates on the table's input topic
context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, timestamp -> { (2)
final long cutoff = timestamp - maxAge.toMillis();
// scan over all the keys in this partition's store
// this can be optimized, but just keeping it simple.
// this might take a while, so the Streams timeouts should take this into account
try (final KeyValueIterator<K, Long> all = stateStore.all()) {
while (all.hasNext()) {
final KeyValue<K, Long> record = all.next();
System.out.println("RECORD "+record.key+":"+record.value);
if (record.value != null && record.value < cutoff) {
System.out.println("Forwarding Null for key "+record.key);
// if a record's last update was older than our cutoff, emit a tombstone.
ValueWrapper vw = new ValueWrapper();
vw.setDeleted(true);
context.forward(record.key, vw); (3)
stateStore.delete(record.key); (4)
}
}
}
});
}
@Override
public R transform(K key, V value) { (5)
// this gets invoked for each new record we consume. If it's a tombstone, delete
// it from our state store. Otherwise, store the record timestamp.
if (value == null) { (6)
System.out.println("CLEARING key="+key);
stateStore.delete(key);
} else {
System.out.println("UPDATING key="+key);
stateStore.put(key, context.timestamp());
}
return (R) new KeyValue<K, ValueWrapper>(key, new ValueWrapper(value));
}
1. Initialize the transformer with the maximum age, scan frequency, and state store name.
2. Schedule a punctuation (using wall-clock time in this case) to scan all records and pick out those whose last update is older than the TTL.
3. Forward a ValueWrapper object with the "deleted" flag set for keys that have not been updated within the maximum age. This is necessary because null values are not passed on to aggregate().
4. Remove the key from the state store.
5. We still need a transform() method to handle incoming changes on the input topic upstream of the KTable.
6. Handle tombstones coming from upstream, or update the timestamp for the key in the local purge state store.
Next, we set up a simple topology in which the KTable is created by an aggregation (which simply keeps the values in a list) and is then joined with the KStream.
Here, we have to attach the TTLKTableTombstoneEmitter before the groupByKey() and aggregate() steps.
Initializing two KStreams in the Kafka Streams application and a state store to manage the key-timestamp pairs
// Read the input data.
final KStream<String, String> stream =
builder.stream(inputTopicForStream, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream2 =
builder.stream(inputTopicForTable, Consumed.with(Serdes.String(), Serdes.String()));
// Add a custom state store for the TTL transformer. It has a key of type String and a
// value of type Long, which holds the timestamp of the key's last update.
final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder( (1)
Stores.persistentKeyValueStore(STATE_STORE_NAME), Serdes.String(), Serdes.Long());
builder.addStateStore(storeBuilder);
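1. Add a custom state store, keyed by String with a Long value, that the TTL transformer uses to track the last update timestamp for each key.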
Attaching the transformer to the table's input stream, which is then used to create a KTable via aggregate()
KTable<String, AggregateObject> table = stream2
.transform(() -> new TTLKTableTombstoneEmitter<String, String, KeyValue<String, ValueWrapper>>(MAX_AGE, (1)
SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
.groupByKey(Grouped.with(Serdes.String(), new JSONSerde<ValueWrapper>()))
.aggregate(AggregateObject::new, (key, value, aggregate) -> {
System.out.println("aggregate() - value=" + value);
if (value.isDeleted())
return null; // signal to tombstone this key in KTable
else
return aggregate.add((String) value.getValue());
}, Materialized.<String, AggregateObject, KeyValueStore<Bytes, byte[]>>as("eventstore")
.withKeySerde(Serdes.String()).withValueSerde(new JSONSerde<AggregateObject>()));
final KStream<String, String> joined = stream.leftJoin(table, (left, right) -> {
System.out.println("JOINING left=" + left + " right=" + right);
if (right != null) {
int size = right.getValues().size();
return left + " " + right.getValues().get(size - 1); // concat the last value out of the aggregate
}
return left;
});
1. Attach the TTLKTableTombstoneEmitter transformer to the stream before the groupByKey() and aggregate() steps.
We need to create some supporting classes first.
Let’s create a ValueWrapper class used to signal a delete by copying the following file to src/main/java/io/confluent/developer/ValueWrapper.java:
package io.confluent.developer;
import io.confluent.developer.serdes.JSONSerdeCompatible;
public class ValueWrapper extends JSONSerdeCompatible {
Object value;
boolean deleted = false;
public ValueWrapper() {}
public ValueWrapper(Object value) {
this.value = value;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
}
public boolean isDeleted() {
return deleted;
}
public void setDeleted(boolean deleted) {
this.deleted = deleted;
}
}
Create the AggregateObject class by copying the following file to src/main/java/io/confluent/developer/AggregateObject.java:
package io.confluent.developer;
import java.util.ArrayList;
import java.util.List;
import io.confluent.developer.serdes.JSONSerdeCompatible;
public class AggregateObject extends JSONSerdeCompatible {
public List<String> values;
public AggregateObject() {
values = new ArrayList<>();
}
public AggregateObject add(String v) {
values.add(v);
return this;
}
public List<String> getValues() {
return values;
}
@Override
public String toString() {
return "AggregateObject [values=" + values + "]";
}
}
Create the JSONSerdeCompatible class by copying the following file to src/main/java/io/confluent/developer/serdes/JSONSerdeCompatible.java:
package io.confluent.developer.serdes;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.confluent.developer.AggregateObject;
import io.confluent.developer.ValueWrapper;
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = AggregateObject.class, name = "aggregateObject"),
@JsonSubTypes.Type(value = ValueWrapper.class, name = "valueWrapper")
})
public abstract class JSONSerdeCompatible {
}
Create the JSONSerde class by copying the following file to src/main/java/io/confluent/developer/serdes/JSONSerde.java:
package io.confluent.developer.serdes;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class JSONSerde<T> implements Serializer<T>, Deserializer<T>, Serde<T> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
public Serializer<T> serializer() {
return this;
}
@Override
public Deserializer<T> deserializer() {
return this;
}
@Override
public T deserialize(String topic, byte[] data) {
if(data == null) return null;
T result;
try {
result = (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class);
System.out.println("Deserialized = "+result);
} catch (IOException e) {
throw new SerializationException(e);
}
return result;
}
@Override
public byte[] serialize(String topic, T data) {
if(data == null) return null;
try {
return OBJECT_MAPPER.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
}
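Because JSONSerde always deserializes into the JSONSerdeCompatible hierarchy, the Jackson annotations above add a "type" discriminator to every payload. The following minimal round-trip sketch (not part of the application; the class name and topic string are just for illustration) shows what the serde produces and reads back:
package io.confluent.developer.serdes;

import java.nio.charset.StandardCharsets;

import io.confluent.developer.ValueWrapper;

public class JSONSerdeRoundTripSketch {

  public static void main(String[] args) {
    JSONSerde<ValueWrapper> serde = new JSONSerde<>();

    // Serialize a ValueWrapper; the bytes look roughly like
    // {"type":"valueWrapper","value":"some-value","deleted":false}
    byte[] bytes = serde.serialize("any-topic", new ValueWrapper("some-value"));
    System.out.println(new String(bytes, StandardCharsets.UTF_8));

    // Deserialize it back; Jackson picks the concrete class from the "type" field
    ValueWrapper roundTripped = serde.deserialize("any-topic", bytes);
    System.out.println(roundTripped.getValue() + " deleted=" + roundTripped.isDeleted());
  }
}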
Create the TTLKTableTombstoneEmitter class by copying the following file to src/main/java/io/confluent/developer/TTLKTableTombstoneEmitter.java:
package io.confluent.developer;
import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
/**
* A simple transformer maintaining a purge store of keys and the
* last update time and if a TTL has been exceeded, emits tombstones
* for those keys
*
* @author sarwar
*
* @param <K>
* @param <V>
* @param <R>
*/
public class TTLKTableTombstoneEmitter<K, V, R> implements Transformer<K, V, R> {
private final Duration maxAge;
private final Duration scanFrequency;
private final String purgeStoreName;
private ProcessorContext context;
private KeyValueStore<K, Long> stateStore;
public TTLKTableTombstoneEmitter(final Duration maxAge, final Duration scanFrequency,
final String stateStoreName) {
this.maxAge = maxAge;
this.scanFrequency = scanFrequency;
this.purgeStoreName = stateStoreName;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
this.stateStore = (KeyValueStore<K, Long>) context.getStateStore(purgeStoreName);
// This is where the delete signal is created. This HAS to use wall-clock time,
// as we may not receive any further updates on the table's input topic
context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
final long cutoff = timestamp - maxAge.toMillis();
// scan over all the keys in this partition's store
// this can be optimized, but just keeping it simple.
// this might take a while, so the Streams timeouts should take this into account
try (final KeyValueIterator<K, Long> all = stateStore.all()) {
while (all.hasNext()) {
final KeyValue<K, Long> record = all.next();
System.out.println("RECORD "+record.key+":"+record.value);
if (record.value != null && record.value < cutoff) {
System.out.println("Forwarding Null for key "+record.key);
// if a record's last update was older than our cutoff, emit a tombstone.
ValueWrapper vw = new ValueWrapper();
vw.setDeleted(true);
context.forward(record.key, vw);
stateStore.delete(record.key);
}
}
}
});
}
@Override
public R transform(K key, V value) {
// this gets invoked for each new record we consume. If it's a tombstone, delete
// it from our state store. Otherwise, store the record timestamp.
if (value == null) {
System.out.println("CLEARING key="+key);
stateStore.delete(key);
} else {
System.out.println("UPDATING key="+key);
stateStore.put(key, context.timestamp());
}
return (R)(new KeyValue<K,ValueWrapper>(key,new ValueWrapper(value)));
}
@Override
public void close() {
}
}
Create the KafkaStreamsKTableTTLExample class by copying the following file to src/main/java/io/confluent/developer/KafkaStreamsKTableTTLExample.java:
package io.confluent.developer;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import io.confluent.common.utils.TestUtils;
import io.confluent.developer.serdes.JSONSerde;
public class KafkaStreamsKTableTTLExample {
/**
* This is the main topology showing a very simple KStream-KTable join. The KTable here is not
* read directly from an input topic but created in the middle of the topology from an aggregation
*
* @param envProp
* @return
*/
public Topology buildTopology(Properties envProp) {
final StreamsBuilder builder = new StreamsBuilder();
String inputTopicForStream = envProp.getProperty("input.topic.name");
String inputTopicForTable = envProp.getProperty("table.topic.name");
String outputTopic = envProp.getProperty("output.topic.name");
// TTL part of the topology
// This could be in a separate application
// Setting tombstones for records seen past a TTL of MAX_AGE
final Duration MAX_AGE =
Duration.ofMinutes(Integer.parseInt(envProp.getProperty("table.topic.ttl.minutes")));
final Duration SCAN_FREQUENCY =
Duration.ofSeconds(Integer.parseInt(envProp.getProperty("table.topic.ttl.scan.seconds")));
final String STATE_STORE_NAME = envProp.getProperty("table.topic.ttl.store.name");
// Add a custom state store for the TTL transformer. It has a key of type String and a
// value of type Long, which holds the timestamp of the key's last update.
final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(STATE_STORE_NAME), Serdes.String(), Serdes.Long());
builder.addStateStore(storeBuilder);
// Read the input data.
final KStream<String, String> stream =
builder.stream(inputTopicForStream, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> stream2 =
builder.stream(inputTopicForTable, Consumed.with(Serdes.String(), Serdes.String()));
KTable<String, AggregateObject> table = stream2
.transform(() -> new TTLKTableTombstoneEmitter<String, String, KeyValue<String, ValueWrapper>>(MAX_AGE,
SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
.groupByKey(Grouped.with(Serdes.String(), new JSONSerde<ValueWrapper>()))
.aggregate(AggregateObject::new, (key, value, aggregate) -> {
System.out.println("aggregate() - value=" + value);
if (value.isDeleted())
return null; // signal to tombstone this key in KTable
else
return aggregate.add((String) value.getValue());
}, Materialized.<String, AggregateObject, KeyValueStore<Bytes, byte[]>>as("eventstore")
.withKeySerde(Serdes.String()).withValueSerde(new JSONSerde<AggregateObject>()));
// Perform the custom join operation.
final KStream<String, String> joined = stream.leftJoin(table, (left, right) -> {
System.out.println("JOINING left=" + left + " right=" + right);
if (right != null) {
int size = right.getValues().size();
return left + " " + right.getValues().get(size - 1); // concat the last value out of the aggregate
}
return left;
});
// Write the join results back to Kafka.
joined.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return builder.build();
}
public void createTopics(final Properties envProps) {
final Map<String, Object> config = new HashMap<>();
config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
try (final AdminClient client = AdminClient.create(config)) {
final List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(envProps.getProperty("input.topic.name"),
Integer.parseInt(envProps.getProperty("input.topic.partitions")),
Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));
topics.add(new NewTopic(envProps.getProperty("table.topic.name"),
Integer.parseInt(envProps.getProperty("table.topic.partitions")),
Short.parseShort(envProps.getProperty("table.topic.replication.factor"))));
topics.add(new NewTopic(envProps.getProperty("output.topic.name"),
Integer.parseInt(envProps.getProperty("output.topic.partitions")),
Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));
client.createTopics(topics);
}
}
public Properties loadEnvProperties(String fileName) throws IOException {
final Properties envProps = new Properties();
final FileInputStream input = new FileInputStream(fileName);
envProps.load(input);
input.close();
// These two settings are only required in this contrived example so that output is
// emitted immediately rather than being buffered by the record cache:
// streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
// streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
return envProps;
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException(
"This program takes one argument: the path to an environment configuration file.");
}
final KafkaStreamsKTableTTLExample instance = new KafkaStreamsKTableTTLExample();
final Properties envProps = instance.loadEnvProperties(args[0]);
// Set up the input topic, table topic, and output topic
instance.createTopics(envProps);
// Normally the TTL-emitting part and the join could run as separate applications, but for
// the purposes of the demo we combine them into a single topology and run one Streams instance
Topology topology = instance.buildTopology(envProps);
try (final KafkaStreams streams =
new KafkaStreams(topology, envProps)) {
final CountDownLatch startLatch = new CountDownLatch(1);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
// streams.cleanUp();
streams.close(Duration.ofSeconds(5));
startLatch.countDown();
}
});
// Start the topology.
streams.start();
try {
startLatch.await();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
System.exit(1);
}
}
System.exit(0);
}
}
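Because the purge punctuator fires on wall-clock time, it can be awkward to observe the tombstones without actually waiting out the TTL. The following is a minimal, optional verification sketch (not part of the tutorial files) that drives the topology with Kafka Streams’ TopologyTestDriver; it assumes Kafka Streams 2.4 or later for TestInputTopic, and the class name is just for illustration:
package io.confluent.developer;

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

public class KafkaStreamsKTableTTLExampleSketch {

  public static void main(String[] args) throws Exception {
    final KafkaStreamsKTableTTLExample instance = new KafkaStreamsKTableTTLExample();
    final Properties props = instance.loadEnvProperties(args[0]); // same configuration file as main()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ttl-aggregate-sketch"); // required by the driver
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // the test driver never connects to it

    try (TopologyTestDriver driver = new TopologyTestDriver(instance.buildTopology(props), props)) {
      final TestInputTopic<String, String> tableInput = driver.createInputTopic(
          props.getProperty("table.topic.name"), new StringSerializer(), new StringSerializer());
      tableInput.pipeInput("key1", "table-value-1");

      // The punctuator is registered on WALL_CLOCK_TIME, so advancing the driver's wall clock
      // past MAX_AGE is what makes it forward the ValueWrapper with deleted=true for key1,
      // which aggregate() then turns into a KTable tombstone.
      driver.advanceWallClockTime(Duration.ofMinutes(2));
    }
  }
}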