Event Streams may contain a subset of Events which need to be processed in isolation. For example, an inventory check system may be distributed across multiple physical systems, and the target system may depend on the category of the item being checked.
How can we isolate Events into a dedicated Event Stream based on an attribute of the Events?
With the streaming database ksqlDB, we can continuously route Events to a different Event Stream. We use the CREATE STREAM syntax with an appropriate WHERE filter:
CREATE STREAM payments ...;
CREATE STREAM payments_france AS
SELECT * FROM payments WHERE country = 'france';
CREATE STREAM payments_spain AS
SELECT * FROM payments WHERE country = 'spain';
With the Apache Kafka® client library Kafka Streams, use a TopicNameExtractor to route Events to different Event Streams (called "topics" in Kafka). The TopicNameExtractor has one method to implement, extract(), which accepts three parameters: the Event key, the Event value, and the RecordContext, which provides access to headers, partitions, and other contextual information about the Event.
We can use any of the given parameters to generate and return the desired destination topic name for the given Event. Kafka Streams will complete the routing.
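import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;

public class CountryTopicExtractor implements TopicNameExtractor<String, String> {
    @Override
    public String extract(String key, String value, RecordContext recordContext) {
        // Assumes the value has already been reduced to the country name (e.g. by mapValues)
        switch (value) {
            case "france":
                return "france-topic";
            case "spain":
                return "spain-topic";
            default:
                // No destination is defined for other countries
                throw new IllegalArgumentException("No topic for country: " + value);
        }
    }
}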
KStream<String, String> myStream = builder.stream(...);
myStream.mapValues(..).to(new CountryTopicExtractor());
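Because the routing decision is just a function from an Event to a topic name, it can be prototyped and tested without a running Kafka cluster. The following is a minimal sketch under stated assumptions, not the library's API: RoutingSketch, topicFor, and the "unrouted-topic" fallback are hypothetical names introduced for illustration, and the Event value is assumed to be the plain country string.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical, dependency-free sketch of the routing decision.
// None of these names belong to the Kafka Streams API.
final class RoutingSketch {

    // Known countries and their destination topics (topic names from the example above).
    private static final Map<String, String> TOPICS = Map.of(
            "france", "france-topic",
            "spain", "spain-topic");

    // Assumed fallback topic for Events that match no known country.
    static final String FALLBACK_TOPIC = "unrouted-topic";

    // Returns the destination topic for a country value, or the fallback.
    static String topicFor(String country) {
        if (country == null) {
            return FALLBACK_TOPIC;
        }
        // Normalize so that "Spain " and "spain" route to the same topic.
        String normalized = country.trim().toLowerCase(Locale.ROOT);
        return TOPICS.getOrDefault(normalized, FALLBACK_TOPIC);
    }

    public static void main(String[] args) {
        System.out.println(topicFor("france")); // france-topic
        System.out.println(topicFor("Spain ")); // spain-topic
        System.out.println(topicFor("italy"));  // unrouted-topic
    }
}
```

Under these assumptions, an extract() implementation could delegate to the helper (for example, return RoutingSketch.topicFor(value);), which keeps the Kafka-specific class thin. Whether unmatched Events should go to a fallback topic or be rejected is a design choice that depends on the application.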