How to filter messages in a Kafka topic with Kafka Streams

Suppose you have a topic of events and want to keep only the records that match a given attribute, dropping all the others.

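// Read the input topic, keep only publications by one author, write them to the output topic.
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), publicationSerde))
        // The predicate receives the record key and value; returning true keeps the record.
        .filter((name, publication) -> "George R. R. Martin".equals(publication.name()))
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), publicationSerde));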

To keep only the records in an event stream that match a given predicate (on the key, the value, or both), use KStream.filter.
To keep only the records that do not match a predicate, use KStream.filterNot.
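
The difference between the two operators can be sketched in plain Java, with no Kafka dependency. This is only an illustration of the semantics, not Kafka Streams code: the FilterSemanticsSketch class, its filter and filterNot helpers, the Publication type, and the sample records are all hypothetical stand-ins, and an in-memory list takes the place of the stream.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

// Illustration only: models filter / filterNot semantics on an in-memory list.
class FilterSemanticsSketch {

    // Hypothetical value type; assumed to expose name() like the tutorial's Publication.
    static final class Publication {
        private final String name;
        private final String title;

        Publication(String name, String title) {
            this.name = name;
            this.title = title;
        }

        String name() { return name; }
        String title() { return title; }
    }

    // Same predicate as in the topology above: it sees both key and value.
    static final BiPredicate<String, Publication> BY_MARTIN =
            (key, publication) -> "George R. R. Martin".equals(publication.name());

    // Like KStream.filter: keep the records for which the predicate is true.
    static <K, V> List<Map.Entry<K, V>> filter(List<Map.Entry<K, V>> records,
                                               BiPredicate<K, V> predicate) {
        return records.stream()
                .filter(r -> predicate.test(r.getKey(), r.getValue()))
                .collect(Collectors.toList());
    }

    // Like KStream.filterNot: keep the records for which the predicate is false.
    static <K, V> List<Map.Entry<K, V>> filterNot(List<Map.Entry<K, V>> records,
                                                  BiPredicate<K, V> predicate) {
        return filter(records, predicate.negate());
    }

    // Made-up sample input: three keyed records, two of them by the same author.
    static List<Map.Entry<String, Publication>> sampleRecords() {
        return List.of(
                Map.entry("k1", new Publication("George R. R. Martin", "A Game of Thrones")),
                Map.entry("k2", new Publication("C. S. Lewis", "The Silver Chair")),
                Map.entry("k3", new Publication("George R. R. Martin", "A Clash of Kings")));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Publication>> records = sampleRecords();
        filter(records, BY_MARTIN)
                .forEach(r -> System.out.println("filter keeps:    " + r.getValue().title()));
        filterNot(records, BY_MARTIN)
                .forEach(r -> System.out.println("filterNot keeps: " + r.getValue().title()));
    }
}
```

Every input record ends up in exactly one of the two results, so filterNot with a predicate behaves like filter with the negated predicate.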