Create a directory for the Java files in this project:
mkdir -p src/main/java/io/confluent/developer
Before you create the Kafka Streams application file, let’s go over the key points of the application. In this tutorial we want to show how data in a KTable loaded from an input topic can be periodically purged via a transformer. The example applies a fixed TTL per key, based on the last update for that key. This may not cover every use case, but it is sufficient to illustrate the mechanism by which we can purge data from a KTable. The transformer uses a state store to record the last update time seen for each key and periodically (via a punctuator) scans its keys to check whether any of them have exceeded a configured cutoff period (TTL). For each key that has, a tombstone is forwarded onwards in the pipeline and the key is removed from the transformer’s own internal store.
Now let’s take a look at some of the key points from the application.
For context, your application has a simple KStream-KTable join where the output of the join is a trivial concatenation of the left side and the right side if the associated key exists in the KTable. The goal is to purge data in the KTable for keys whose updates have not arrived within a TTL of one minute. For example, if a key in the table topic receives no updates for one minute, the application writes a tombstone for that key back to the table topic, removing it from the KTable.
The sections that follow are already included in the application file; we’re just stepping through the code in detail before you create it.
Let’s look at the TTLEmitter transformer, which will schedule the emission of tombstones for keys whose last update is older than the TTL:
TTLEmitter transformer punctuator
public TTLEmitter(final Duration maxAge, final Duration scanFrequency,
final String stateStoreName) { (1)
this.maxAge = maxAge;
this.scanFrequency = scanFrequency;
this.purgeStoreName = stateStoreName;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
this.stateStore = (KeyValueStore<K, Long>) context.getStateStore(purgeStoreName);
context.schedule(scanFrequency, PunctuationType.STREAM_TIME, timestamp -> { (2)
final long cutoff = timestamp - maxAge.toMillis();
try (final KeyValueIterator<K, Long> all = stateStore.all()) {
while (all.hasNext()) {
final KeyValue<K, Long> record = all.next();
if (record.value != null && record.value < cutoff) {
System.out.println("Forwarding Null");
context.forward(record.key, null); (3)
}
}
}
});
}
@Override
public R transform(K key, V value) { (4)
if (value == null) { (5)
System.out.println("CLEANING key="+key);
stateStore.delete(key);
} else {
System.out.println("UPDATING key="+key);
stateStore.put(key, context.timestamp());
}
return null;
}
1. Initialize the transformer with the maximum age, the scan frequency, and the state store name.
2. Schedule the punctuation (based on stream time) that scans all records in the store and picks out those that have exceeded the TTL. We could switch this to wall-clock time, but then a failure upstream of the KTable topic could lead to data being deleted even though no new activity has arrived. If your use case accepts the implications of wall-clock time, you can use it; see the sketch after this list.
3. Forward a tombstone for each key that has not been updated within the maximum age.
4. Implement transform() to handle incoming changes to the KTable.
5. Handle tombstones coming from upstream, or update the timestamp for the key in the local purge state store.
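If wall-clock semantics do fit your use case, the only change is the PunctuationType passed to context.schedule(). Below is a minimal sketch of an alternative init(), assuming the same fields (context, stateStore, purgeStoreName, scanFrequency, maxAge) as the TTLEmitter in this tutorial:
@Override
public void init(ProcessorContext context) {
  this.context = context;
  this.stateStore = (KeyValueStore<K, Long>) context.getStateStore(purgeStoreName);
  // WALL_CLOCK_TIME fires on a timer regardless of whether new records arrive,
  // so keys can be purged even while the table topic is idle.
  context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
    final long cutoff = timestamp - maxAge.toMillis();
    try (final KeyValueIterator<K, Long> all = stateStore.all()) {
      while (all.hasNext()) {
        final KeyValue<K, Long> entry = all.next();
        if (entry.value != null && entry.value < cutoff) {
          context.forward(entry.key, null); // emit a tombstone for the expired key
        }
      }
    }
  });
}
Keep in mind that the store still holds record timestamps (context.timestamp()), so comparing them against a wall-clock cutoff assumes the two clocks are reasonably aligned.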
Next we set up a simple KStream-KTable join, which gives us a KTable to attach the TTLEmitter to.
Initializing a simple KStream-KTable join in the Kafka Streams application
// Read the input data.
final KStream<String, String> stream =
builder.stream(inputTopicForStream, Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> table = builder.table(inputTopicForTable,
Consumed.with(Serdes.String(), Serdes.String()));
// Perform the custom join operation.
final KStream<String, String> joined = stream.leftJoin(table, (left, right) -> {
System.out.println("JOINING left="+left+" right="+right);
if (right != null)
return left+" "+right; // this is, of course, a completely fake join logic
return left;
});
// Write the join results back to Kafka.
joined.to(outputTopic,
Produced.with(Serdes.String(), Serdes.String()));
Next we attach a transformer to the original table in order to do the work of emitting tombstones as appropriate:
Attaching a transformer to the KTable and writing back to the KTable’s input topic
// tap the table topic in order to insert a tombstone after MAX_AGE based on event time
//builder.stream(inputTopicForTable, Consumed.with(Serdes.String(), Serdes.String()))
table.toStream() // we only need toStream() because this runs in the same topology; in a separate app you could consume the table topic directly, as in the commented-out line above
.transform(() -> new TTLEmitter<String, String, KeyValue<String, String>>(MAX_AGE, (1)
SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
.to(inputTopicForTable, Produced.with(Serdes.String(), Serdes.String())); // write the tombstones back out to the input topic
1. Turn the table into a stream and call transform() on it with the TTLEmitter.
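Note that .transform() can only reference STATE_STORE_NAME if a store with that name has been registered on the StreamsBuilder; the full application below does this with a persistent key-value store. For reference, the registration looks like this:
// Register a persistent key-value store (String key, Long last-update timestamp)
// so the TTLEmitter can look it up by name inside init().
final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore(STATE_STORE_NAME),
    Serdes.String(),
    Serdes.Long());
builder.addStateStore(storeBuilder);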
Create the TTLEmitter class by copying the following file to src/main/java/io/confluent/developer/TTLEmitter.java:
package io.confluent.developer;
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
/**
 * A simple transformer that maintains a purge store of keys and their last update
 * times, and emits tombstones for keys whose TTL has been exceeded.
 *
 * @author sarwar
 *
 * @param <K> the key type
 * @param <V> the value type
 * @param <R> the type returned from transform()
 */
public class TTLEmitter<K, V, R> implements Transformer<K, V, R> {
private final Duration maxAge;
private final Duration scanFrequency;
private final String purgeStoreName;
private ProcessorContext context;
private KeyValueStore<K, Long> stateStore;
public TTLEmitter(final Duration maxAge, final Duration scanFrequency,
final String stateStoreName) {
this.maxAge = maxAge;
this.scanFrequency = scanFrequency;
this.purgeStoreName = stateStoreName;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
this.stateStore = (KeyValueStore<K, Long>) context.getStateStore(purgeStoreName);
// This is where the magic happens. This causes Streams to invoke the Punctuator
// on an interval, using stream time. That is, time is only advanced by the record
// timestamps that Streams observes. This has several advantages over wall-clock time
// for this application:
//
// It produces the exact same sequence of updates given the same sequence of data.
// Since the purpose is to modify the data stream itself, you want a clear
// understanding of when data is going to be deleted. For example, if something breaks
// down upstream of this topic and it stops getting new data for a while, wall-clock
// time would just keep deleting data on schedule, whereas stream time waits for new
// updates to come in.
//
// You can change to wall-clock time here if that is what your use case needs.
context.schedule(scanFrequency, PunctuationType.STREAM_TIME, timestamp -> {
final long cutoff = timestamp - maxAge.toMillis();
// scan over all the keys in this partition's store
// this can be optimized, but just keeping it simple.
// this might take a while, so the Streams timeouts should take this into account
try (final KeyValueIterator<K, Long> all = stateStore.all()) {
while (all.hasNext()) {
final KeyValue<K, Long> record = all.next();
if (record.value != null && record.value < cutoff) {
System.out.println("Forwarding Null");
// if a record's last update was older than our cutoff, emit a tombstone.
context.forward(record.key, null);
}
}
}
});
}
@Override
public R transform(K key, V value) {
// this gets invoked for each new record we consume. If it's a tombstone, delete
// it from our state store. Otherwise, store the record timestamp.
if (value == null) {
System.out.println("CLEANING key="+key);
stateStore.delete(key);
} else {
System.out.println("UPDATING key="+key);
stateStore.put(key, context.timestamp());
}
return null; // no need to return anything here. the punctuator will emit the tombstones
// when necessary
}
@Override
public void close() {
}
}
Create the KafkaStreamsKTableTTLExample class by copying the following file to src/main/java/io/confluent/developer/KafkaStreamsKTableTTLExample.java:
package io.confluent.developer;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import io.confluent.common.utils.TestUtils;
public class KafkaStreamsKTableTTLExample {
/**
 * This is the main topology, showing a very simple KStream-KTable join.
 * The KTable here is based on an input topic and is not created in the middle
 * of a topology from an aggregation.
 *
 * @param envProp the environment properties with topic names and TTL settings
 * @return the topology for this application
 */
public Topology buildTopology(Properties envProp) {
final StreamsBuilder builder = new StreamsBuilder();
String inputTopicForStream = envProp.getProperty("input.topic.name");
String inputTopicForTable = envProp.getProperty("table.topic.name");
String outputTopic = envProp.getProperty("output.topic.name");
// Read the input data.
final KStream<String, String> stream =
builder.stream(inputTopicForStream, Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> table = builder.table(inputTopicForTable,
Consumed.with(Serdes.String(), Serdes.String()));
// Perform the custom join operation.
final KStream<String, String> joined = stream.leftJoin(table, (left, right) -> {
System.out.println("JOINING left="+left+" right="+right);
if (right != null)
return left+" "+right; // this is, of course, a completely fake join logic
return left;
});
// Write the join results back to Kafka.
joined.to(outputTopic,
Produced.with(Serdes.String(), Serdes.String()));
// TTL part of the topology
// This could be in a separate application
// Setting tombstones for records seen past a TTL of MAX_AGE
final Duration MAX_AGE = Duration.ofMinutes(Integer.parseInt(envProp.getProperty("table.topic.ttl.minutes")));
final Duration SCAN_FREQUENCY = Duration.ofSeconds(Integer.parseInt(envProp.getProperty("table.topic.ttl.scan.seconds")));
final String STATE_STORE_NAME = envProp.getProperty("table.topic.ttl.store.name");
// Add a custom state store for the TTL transformer, with a String key and a Long value
// that represents the last-update timestamp
final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(STATE_STORE_NAME),
Serdes.String(),
Serdes.Long()
);
builder.addStateStore(storeBuilder);
// tap the table topic in order to insert a tombstone after MAX_AGE based on event time
//builder.stream(inputTopicForTable, Consumed.with(Serdes.String(), Serdes.String()))
table.toStream() // we only need toStream() because this runs in the same topology; in a separate app you could consume the table topic directly, as in the commented-out line above
.transform(() -> new TTLEmitter<String, String, KeyValue<String, String>>(MAX_AGE,
SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
.to(inputTopicForTable, Produced.with(Serdes.String(), Serdes.String())); // write the tombstones back out to the input topic
System.out.println(builder.toString());
return builder.build();
}
public Properties getStreamProps(Properties envProp) {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, envProp.get("application.id"));
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProp.get("bootstrap.servers"));
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use a temporary directory for storing state, which will be automatically removed after the
// test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getAbsolutePath());
//streamsConfiguration.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 20000);
// These two settings are only required in this contrived example so that results
// are emitted immediately instead of being held in the record caches:
// streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
// streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
return streamsConfiguration;
}
public void createTopics(final Properties envProps) {
final Map<String, Object> config = new HashMap<>();
config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
try (final AdminClient client = AdminClient.create(config)) {
final List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(envProps.getProperty("input.topic.name"),
Integer.parseInt(envProps.getProperty("input.topic.partitions")),
Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));
topics.add(new NewTopic(envProps.getProperty("table.topic.name"),
Integer.parseInt(envProps.getProperty("table.topic.partitions")),
Short.parseShort(envProps.getProperty("table.topic.replication.factor"))));
topics.add(new NewTopic(envProps.getProperty("output.topic.name"),
Integer.parseInt(envProps.getProperty("output.topic.partitions")),
Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));
client.createTopics(topics);
}
}
public Properties loadEnvProperties(String fileName) throws IOException {
final Properties envProps = new Properties();
final FileInputStream input = new FileInputStream(fileName);
envProps.load(input);
input.close();
return envProps;
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException(
"This program takes one argument: the path to an environment configuration file.");
}
final KafkaStreamsKTableTTLExample instance = new KafkaStreamsKTableTTLExample();
final Properties envProps = instance.loadEnvProperties(args[0]);
// Setup the input topic, table topic, and output topic
instance.createTopics(envProps);
// Normally these could run as separate applications, but for the purposes of the demo
// we run both parts of the topology in the same Streams instance
try (final KafkaStreams streams = new KafkaStreams(instance.buildTopology(envProps), instance.getStreamProps(envProps))) {
final CountDownLatch startLatch = new CountDownLatch(1);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
//streams.cleanUp();
streams.close(Duration.ofSeconds(5));
startLatch.countDown();
}
});
// Start the topology.
streams.start();
try {
startLatch.await();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
System.exit(1);
}
}
System.exit(0);
}
}
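If you want to exercise the TTL logic without a running cluster, you can drive the topology with TopologyTestDriver and advance stream time past the TTL. The following is a minimal sketch, not part of the tutorial files; the class name, topic names, and property values are illustrative assumptions:
package io.confluent.developer;

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public class KafkaStreamsKTableTTLExampleSketch {

  public static void main(String[] args) {
    // Hypothetical property values; in the tutorial these come from the configuration file.
    final Properties envProps = new Properties();
    envProps.put("input.topic.name", "stream-input");
    envProps.put("table.topic.name", "table-input");
    envProps.put("output.topic.name", "join-output");
    envProps.put("table.topic.ttl.minutes", "1");
    envProps.put("table.topic.ttl.scan.seconds", "5");
    envProps.put("table.topic.ttl.store.name", "ttl-purge-store");

    final Topology topology = new KafkaStreamsKTableTTLExample().buildTopology(envProps);

    final Properties streamsProps = new Properties();
    streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttl-sketch");
    streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsProps)) {
      final TestInputTopic<String, String> tableInput = driver.createInputTopic(
          "table-input", new StringSerializer(), new StringSerializer());
      final TestOutputTopic<String, String> tableOutput = driver.createOutputTopic(
          "table-input", new StringDeserializer(), new StringDeserializer());

      // Populate a key, then advance stream time well past the TTL with a later record.
      tableInput.pipeInput("alice", "v1", 0L);
      tableInput.pipeInput("bob", "v1", Duration.ofMinutes(2).toMillis());

      // The punctuator should have emitted a tombstone for "alice" back to the table topic.
      tableOutput.readKeyValuesToList()
          .forEach(kv -> System.out.println(kv.key + " -> " + kv.value));
    }
  }
}
The driver processes records synchronously, so the stream-time punctuation fires as soon as the later record advances stream time, and the tombstones written back to the table topic can be read from that same topic.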