This tutorial demonstrates how to safely evolve Kafka message schemas managed by Schema Registry. Schema Registry natively supports the ever-changing nature of message schemas by enforcing compatibility rules that ensure producers and consumers can work together even as schemas change. You'll walk through a practical example of evolving a schema by adding a new field and changing a field's data type. The tutorial also shows how to update Kafka producer and consumer applications in the correct order to maintain compatibility throughout the evolution process.
The following steps use Confluent Cloud. To run the tutorial locally with Docker, skip to the Docker instructions section at the bottom.
Clone the confluentinc/tutorials repository:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Log in to your Confluent Cloud account:
confluent login --prompt --save
Install a CLI plugin that will streamline the creation of resources in Confluent Cloud:
confluent plugin install confluent-quickstart
Run the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial.
Note: You may specify a different cloud provider (gcp or azure) or region. You can find supported regions in a given cloud provider by running confluent kafka region list --cloud <CLOUD>.
confluent quickstart \
--environment-name schema-evolution-env \
--kafka-cluster-name schema-evolution-cluster \
--create-kafka-key \
--create-sr-key \
--kafka-java-properties-file ./schema-evolution/src/main/resources/cloud.properties
The plugin should complete in under a minute.
Create the topic for the application:
confluent kafka topic create readings
Compile the application from the top-level tutorials repository directory:
./gradlew schema-evolution:shadowJar
Navigate into the application's home directory:
cd schema-evolution
Run the producer application, passing the Kafka client configuration file generated when you created Confluent Cloud resources:
java -cp ./build/libs/schema-evolution-standalone.jar \
io.confluent.developer.AvroProducer \
./src/main/resources/cloud.properties \
./src/main/avro/temp-reading.avsc
This will produce ten temperature reading messages that have two fields: a string deviceId and a float temperature.
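For reference, the V1 schema in ./src/main/avro/temp-reading.avsc is roughly the following sketch. The two field names and types come from the tutorial; the record name and namespace here are assumptions:

```json
{
  "type": "record",
  "name": "TempReading",
  "namespace": "io.confluent.developer",
  "fields": [
    {"name": "deviceId", "type": "string"},
    {"name": "temperature", "type": "float"}
  ]
}
```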
Next, run the consumer application:
java -cp ./build/libs/schema-evolution-standalone.jar \
io.confluent.developer.AvroConsumer \
./src/main/resources/cloud.properties
You should see output similar to the following:
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 4, deviceId = 4, temperature = 89.11165
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 3, deviceId = 3, temperature = 98.30217
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 0, deviceId = 0, temperature = 94.170296
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 3, deviceId = 3, temperature = 94.42372
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 3, deviceId = 3, temperature = 93.87649
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 4, deviceId = 4, temperature = 91.70332
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 1, deviceId = 1, temperature = 85.44849
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 3, deviceId = 3, temperature = 83.115486
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 1, deviceId = 1, temperature = 80.96756
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 0, deviceId = 0, temperature = 90.369835
Enter Ctrl+C to exit the application.
Let's evolve the temperature reading schema in a backward-compatible way by adding an optional factoryId field and widening the type of the temperature field from a 4-byte float to an 8-byte double. These changes are captured in the ./src/main/avro/temp-reading-v2.avsc schema file. Refer to the compatibility table in the Schema Registry documentation for details on which schema changes are allowed for a given compatibility type and schema format.
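A V2 schema with these changes would look roughly like the following sketch (record name and namespace are assumptions). The usual way to make an Avro field optional is a union with null plus a null default; a default is also what BACKWARD compatibility requires for an added field, so that consumers on the new schema can still read old records that lack it:

```json
{
  "type": "record",
  "name": "TempReading",
  "namespace": "io.confluent.developer",
  "fields": [
    {"name": "deviceId", "type": "string"},
    {"name": "factoryId", "type": ["null", "string"], "default": null},
    {"name": "temperature", "type": "double"}
  ]
}
```

Widening float to double is backward compatible because Avro's schema resolution rules allow a double reader to promote a float written value.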
Next, review the modified V2 applications that work with the updated schema. The producer (AvroProducerV2) now populates the factoryId field:
String factoryId = String.valueOf(rand.nextInt(5));
...
GenericRecord reading = new GenericData.Record(schema);
...
reading.put("factoryId", factoryId);
Similarly, the consumer (AvroConsumerV2) is updated to log the factory ID to the console, and it handles both float and double temperatures:
double temp = ((Number)value.get("temperature")).doubleValue();
Note that, because Avro's GenericRecord API doesn't offer type safety, developers must ensure that applications using it evolve in a backward-compatible way. For example, the expression (Double)value.get("temperature") would fail when run against records produced by AvroProducer: the field is deserialized as a boxed Float, and a boxed Float cannot be cast to a Double. The attempt throws a ClassCastException (a RuntimeException) with the message class java.lang.Float cannot be cast to class java.lang.Double.
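The distinction can be seen in a minimal sketch that is not part of the tutorial code: widening through Number works for either boxed type, while a direct cast to Double only works for values that actually are Doubles.

```java
// WideningDemo: shows why the consumer widens via Number rather than
// casting directly to Double when a field may be a Float or a Double.
public class WideningDemo {

    // Safe: works whether the deserialized field is a boxed Float
    // (V1 records) or a boxed Double (V2 records).
    static double widen(Object fieldValue) {
        return ((Number) fieldValue).doubleValue();
    }

    public static void main(String[] args) {
        Object v1Temperature = Float.valueOf(1.5f); // field from a V1 (float) record
        Object v2Temperature = Double.valueOf(1.5); // field from a V2 (double) record

        System.out.println(widen(v1Temperature)); // prints 1.5
        System.out.println(widen(v2Temperature)); // prints 1.5

        // Unsafe: a direct cast fails on V1 records because a boxed Float
        // cannot be cast to Double.
        try {
            double bad = (Double) v1Temperature;
            System.out.println(bad);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```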
Since our schema compatibility type is the default BACKWARD, consumers using the new schema can read data produced with the previous schema. Conversely, there is no guarantee that a consumer using the first version of the schema will work with data produced with the updated version. Therefore, we must upgrade consumers before upgrading producers.
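Before upgrading anything, you can confirm the effective compatibility setting with the Schema Registry REST API. This sketch assumes a local Schema Registry at http://localhost:8081 (for Confluent Cloud, substitute your Schema Registry endpoint and pass your API key and secret with curl's -u flag); the subject name readings-value assumes the default TopicNameStrategy:

```shell
# Global compatibility level (BACKWARD unless it has been changed)
curl -s http://localhost:8081/config

# Effective level for this tutorial's subject, falling back to the
# global setting if no subject-level override exists
curl -s "http://localhost:8081/config/readings-value?defaultToGlobal=true"
```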
Start the upgraded consumer and leave it running:
java -cp ./build/libs/schema-evolution-standalone.jar \
io.confluent.developer.AvroConsumerV2 \
./src/main/resources/cloud.properties
Run the upgraded producer in another terminal:
java -cp ./build/libs/schema-evolution-standalone.jar \
io.confluent.developer.AvroProducerV2 \
./src/main/resources/cloud.properties \
./src/main/avro/temp-reading-v2.avsc
Run the original producer again to observe that the new consumer works with it:
java -cp ./build/libs/schema-evolution-standalone.jar \
io.confluent.developer.AvroProducer \
./src/main/resources/cloud.properties \
./src/main/avro/temp-reading.avsc
You will see that the upgraded consumer properly handles messages adhering to both the original and the updated schema:
[main] INFO Consumed event: key = 2, deviceId = 2, factoryId = 2, temperature = 75.96076296305301 (type class java.lang.Double)
[main] INFO Consumed event: key = 1, deviceId = 1, factoryId = 3, temperature = 83.7503375910799 (type class java.lang.Double)
...
[main] INFO Consumed event: key = 1, deviceId = 1, factoryId = N/A, temperature = 85.32722473144531 (type class java.lang.Float)
[main] INFO Consumed event: key = 0, deviceId = 0, factoryId = N/A, temperature = 89.28128051757812 (type class java.lang.Float)
When you are finished, delete the schema-evolution-env environment by first getting the environment ID of the form env-123456 corresponding to it:
confluent environment list
Delete the environment, including all resources created for this tutorial:
confluent environment delete <ENVIRONMENT ID>

Docker instructions

Clone the confluentinc/tutorials repository:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Start Kafka and Schema Registry with the following command, run from the top-level tutorials repository directory:
docker compose -f ./docker/docker-compose-kafka-sr.yml up -d
Open a shell in the broker container:
docker exec -it broker /bin/bash
Create the topic for the application:
kafka-topics --bootstrap-server localhost:9092 --create --topic readings
Exit the broker container by entering Ctrl+D.
Compile the application from the top-level tutorials repository directory:
./gradlew schema-evolution:shadowJar
Navigate into the application's home directory:
cd schema-evolution
Run the producer application, passing the local.properties Kafka client configuration file that points to the broker's bootstrap servers endpoint at localhost:9092 and Schema Registry at http://localhost:8081:
java -cp ./build/libs/schema-evolution-standalone.jar \
io.confluent.developer.AvroProducer \
./src/main/resources/local.properties \
./src/main/avro/temp-reading.avsc
This will produce ten temperature reading messages that have two fields: a string deviceId and a float temperature.
Next, run the consumer application:
java -cp ./build/libs/schema-evolution-standalone.jar \
io.confluent.developer.AvroConsumer \
./src/main/resources/local.properties
You should see output similar to the following:
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 4, deviceId = 4, temperature = 89.11165
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 3, deviceId = 3, temperature = 98.30217
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 0, deviceId = 0, temperature = 94.170296
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 3, deviceId = 3, temperature = 94.42372
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 3, deviceId = 3, temperature = 93.87649
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 4, deviceId = 4, temperature = 91.70332
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 1, deviceId = 1, temperature = 85.44849
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 3, deviceId = 3, temperature = 83.115486
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 1, deviceId = 1, temperature = 80.96756
[main] INFO io.confluent.developer.AvroConsumer - Consumed event: key = 0, deviceId = 0, temperature = 90.369835
Enter Ctrl+C to exit the application.
Let's evolve the temperature reading schema in a backward-compatible way by adding an optional factoryId field and widening the type of the temperature field from a 4-byte float to an 8-byte double. These changes are captured in the ./src/main/avro/temp-reading-v2.avsc schema file. Refer to the compatibility table in the Schema Registry documentation for details on which schema changes are allowed for a given compatibility type and schema format.
Next, review the modified V2 applications that work with the updated schema. The producer (AvroProducerV2) now populates the factoryId field:
String factoryId = String.valueOf(rand.nextInt(5));
...
GenericRecord reading = new GenericData.Record(schema);
...
reading.put("factoryId", factoryId);
Similarly, the consumer (AvroConsumerV2) is updated to log the factory ID to the console, and it handles both float and double temperatures:
double temp = ((Number)value.get("temperature")).doubleValue();
Note that, because Avro's GenericRecord API doesn't offer type safety, developers must ensure that applications using it evolve in a backward-compatible way. For example, the expression (Double)value.get("temperature") would fail when run against records produced by AvroProducer: the field is deserialized as a boxed Float, and a boxed Float cannot be cast to a Double. The attempt throws a ClassCastException (a RuntimeException) with the message class java.lang.Float cannot be cast to class java.lang.Double.
Since our schema compatibility type is the default BACKWARD, consumers using the new schema can read data produced with the previous schema. Conversely, there is no guarantee that a consumer using the first version of the schema will work with data produced with the updated version. Therefore, we must upgrade consumers before upgrading producers.
Start the upgraded consumer and leave it running:
java -cp ./build/libs/schema-evolution-standalone.jar \
io.confluent.developer.AvroConsumerV2 \
./src/main/resources/local.properties
Run the upgraded producer in another terminal:
java -cp ./build/libs/schema-evolution-standalone.jar \
io.confluent.developer.AvroProducerV2 \
./src/main/resources/local.properties \
./src/main/avro/temp-reading-v2.avsc
Run the original producer again to observe that the new consumer works with it:
java -cp ./build/libs/schema-evolution-standalone.jar \
io.confluent.developer.AvroProducer \
./src/main/resources/local.properties \
./src/main/avro/temp-reading.avsc
You will see that the upgraded consumer properly handles messages adhering to both the original and the updated schema:
[main] INFO Consumed event: key = 2, deviceId = 2, factoryId = 2, temperature = 75.96076296305301 (type class java.lang.Double)
[main] INFO Consumed event: key = 1, deviceId = 1, factoryId = 3, temperature = 83.7503375910799 (type class java.lang.Double)
...
[main] INFO Consumed event: key = 1, deviceId = 1, factoryId = N/A, temperature = 85.32722473144531 (type class java.lang.Float)
[main] INFO Consumed event: key = 0, deviceId = 0, factoryId = N/A, temperature = 89.28128051757812 (type class java.lang.Float)
When you are finished, stop the broker and Schema Registry containers by running the following from the top-level tutorials directory:
docker compose -f ./docker/docker-compose-kafka-sr.yml down