Suppose you have a topic of events and want to keep only the records that match a given attribute:
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), publicationSerde))
        .filter((name, publication) -> "George R. R. Martin".equals(publication.name()))
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), publicationSerde));
To keep only the records in the event stream that match a given predicate (evaluated against the record's key, value, or both), use KStream.filter. To keep only the records that do not match a predicate, use KStream.filterNot.
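The difference between the two operators can be sketched with plain JDK collections, with no Kafka dependency. This is only a model of the semantics, not the Kafka Streams API: the Publication record, the class name FilterSemanticsSketch, and the helper methods are hypothetical stand-ins, and java.util.function.Predicate tests only the value, whereas the Kafka Streams predicate receives both key and value.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterSemanticsSketch {

    // Hypothetical stand-in for the tutorial's Publication value type.
    record Publication(String name, String title) {}

    // Same check as the lambda passed to filter in the topology above,
    // expressed on the value only.
    static final Predicate<Publication> BY_MARTIN =
            p -> "George R. R. Martin".equals(p.name());

    // The five sample books used later in this tutorial.
    static final List<Publication> SAMPLE = List.of(
            new Publication("George R. R. Martin", "A Song of Ice and Fire"),
            new Publication("C.S. Lewis", "The Silver Chair"),
            new Publication("C.S. Lewis", "Perelandra"),
            new Publication("George R. R. Martin", "Fire & Blood"),
            new Publication("J. R. R. Tolkien", "The Hobbit"));

    // Models filter: keep the records for which the predicate is true.
    static List<Publication> filter(List<Publication> in, Predicate<Publication> p) {
        return in.stream().filter(p).collect(Collectors.toList());
    }

    // Models filterNot: keep the records for which the predicate is false.
    static List<Publication> filterNot(List<Publication> in, Predicate<Publication> p) {
        return in.stream().filter(p.negate()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        filter(SAMPLE, BY_MARTIN)
                .forEach(p -> System.out.println("filter:    " + p.title()));
        filterNot(SAMPLE, BY_MARTIN)
                .forEach(p -> System.out.println("filterNot: " + p.title()));
    }
}
```

In the topology itself, swapping .filter for .filterNot with the same lambda would send the books not written by George R. R. Martin to the output topic instead.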
The following steps use Confluent Cloud. To run the tutorial locally with Docker, skip to the Docker instructions section at the bottom.
Clone the tutorials repository and navigate into its top-level directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Log in to your Confluent Cloud account:
confluent login --prompt --save
Install a CLI plugin that streamlines the creation of resources in Confluent Cloud:
confluent plugin install confluent-quickstart
Run the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial. Note that you may specify a different cloud provider (gcp or azure) or region. To list the supported regions for a given cloud provider, run confluent kafka region list --cloud <CLOUD>.
confluent quickstart \
  --environment-name kafka-streams-filtering-env \
  --kafka-cluster-name kafka-streams-filtering-cluster \
  --create-kafka-key \
  --kafka-java-properties-file ./filtering/kstreams/src/main/resources/cloud.properties
The plugin should complete in under a minute.
Create the input and output topics for the application:
confluent kafka topic create filtering-input
confluent kafka topic create filtering-output
Start a console producer:
confluent kafka topic produce filtering-input
Enter a few JSON-formatted books:
{"name":"George R. R. Martin", "title":"A Song of Ice and Fire"}
{"name":"C.S. Lewis", "title":"The Silver Chair"}
{"name":"C.S. Lewis", "title":"Perelandra"}
{"name":"George R. R. Martin", "title":"Fire & Blood"}
{"name":"J. R. R. Tolkien", "title":"The Hobbit"}
Press Ctrl+C to exit the console producer.
Compile the application from the top-level tutorials repository directory:
./gradlew filtering:kstreams:shadowJar
Navigate into the application's home directory:
cd filtering/kstreams
Run the application, passing the Kafka client configuration file generated when you created the Confluent Cloud resources:
java -cp ./build/libs/kstreams-filter-standalone.jar \
    io.confluent.developer.FilterEvents \
    ./src/main/resources/cloud.properties
Validate that you see only the books by George R. R. Martin in the filtering-output topic:
confluent kafka topic consume filtering-output -b
You should see:
{"name":"George R. R. Martin","title":"A Song of Ice and Fire"}
{"name":"George R. R. Martin","title":"Fire & Blood"}
When you are finished, delete the kafka-streams-filtering-env environment. First, get its environment ID, which has the form env-123456:
confluent environment list
Delete the environment, including all resources created for this tutorial:
confluent environment delete <ENVIRONMENT ID>
Docker instructions
Clone the tutorials repository and navigate into its top-level directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Start Kafka with the following command, run from the top-level tutorials repository directory:
docker compose -f ./docker/docker-compose-kafka.yml up -d
Open a shell in the broker container:
docker exec -it broker /bin/bash
Create the input and output topics for the application:
kafka-topics --bootstrap-server localhost:9092 --create --topic filtering-input
kafka-topics --bootstrap-server localhost:9092 --create --topic filtering-output
Start a console producer:
kafka-console-producer --bootstrap-server localhost:9092 --topic filtering-input
Enter a few JSON-formatted books:
{"name":"George R. R. Martin", "title":"A Song of Ice and Fire"}
{"name":"C.S. Lewis", "title":"The Silver Chair"}
{"name":"C.S. Lewis", "title":"Perelandra"}
{"name":"George R. R. Martin", "title":"Fire & Blood"}
{"name":"J. R. R. Tolkien", "title":"The Hobbit"}
Press Ctrl+C to exit the console producer.
On your local machine (outside the broker container), compile the application from the top-level tutorials repository directory:
./gradlew filtering:kstreams:shadowJar
Navigate into the application's home directory:
cd filtering/kstreams
Run the application, passing the local.properties Kafka client configuration file, which points to the broker's bootstrap servers endpoint at localhost:9092:
java -cp ./build/libs/kstreams-filter-standalone.jar \
    io.confluent.developer.FilterEvents \
    ./src/main/resources/local.properties
Validate that you see only the books by George R. R. Martin in the filtering-output topic. In the broker container shell, run:
kafka-console-consumer --bootstrap-server localhost:9092 --topic filtering-output --from-beginning
You should see:
{"name":"George R. R. Martin","title":"A Song of Ice and Fire"}
{"name":"George R. R. Martin","title":"Fire & Blood"}
From the top-level tutorials repository directory on your local machine, stop the broker container:
docker compose -f ./docker/docker-compose-kafka.yml down