This tutorial demonstrates how to integrate Schema Registry in a Kafka Streams application. That is, how to handle application input and/or output topics that have an associated schema.
The sample application that this tutorial uses is based on the tutorial How to create a Kafka Streams application, with enhancements to integrate Schema Registry.
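Concretely, the integration amounts to configuring the Streams application with Avro serdes that read and write schemas through Schema Registry. The following is a minimal sketch of what such a topology might look like; the class name `KafkaStreamsSketch` and the hardcoded endpoints are illustrative, not the tutorial's actual source, and `MyRecord` is the class generated from the tutorial's Avro schema.

```java
import java.util.Map;
import java.util.Properties;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class KafkaStreamsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-schema-registry"); // illustrative ID
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");             // illustrative endpoint

        // The Avro serde registers and fetches schemas via Schema Registry.
        SpecificAvroSerde<MyRecord> avroSerde = new SpecificAvroSerde<>();
        avroSerde.configure(Map.of("schema.registry.url", "http://localhost:8081"),
                /* isKey = */ false);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), avroSerde))
               // Uppercase the string field of each Avro record.
               .mapValues(v -> new MyRecord(v.getMyString().toUpperCase()))
               .to("output", Produced.with(Serdes.String(), avroSerde));

        new KafkaStreams(builder.build(), props).start();
    }
}
```

The key point is that `SpecificAvroSerde` is used for values on both the input and output topics, so the application deserializes and serializes records against the schema stored in Schema Registry rather than handling raw bytes or strings.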
Clone the tutorials repository and navigate into it:

git clone git@github.com:confluentinc/tutorials.git
cd tutorials

Log in to your Confluent Cloud account:

confluent login --prompt --save

Install a CLI plugin that streamlines the creation of resources in Confluent Cloud:

confluent plugin install confluent-quickstart

Run the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial. Note that you may specify a different cloud provider (gcp or azure) or region. You can find supported regions in a given cloud provider by running confluent kafka region list --cloud <CLOUD>.
confluent quickstart \
--environment-name kafka-streams-sr-env \
--kafka-cluster-name kafka-streams-sr-cluster \
--create-kafka-key \
--create-sr-key \
--kafka-java-properties-file ./kafka-streams-schema-registry/kstreams/src/main/resources/cloud.properties

The plugin should complete in under a minute.
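The generated cloud.properties file holds the connection details the application needs for both the Kafka cluster and Schema Registry. Its exact contents depend on your cluster, but it will look roughly like the following sketch; the endpoints and credential placeholders below are illustrative, not values the plugin is guaranteed to emit verbatim.

```properties
# Kafka cluster connection (endpoint and credentials are placeholders)
bootstrap.servers=pkc-xxxxx.us-east-2.aws.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<KAFKA_API_KEY>' password='<KAFKA_API_SECRET>';

# Schema Registry connection (endpoint and credentials are placeholders)
schema.registry.url=https://psrc-xxxxx.us-east-2.aws.confluent.cloud
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
```

The schema.registry.url and basic.auth.* properties are what let both the console clients and the Streams application authenticate to Schema Registry.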
Create the input and output topics for the application:
confluent kafka topic create input
confluent kafka topic create output

Start a console producer to produce Avro-formatted records:
confluent kafka topic produce input \
--value-format avro \
--schema ./kafka-streams-schema-registry/kstreams/src/main/avro/my-record.avsc

Enter a few lowercase string records:
{"my_string": "hello"}
{"my_string": "world"}Enter Ctrl+C to exit the console producer.
Compile the application from the top-level tutorials repository directory:
./gradlew kafka-streams-schema-registry:kstreams:shadowJar

Navigate into the application's home directory:

cd kafka-streams-schema-registry/kstreams

Run the application, passing the Kafka client configuration file generated when you created Confluent Cloud resources:
java -cp ./build/libs/kafka-streams-schema-registry-standalone.jar \
io.confluent.developer.KafkaStreamsApplication \
./src/main/resources/cloud.properties

Validate that you see uppercase string Avro records in the output topic:
confluent kafka topic consume output \
--value-format avro \
--from-beginning

You should see:
{"my_string":"HELLO"}
{"my_string":"WORLD"}When you are finished, delete the kafka-streams-sr-env environment by first getting the environment ID of the form env-123456 corresponding to it:
confluent environment listDelete the environment, including all resources created for this tutorial:
confluent environment delete <ENVIRONMENT ID>git clone git@github.com:confluentinc/tutorials.git
cd tutorialsStart Kafka and Schema Registry with the following command run from the top-level tutorials repository directory:
docker compose -f ./docker/docker-compose-kafka-sr.yml up -d

Open a shell in the broker container:

docker exec -it broker /bin/bash

Create the input and output topics for the application:

kafka-topics --bootstrap-server localhost:9092 --create --topic input
kafka-topics --bootstrap-server localhost:9092 --create --topic output

Exit the broker container by entering Ctrl+D.
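If you are curious what the compose file sets up: it defines two containers, broker and schema-registry, with the broker reachable at localhost:9092 from the host and at broker:29092 from other containers, and Schema Registry listening on port 8081. The following is a simplified sketch, not the actual file; image tags and the broker's full listener configuration are assumptions, so consult ./docker/docker-compose-kafka-sr.yml in the repository for the real definitions.

```yaml
services:
  broker:
    image: confluentinc/cp-kafka:latest   # illustrative tag; the repo pins its own image
    ports:
      - "9092:9092"                       # host access at localhost:9092
    # KRaft/listener configuration elided; other containers reach it at broker:29092

  schema-registry:
    image: confluentinc/cp-schema-registry:latest   # illustrative tag
    depends_on:
      - broker
    ports:
      - "8081:8081"                       # host access at http://localhost:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
```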
Open a shell in the Schema Registry container:
docker exec -it schema-registry /bin/bash

Start a console producer to produce Avro-formatted records:
kafka-avro-console-producer \
--topic input \
--bootstrap-server broker:29092 \
--property schema.registry.url="http://localhost:8081" \
--property value.schema='{ "namespace": "io.confluent.developer.avro", "type": "record", "name": "MyRecord", "fields": [ {"name": "my_string", "type": "string"} ] }'

Enter a few lowercase string Avro records:
{"my_string": "hello"}
{"my_string": "world"}Enter Ctrl+C to exit the console producer.
On your local machine, compile the app:
./gradlew kafka-streams-schema-registry:kstreams:shadowJar

Navigate into the application's home directory:

cd kafka-streams-schema-registry/kstreams

Run the application, passing the local.properties Kafka client configuration file that points to the broker's bootstrap servers endpoint at localhost:9092 and Schema Registry at http://localhost:8081:
java -cp ./build/libs/kafka-streams-schema-registry-standalone.jar \
io.confluent.developer.KafkaStreamsApplication \
./src/main/resources/local.properties

Validate that you see uppercase string Avro records in the output topic. Open a shell in the Schema Registry container:

docker exec -it schema-registry /bin/bash

Run a console consumer:
kafka-avro-console-consumer \
--topic output \
--bootstrap-server broker:29092 \
--property schema.registry.url=http://localhost:8081 \
--from-beginning

You should see:
{"my_string":"HELLO"}
{"my_string":"WORLD"}Stop the Kafka Streams application by entering Ctrl+C.
From your local machine, stop the broker and Schema Registry containers. From the top-level tutorials directory:
docker compose -f ./docker/docker-compose-kafka-sr.yml down