This tutorial demonstrates how to build Kafka producer and consumer applications in Java that use Schema Registry for message schema management. You'll learn how to configure your Java applications to serialize and deserialize records, ensuring type safety and schema evolution compatibility. By the end of this tutorial, you'll have working applications that produce and consume device temperature reading records.
The applications in this tutorial use Avro-formatted messages. To use Protobuf or JSON Schema formatting instead, you would need a different serializer and deserializer, but otherwise the applications would be structured the same way.
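On the producer side, for example, the format choice comes down to which value serializer class you configure. The following lines are illustrative only, assuming props is the java.util.Properties instance passed to the producer; the Protobuf and JSON Schema serializers live in separate Confluent serde artifacts and expect their own generated or POJO value classes:

// Avro, as used in this tutorial:
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// Protobuf:
props.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
// JSON Schema:
props.put("value.serializer", "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");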
The steps in this tutorial outline how to set up the required Kafka infrastructure and run the provided producer and consumer applications. For a deeper look at the application source code, refer to the Code explanation section at the bottom.
The following steps use Confluent Cloud. To run the tutorial locally with Docker, skip to the Docker instructions section at the bottom.
Clone the confluentinc/tutorials repository from GitHub and navigate into it:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Log in to your Confluent Cloud account:
confluent login --prompt --save
Install a CLI plugin that will streamline the creation of resources in Confluent Cloud:
confluent plugin install confluent-quickstart
Run the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial.
Note: You may specify a different cloud provider (gcp or azure) or region. You can find supported regions in a given cloud provider by running confluent kafka region list --cloud <CLOUD>.
confluent quickstart \
--environment-name kafka-sr-env \
--kafka-cluster-name kafka-sr-cluster \
--create-kafka-key \
--create-sr-key \
--kafka-java-properties-file ./schema-registry-java/src/main/resources/cloud.properties
The plugin should complete in under a minute.
Create the topic for the application:
confluent kafka topic create readings
Compile the application from the top-level tutorials repository directory:
./gradlew schema-registry-java:shadowJar
Navigate into the application's home directory:
cd schema-registry-java
Run the producer application, passing the Kafka client configuration file generated when you created Confluent Cloud resources:
java -cp ./build/libs/schema-registry-java-standalone.jar \
io.confluent.developer.AvroProducer \
./src/main/resources/cloud.properties
Validate that you see temperature reading Avro records in the readings topic:
confluent kafka topic consume readings \
--value-format avro \
--from-beginning
You should see output similar to the following:
{"deviceId":"3","temperature":99.5231}
{"deviceId":"3","temperature":70.56588}
{"deviceId":"1","temperature":99.817894}
{"deviceId":"1","temperature":98.89636}
{"deviceId":"0","temperature":96.56193}
{"deviceId":"2","temperature":97.53318}
{"deviceId":"2","temperature":75.94116}
{"deviceId":"0","temperature":74.87793}
{"deviceId":"0","temperature":76.37975}
{"deviceId":"0","temperature":83.31611}These messages correspond to the Avro schema in src/main/avro/temp-reading.avsc:
{
"namespace": "io.confluent.developer.avro",
"type": "record",
"name": "TempReading",
"fields": [
{ "name": "deviceId", "type": "string" },
{ "name": "temperature", "type": "float" }
]
}
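Because the schema lives under src/main/avro/, the conventional location for Gradle Avro code generation, the build produces a TempReading Java class from it. Conceptually, the generated class exposes a builder and typed accessors along these lines (a sketch of standard Avro codegen output, not the tutorial's exact generated code):

TempReading reading = TempReading.newBuilder()
        .setDeviceId("0")
        .setTemperature(83.31611f)
        .build();
CharSequence deviceId = reading.getDeviceId();  // string fields map to String or CharSequence,
float temperature = reading.getTemperature();   // depending on codegen settings

Run the consumer application: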
java -cp ./build/libs/schema-registry-java-standalone.jar \
io.confluent.developer.AvroConsumer \
./src/main/resources/cloud.properties
You should see output similar to the following:
[main] INFO - Consumed event: key = 4, value = {"deviceId": "4", "temperature": 99.00065}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 95.12411}
[main] INFO - Consumed event: key = 0, value = {"deviceId": "0", "temperature": 99.8184}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 92.55404}
[main] INFO - Consumed event: key = 3, value = {"deviceId": "3", "temperature": 79.467354}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 77.81964}
[main] INFO - Consumed event: key = 1, value = {"deviceId": "1", "temperature": 87.234375}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 78.16981}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 97.42639}
[main] INFO - Consumed event: key = 1, value = {"deviceId": "1", "temperature": 98.66289}
When you are finished, delete the kafka-sr-env environment by first getting the environment ID (of the form env-123456) corresponding to it:
confluent environment list
Delete the environment, including all resources created for this tutorial:
confluent environment delete <ENVIRONMENT ID>

Docker instructions
To run the tutorial locally with Docker, clone the confluentinc/tutorials repository from GitHub and navigate into it:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Start Kafka and Schema Registry with the following command, run from the top-level tutorials repository directory:
docker compose -f ./docker/docker-compose-kafka-sr.yml up -d
Open a shell in the broker container:
docker exec -it broker /bin/bash
Create the topic for the application:
kafka-topics --bootstrap-server localhost:9092 --create --topic readings
Exit the broker container by entering Ctrl+D.
On your local machine, compile the app:
./gradlew schema-registry-java:shadowJar
Navigate into the application's home directory:
cd schema-registry-java
Run the producer application, passing the local.properties Kafka client configuration file that points to the broker's bootstrap servers endpoint at localhost:9092 and Schema Registry at http://localhost:8081:
java -cp ./build/libs/schema-registry-java-standalone.jar \
io.confluent.developer.AvroProducer \
./src/main/resources/local.properties
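The repository provides this file; for an unauthenticated local setup it needs little more than the two endpoints named above. A minimal sketch of what it is expected to contain (any additional client settings omitted):

bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081

Validate that you see temperature reading Avro records in the readings topic. Open a shell in the Schema Registry container: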
docker exec -it schema-registry /bin/bash
Run a console consumer:
kafka-avro-console-consumer \
--topic readings \
--bootstrap-server broker:29092 \
--property schema.registry.url=http://localhost:8081 \
--from-beginning
You should see output similar to this:
{"deviceId":"2","temperature":96.71052551269531}
{"deviceId":"2","temperature":78.42681121826172}
{"deviceId":"3","temperature":95.85462951660156}
{"deviceId":"2","temperature":83.17869567871094}
{"deviceId":"3","temperature":79.87565612792969}
{"deviceId":"1","temperature":79.03103637695312}
{"deviceId":"0","temperature":87.11306762695312}
{"deviceId":"0","temperature":76.37906646728516}
{"deviceId":"3","temperature":75.17118072509766}
{"deviceId":"2","temperature":84.00798034667969}These messages correspond to the Avro schema in src/main/avro/temp-reading.avsc:
{
"namespace": "io.confluent.developer.avro",
"type": "record",
"name": "TempReading",
"fields": [
{ "name": "deviceId", "type": "string" },
{ "name": "temperature", "type": "float" }
]
}
Run the consumer application:
java -cp ./build/libs/schema-registry-java-standalone.jar \
io.confluent.developer.AvroConsumer \
./src/main/resources/local.properties
You should see output similar to the following:
[main] INFO - Consumed event: key = 4, value = {"deviceId": "4", "temperature": 99.00065}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 95.12411}
[main] INFO - Consumed event: key = 0, value = {"deviceId": "0", "temperature": 99.8184}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 92.55404}
[main] INFO - Consumed event: key = 3, value = {"deviceId": "3", "temperature": 79.467354}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 77.81964}
[main] INFO - Consumed event: key = 1, value = {"deviceId": "1", "temperature": 87.234375}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 78.16981}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 97.42639}
[main] INFO - Consumed event: key = 1, value = {"deviceId": "1", "temperature": 98.66289}
When you are finished, stop the broker and Schema Registry containers by running the following command from the top-level tutorials directory on your local machine:
docker compose -f ./docker/docker-compose-kafka-sr.yml down

Code explanation
This section summarizes the application source files in src/main/java/io/confluent/developer/.
The AvroProducer class demonstrates how to produce Avro-encoded messages to a Kafka topic using Schema Registry. The producer:
Loads configuration: Reads Kafka and Schema Registry connection properties from a file.
Configures serializers: Sets the key serializer to StringSerializer and the value serializer to KafkaAvroSerializer. The KafkaAvroSerializer automatically registers the Avro schema with Schema Registry on first use and embeds a schema ID in each message.
Creates producer instance: Instantiates a KafkaProducer parameterized with String keys and TempReading (the generated Avro class) values.
Generates and sends records: Creates random temperature readings corresponding to random device IDs and produces messages to the readings topic.
Cleans up: Flushes any pending records and closes the producer to ensure all messages are sent before the application terminates.
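Putting those steps together, the producer logic looks roughly like the following. This is an abridged sketch rather than the tutorial's exact source; the class name ProducerSketch, the record count, and the value ranges are illustrative:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.developer.avro.TempReading;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class ProducerSketch {
    public static void main(String[] args) throws IOException {
        // Load Kafka and Schema Registry connection properties from the file
        // passed on the command line (cloud.properties or local.properties).
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(args[0])) {
            props.load(in);
        }

        // String keys; KafkaAvroSerializer registers the TempReading schema
        // with Schema Registry on first use and embeds the schema ID in each message.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        Random random = new Random();
        try (KafkaProducer<String, TempReading> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // Random temperature reading for a random device ID.
                String deviceId = String.valueOf(random.nextInt(5));
                TempReading reading = TempReading.newBuilder()
                        .setDeviceId(deviceId)
                        .setTemperature(70 + random.nextFloat() * 30)
                        .build();
                producer.send(new ProducerRecord<>("readings", deviceId, reading));
            }
            producer.flush();  // make sure all records are sent before exiting
        }
    }
}

Note that no schema is passed explicitly: the serializer derives it from the generated TempReading class and handles registration with Schema Registry.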
The AvroConsumer class demonstrates how to consume Avro-encoded messages from a Kafka topic using Schema Registry. The consumer:
Loads configuration: Reads Kafka and Schema Registry connection properties from a properties file.
Configures deserializers: Sets the key deserializer to StringDeserializer and the value deserializer to KafkaAvroDeserializer. The KafkaAvroDeserializer automatically retrieves the schema from Schema Registry using the schema ID embedded in each message.
Sets consumer properties: Configures a consumer group and offset reset behavior so the application reads the readings topic from the beginning, and enables the specific Avro reader so that values deserialize into the generated TempReading class rather than a generic record.
Subscribes and polls: Subscribes to the readings topic and enters a polling loop that continuously fetches records in batches. Each consumed record is logged with its key and deserialized TempReading value.
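A correspondingly abridged sketch of the consumer (again an approximation, with the class name ConsumerSketch and the group ID illustrative):

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.confluent.developer.avro.TempReading;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

public class ConsumerSketch {
    public static void main(String[] args) throws IOException {
        // Load Kafka and Schema Registry connection properties from the file
        // passed on the command line.
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(args[0])) {
            props.load(in);
        }

        // String keys; KafkaAvroDeserializer looks up the schema in Schema Registry
        // by the schema ID embedded in each message.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        // Deserialize into the generated TempReading class rather than GenericRecord.
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-sketch-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // read from the beginning

        try (KafkaConsumer<String, TempReading> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("readings"));
            while (true) {  // poll in batches until the process is terminated
                ConsumerRecords<String, TempReading> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, TempReading> record : records) {
                    System.out.printf("Consumed event: key = %s, value = %s%n",
                            record.key(), record.value());
                }
            }
        }
    }
}

Setting specific.avro.reader to true is what makes the deserializer return TempReading instances instead of GenericRecord values.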