If you have existing Kafka applications that produce and consume messages in formats that Schema Registry supports (JSON Schema, Apache Avro™, or Protobuf), you can adopt Schema Registry without breaking your legacy consumers. This tutorial demonstrates how to achieve backward compatibility by configuring producers and consumers to pass schema IDs in message headers, rather than embedding them inline as a prefix in the message payload. This approach allows your existing consumers to continue operating normally, giving you the flexibility to upgrade consumers to use Schema Registry when convenient, rather than in lockstep with producers (a "stop the world" upgrade).
Note that the applications in this tutorial are written in Python and the pre-Schema Registry applications produce and consume raw JSON messages, but the same "schema ID in headers" functionality is supported in the Java client as well as in librdkafka-based non-Java clients.
git clone git@github.com:confluentinc/tutorials.git
cd tutorials

Log in to your Confluent Cloud account:
confluent login --prompt --save

Install a CLI plugin that will streamline the creation of resources in Confluent Cloud:
confluent plugin install confluent-quickstart

Run the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial. Note that you may specify a different cloud provider (gcp or azure) or region. You can find supported regions in a given cloud provider by running confluent kafka region list --cloud <CLOUD>.
confluent quickstart \
--environment-name kafka-sr-integration-env \
--kafka-cluster-name kafka-sr-integration-cluster \
--create-kafka-key \
--kafka-librdkafka-properties-file ./schema-registry-integration/cloud-kafka.properties \
--create-sr-key \
    --schema-registry-properties-file ./schema-registry-integration/cloud-sr.properties

The plugin should complete in under a minute.
Create the topic for the application:
confluent kafka topic create users

Navigate into the application's home directory:
cd schema-registry-integration/

Create and activate a Python virtual environment to give yourself an isolated workspace for installing dependencies. To use virtualenv:
virtualenv env
source env/bin/activate

Install the application's dependencies:
pip install -r requirements.txt

In one terminal window, run the consumer application that deserializes plain JSON strings. Pass the Kafka client configuration file generated when you created Confluent Cloud resources:
python json_consumer.py \
    --kafka-properties-file cloud-kafka.properties

In another terminal window, run the corresponding producer application, which serializes messages using json.dumps():
python json_producer.py \
    --kafka-properties-file cloud-kafka.properties

Observe that the consumer simply outputs ten random user records resembling this:
Consumed event from topic users: key = 1b589975-a1b7-43e0-b655-46be1e278171 value = {"user_id":"1b589975-a1b7-43e0-b655-46be1e278171","name":"Blake James","email":"ashley51@example.net"}

With the first version of the consumer running, let's upgrade the producer to use Schema Registry and pass schema IDs in message headers. Refer to this tutorial for the basics on adding Schema Registry to your Python client applications. To pass schema IDs in message headers, there are three things to do:
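The first two steps might look like this — a sketch, not the tutorial's exact code, assuming a confluent-kafka Python client version (2.8+) that ships header_schema_id_serializer; here schema_str, schema_registry_client, topic, and user come from the surrounding application:

from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import header_schema_id_serializer
from confluent_kafka.schema_registry.json_schema import JSONSerializer

# 1. Configure the serializer to write the schema ID into message headers
#    rather than prefixing it to the payload.
json_serializer = JSONSerializer(
    schema_str,                 # JSON Schema loaded from user-schema.json
    schema_registry_client,
    conf={'schema.id.serializer': header_schema_id_serializer}
)

# 2. Serialize with a SerializationContext that carries a headers list;
#    the serializer populates it with the schema ID.
headers = []
serialized_value = json_serializer(
    user, SerializationContext(topic, MessageField.VALUE, headers)
)

The third step is passing the populated headers to produce():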
producer.produce(
    topic=topic,
    key=string_serializer(user_id),
    value=serialized_value,
    headers=headers,
    on_delivery=delivery_report
)

Run the upgraded producer application that uses Schema Registry. This application will auto-register the passed JSON Schema:
python json_schema_producer.py \
--kafka-properties-file cloud-kafka.properties \
--sr-properties-file cloud-sr.properties \
    --schema-file user-schema.json

Because the serializer is configured to use the header_schema_id_serializer, the message value payload remains plain JSON and can still be read by the original non-schematized consumer. Observe that it continues to successfully consume and output the ten additional user records just produced.
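The reason the legacy consumer keeps working is the framing. With payload-prefixed schema IDs, the Confluent wire format puts a magic byte (0) and a 4-byte big-endian schema ID in front of the serialized data, and those bytes are not valid UTF-8. A quick sketch of the failure — the schema ID 100001 here is an assumption for illustration (Confluent Cloud schema IDs start in that range):

```python
import json
import struct

# Confluent wire format when the schema ID is prefixed in the payload:
# magic byte 0x00, then a 4-byte big-endian schema ID, then the data.
schema_id = 100001  # hypothetical ID for illustration
body = json.dumps({"user_id": "abc", "name": "Blake James"}).encode("utf-8")
framed = struct.pack(">bI", 0, schema_id) + body

print(framed[:5].hex())  # 00000186a1 -- byte 0x86 sits at position 3
try:
    framed.decode("utf-8")  # what a plain-JSON consumer effectively does
except UnicodeDecodeError as e:
    print(e)  # 'utf-8' codec can't decode byte 0x86 in position 3 ...
```

This is exactly the shape of the error the non-schematized consumer would report.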
If we had instead relied on schema IDs prefixed in the message payload, then the consumer wouldn't be able to consume the messages:
Error occurred: 'utf-8' codec can't decode byte 0x86 in position 3: invalid start byte

Now let's upgrade the consumer application to use Schema Registry. All that's required to read the schema ID from the message header is to configure the deserializer to use the dual_schema_id_deserializer, which first looks for the schema ID in the message headers and then falls back to looking for it within the message payload.
deserializer_config = {
    'schema.id.deserializer': dual_schema_id_deserializer
}

json_deserializer = JSONDeserializer(schema_registry_client=schema_registry_client,
                                     schema_str=schema_str,
                                     conf=deserializer_config,
                                     from_dict=User.dict_to_user)

To run the upgraded consumer, first stop the original consumer if it's still running. Then run the upgraded consumer:
python json_schema_consumer.py \
--kafka-properties-file cloud-kafka.properties \
--sr-properties-file cloud-sr.properties \
    --schema-file user-schema.json

Now, in another terminal window, run the upgraded producer once more to ensure that the upgraded consumer is working as expected:
python json_schema_producer.py \
--kafka-properties-file cloud-kafka.properties \
--sr-properties-file cloud-sr.properties \
    --schema-file user-schema.json

You will see that, because the upgraded consumer uses the same consumer group ID, it outputs only the ten new records just produced:
Consumed user: user_id = 1f9f5b43-35a0-4fee-96ec-d62d14207b1f, name = Donna Scott, email = tparker@example.com
...

When you're finished, delete the kafka-sr-integration-env environment. First, get its environment ID (of the form env-123456):
confluent environment list

Delete the environment, including all resources created for this tutorial:
confluent environment delete <ENVIRONMENT ID>