
How to build Kafka Streams applications with Schema Registry

This tutorial demonstrates how to integrate Schema Registry into a Kafka Streams application: that is, how to handle application input and/or output topics that have an associated schema.

The sample application that this tutorial uses is based on the tutorial How to create a Kafka Streams application, with the following enhancements to integrate Schema Registry:

  • The value serde passed to the Consumed.with and Produced.with methods is a SpecificAvroSerde (Confluent provides analogous serdes for other schema formats such as Protobuf and JSON Schema); see the sketches after this list
  • This serde is configured with Schema Registry connection details (the Schema Registry URL and credentials, if applicable)
  • The included unit test uses an in-memory MockSchemaRegistryClient so that a real Schema Registry instance isn't needed for unit testing
  • The build.gradle file includes dependencies to serialize and deserialize Avro-formatted records: io.confluent:kafka-streams-avro-serde and org.apache.avro:avro. Other message formats would require similar dependencies, like io.confluent:kafka-streams-protobuf-serde for Protobuf support.
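
To make these points concrete, here is a minimal sketch of how the value serde is configured and wired into the topology. It assumes MyRecord is the class generated from my-record.avsc; the connection details are placeholders that the real application reads from its properties file.

import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

// Configure the value serde with Schema Registry connection details.
// <SR_ENDPOINT>, <SR_API_KEY>, and <SR_API_SECRET> are placeholders.
SpecificAvroSerde<MyRecord> valueSerde = new SpecificAvroSerde<>();
valueSerde.configure(Map.of(
    "schema.registry.url", "https://<SR_ENDPOINT>",
    // credentials are only needed when Schema Registry requires
    // authentication, as it does in Confluent Cloud
    "basic.auth.credentials.source", "USER_INFO",
    "basic.auth.user.info", "<SR_API_KEY>:<SR_API_SECRET>"
), false); // false = this serde is for record values, not keys

// Wire the serde into the topology via Consumed.with and Produced.with.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input", Consumed.with(Serdes.String(), valueSerde))
    .mapValues(v -> new MyRecord(v.getMyString().toString().toUpperCase()))
    .to("output", Produced.with(Serdes.String(), valueSerde));

For unit tests, configuring the same serde with a mock:// URL makes it use an in-memory MockSchemaRegistryClient under the hood, so the topology can be exercised with TopologyTestDriver and no running Schema Registry. A rough sketch, continuing from the code above:

import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

// A mock:// URL activates the in-memory MockSchemaRegistryClient. Configure
// the serde this way before building the topology above.
valueSerde.configure(Map.of("schema.registry.url", "mock://test-scope"), false);

Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsProps)) {
    TestInputTopic<String, MyRecord> in = driver.createInputTopic(
        "input", new StringSerializer(), valueSerde.serializer());
    TestOutputTopic<String, MyRecord> out = driver.createOutputTopic(
        "output", new StringDeserializer(), valueSerde.deserializer());
    in.pipeInput(new MyRecord("hello"));
    // the topology uppercases the value
    assert "HELLO".contentEquals(out.readValue().getMyString());
}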

Prerequisites

  • A Confluent Cloud account
  • The Confluent CLI installed on your machine
  • Clone the confluentinc/tutorials repository and navigate into its top-level directory:
    git clone git@github.com:confluentinc/tutorials.git
    cd tutorials

Create Confluent Cloud resources

Log in to your Confluent Cloud account:

confluent login --prompt --save

Install a CLI plugin that will streamline the creation of resources in Confluent Cloud:

confluent plugin install confluent-quickstart

Run the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial. Note that you may specify a different cloud provider (gcp or azure) or region. You can find supported regions in a given cloud provider by running confluent kafka region list --cloud <CLOUD>.

confluent quickstart \
  --environment-name kafka-streams-sr-env \
  --kafka-cluster-name kafka-streams-sr-cluster \
  --create-kafka-key \
  --create-sr-key \
  --kafka-java-properties-file ./kafka-streams-schema-registry/kstreams/src/main/resources/cloud.properties

The plugin should complete in under a minute.
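
The --kafka-java-properties-file flag writes the Java client configuration that the application reads at startup. The plugin generates the actual values; roughly, the file looks like this (all values shown here are placeholders):

bootstrap.servers=<BOOTSTRAP_SERVERS>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<KAFKA_API_KEY>' password='<KAFKA_API_SECRET>';
schema.registry.url=<SR_ENDPOINT>
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>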

Create topics

Create the input and output topics for the application:

confluent kafka topic create input
confluent kafka topic create output

Start a console producer to produce Avro-formatted records:

confluent kafka topic produce input \
  --value-format avro \
  --schema ./kafka-streams-schema-registry/kstreams/src/main/avro/my-record.avsc
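
For reference, my-record.avsc defines a record with a single string field; it matches the inline schema used in the Docker instructions later in this tutorial:

{
  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "MyRecord",
  "fields": [
    {"name": "my_string", "type": "string"}
  ]
}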

Enter a few lowercase string records:

{"my_string": "hello"}
{"my_string": "world"}

Enter Ctrl+C to exit the console producer.

Compile and run the application

Compile the application from the top-level tutorials repository directory:

./gradlew kafka-streams-schema-registry:kstreams:shadowJar

Navigate into the application's home directory:

cd kafka-streams-schema-registry/kstreams

Run the application, passing the Kafka client configuration file generated when you created Confluent Cloud resources:

java -cp ./build/libs/kafka-streams-schema-registry-standalone.jar \
    io.confluent.developer.KafkaStreamsApplication \
    ./src/main/resources/cloud.properties

Validate that you see uppercase string Avro records in the output topic:

confluent kafka topic consume output \
  --value-format avro \
  --from-beginning

You should see:

{"my_string":"HELLO"}
{"my_string":"WORLD"}

Clean up

When you are finished, delete the kafka-streams-sr-env environment. First, find the environment ID (of the form env-123456) that corresponds to it:

confluent environment list

Delete the environment, including all resources created for this tutorial:

confluent environment delete <ENVIRONMENT ID>

Docker instructions

Prerequisites

  • Docker running via Docker Desktop or Docker Engine
  • Docker Compose. Ensure that the command docker compose version succeeds.
  • Clone the confluentinc/tutorials repository and navigate into its top-level directory:
    git clone git@github.com:confluentinc/tutorials.git
    cd tutorials

Start Kafka in Docker

Start Kafka and Schema Registry with the following command run from the top-level tutorials repository directory:

docker compose -f ./docker/docker-compose-kafka-sr.yml up -d

Create topics

Open a shell in the broker container:

docker exec -it broker /bin/bash

Create the input and output topics for the application:

kafka-topics --bootstrap-server localhost:9092 --create --topic input
kafka-topics --bootstrap-server localhost:9092 --create --topic output

Exit the broker container by entering Ctrl+D.

Produce messages

Open a shell in the Schema Registry container:

docker exec -it schema-registry /bin/bash

Start a console producer to produce Avro-formatted records:

kafka-avro-console-producer \
  --topic input \
  --bootstrap-server broker:29092 \
  --property schema.registry.url="http://localhost:8081" \
  --property value.schema='{ "namespace": "io.confluent.developer.avro", "type": "record", "name": "MyRecord", "fields": [ {"name": "my_string", "type": "string"} ] }'

Enter a few lowercase string Avro records:

{"my_string": "hello"}
{"my_string": "world"}

Enter Ctrl+C to exit the console producer.

Compile and run the application

On your local machine, compile the app:

./gradlew kafka-streams-schema-registry:kstreams:shadowJar

Navigate into the application's home directory:

cd kafka-streams-schema-registry/kstreams

Run the application, passing the local.properties Kafka client configuration file that points to the broker's bootstrap servers endpoint at localhost:9092 and Schema Registry at http://localhost:8081:

java -cp ./build/libs/kafka-streams-schema-registry-standalone.jar \
    io.confluent.developer.KafkaStreamsApplication \
    ./src/main/resources/local.properties
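
Under these assumptions, local.properties contains at least the following (the repository's file may include additional Streams settings such as application.id):

bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081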

Validate that you see uppercase string Avro records in the output topic. Open a shell in the Schema Registry container:

docker exec -it schema-registry /bin/bash

Run a console consumer:

kafka-avro-console-consumer \
  --topic output \
  --bootstrap-server broker:29092 \
  --property schema.registry.url=http://localhost:8081 \
  --from-beginning

You should see:

{"my_string":"HELLO"}
{"my_string":"WORLD"}

Clean up

Stop the Kafka Streams application by entering Ctrl+C.

From your local machine, stop the broker and Schema Registry containers by running the following from the top-level tutorials directory:

docker compose -f ./docker/docker-compose-kafka-sr.yml down