Event Streams may contain a subset of Events that need to be processed in isolation. For example, an inventory check system may be distributed across multiple physical systems, with the target system depending on the category of the item being checked.
How can we isolate Events into a dedicated Event Stream based on an attribute of the Events?
For example, with Apache Flink® SQL, we can continuously route Events to different Event Streams by using one CREATE TABLE AS SELECT (CTAS) statement per destination:
CREATE TABLE payments ...;
CREATE TABLE payments_france AS
SELECT * FROM payments WHERE country = 'france';
CREATE TABLE payments_spain AS
SELECT * FROM payments WHERE country = 'spain';
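The two CTAS statements above act as independent continuous filters over the same source: each Event is copied into every derived table whose WHERE clause it satisfies, and an Event that matches none of them (say, country = 'italy') is not routed anywhere. The plain-Java sketch below imitates that behavior over an in-memory list. It does not use any Flink API; the `Payment` record and its fields are hypothetical stand-ins for rows of the payments table.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class CtasRoutingSketch {
    // Hypothetical event type standing in for a row of the payments table
    record Payment(String id, String country, long amountCents) {}

    // Each entry mirrors one CTAS statement: destination table -> WHERE predicate
    static final Map<String, Predicate<Payment>> ROUTES = Map.of(
        "payments_france", p -> p.country().equals("france"),
        "payments_spain", p -> p.country().equals("spain"));

    // Applies every filter independently, like separate continuous queries
    static Map<String, List<Payment>> route(List<Payment> payments) {
        return ROUTES.entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> payments.stream().filter(e.getValue()).collect(Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Payment> payments = List.of(
            new Payment("p1", "france", 1200),
            new Payment("p2", "spain", 800),
            new Payment("p3", "italy", 500),
            new Payment("p4", "france", 300));
        Map<String, List<Payment>> tables = route(payments);
        // p3 (italy) matches no WHERE clause, so it lands in neither table
        System.out.println(tables.get("payments_france").size()); // prints 2
        System.out.println(tables.get("payments_spain").size());  // prints 1
    }
}
```

Because each statement is its own query, adding a destination means adding a statement, and overlapping predicates would copy an Event into several tables.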
With the Apache Kafka® client library Kafka Streams, use a TopicNameExtractor to route Events to different Event Streams (called "topics" in Kafka). TopicNameExtractor has a single method to implement, extract(), which accepts three parameters: the Event key, the Event value, and a RecordContext that exposes metadata about the record, such as its source topic, partition, offset, timestamp, and headers.
We can use any of these parameters to compute and return the destination topic name for each Event; Kafka Streams then writes the Event to that topic.
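import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;

// Payment is a hypothetical value type that exposes getCountry()
public class CountryTopicExtractor implements TopicNameExtractor<String, Payment> {
    @Override
    public String extract(String key, Payment value, RecordContext recordContext) {
        switch (value.getCountry()) {
            case "france":
                return "france-topic";
            case "spain":
                return "spain-topic";
            default:
                // Hypothetical catch-all topic: extract() must return a name for every Event
                return "other-topic";
        }
    }
}

KStream<String, Payment> myStream = builder.stream(...);
myStream.mapValues(..).to(new CountryTopicExtractor());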
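Unlike the CTAS approach, an extractor returns exactly one topic name per Event, so every Event lands in a single destination. The following plain-Java sketch illustrates that contract without a Kafka cluster: `TopicRouter` is a hypothetical interface with the same shape as extract() minus the RecordContext parameter, `Payment` is a hypothetical value type, and an in-memory map plays the role of the destination topics. The "other-topic" fallback is likewise an illustrative name, not something Kafka Streams provides.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DynamicRoutingSketch {
    // Hypothetical stand-in for a payment Event value
    record Payment(String id, String country) {}

    // Hypothetical interface mirroring TopicNameExtractor.extract() without RecordContext
    interface TopicRouter<K, V> {
        String topicFor(K key, V value);
    }

    // Country-based routing with a catch-all so every Event has exactly one destination
    static final TopicRouter<String, Payment> BY_COUNTRY = (key, value) -> switch (value.country()) {
        case "france" -> "france-topic";
        case "spain" -> "spain-topic";
        default -> "other-topic"; // hypothetical fallback topic name
    };

    // In-memory stand-in for the cluster: topic name -> Events written to it, in order
    static Map<String, List<Payment>> dispatch(List<Payment> events,
                                               TopicRouter<String, Payment> router) {
        Map<String, List<Payment>> topics = new LinkedHashMap<>();
        for (Payment p : events) {
            // The payment id serves as the Event key
            String topic = router.topicFor(p.id(), p);
            topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(p);
        }
        return topics;
    }

    public static void main(String[] args) {
        List<Payment> events = List.of(
            new Payment("p1", "france"),
            new Payment("p2", "spain"),
            new Payment("p3", "italy"),
            new Payment("p4", "spain"));
        // Prints "france-topic 1", "spain-topic 2", "other-topic 1" on separate lines
        dispatch(events, BY_COUNTRY)
            .forEach((topic, list) -> System.out.println(topic + " " + list.size()));
    }
}
```

Keeping the routing decision in one function makes it easy to unit-test in isolation, and the explicit default branch makes it obvious where unexpected attribute values end up.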