This tutorial demonstrates how to build Kafka producer and consumer applications in Python that use Schema Registry for message schema management. You'll learn how to configure your Python applications to serialize and deserialize records, ensuring type safety and schema evolution compatibility. By the end of this tutorial, you'll have working applications that produce and consume device temperature reading records.
The applications in this tutorial use Avro-formatted messages. To use Protobuf or JSON Schema formatting instead, you would swap in the corresponding serializer and deserializer, but otherwise the applications would be structured the same way.
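As an illustration, here is a minimal sketch of that swap for JSON Schema, assuming the confluent-kafka Python client that this tutorial uses. The schema string and Schema Registry URL below are illustrative placeholders, not files or endpoints from this tutorial:

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer, JSONDeserializer

# Illustrative JSON Schema mirroring the temperature reading record
json_schema_str = """
{
  "type": "object",
  "properties": {
    "device_id": {"type": "string"},
    "temperature": {"type": "number"}
  }
}
"""

sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})  # placeholder URL

# Swap these in where the Avro applications create their Avro serializer / deserializer
json_serializer = JSONSerializer(json_schema_str, sr_client)
json_deserializer = JSONDeserializer(json_schema_str)
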
The following steps use Confluent Cloud. To run the tutorial locally with Docker, skip to the Docker instructions section at the bottom.
Clone the confluentinc/tutorials repository and navigate into its top-level directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Log in to your Confluent Cloud account:
confluent login --prompt --save
Install a CLI plugin that will streamline the creation of resources in Confluent Cloud:
confluent plugin install confluent-quickstart
Run the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial. Note that you may specify a different cloud provider (gcp or azure) or region. You can find supported regions in a given cloud provider by running confluent kafka region list --cloud <CLOUD>.
confluent quickstart \
--environment-name kafka-sr-env \
--kafka-cluster-name kafka-sr-cluster \
--create-kafka-key \
--kafka-librdkafka-properties-file ./schema-registry-python/cloud-kafka.properties \
--create-sr-key \
--schema-registry-properties-file ./schema-registry-python/cloud-sr.properties
The plugin should complete in under a minute.
Create the topic for the application:
confluent kafka topic create readings
Navigate into the application's home directory:
cd schema-registry-python/
Create and activate a Python virtual environment to give yourself an isolated workspace for installing dependencies. To use virtualenv:
virtualenv env
source env/bin/activate
Install the application's dependencies:
pip install -r requirements.txt
The application consists of the following files:
avro_producer.py: Implements a Kafka producer that generates 10 random temperature readings and publishes them to the readings topic. The application loads Kafka and Schema Registry configuration from properties files, creates an Avro serializer configured with Schema Registry, and produces messages with device IDs as keys and serialized TempReading objects as values. A minimal sketch of this produce path appears after this file list.
avro_consumer.py: Implements a Kafka consumer that subscribes to the readings topic and continuously polls for new messages. The application uses an Avro deserializer configured with Schema Registry to convert incoming message bytes back into TempReading domain objects, which are then printed to the console.
temp-reading.avsc: Defines the Avro schema for temperature reading records with two fields: a string device_id and a float temperature.
temp_reading.py: Contains the TempReading class that represents a temperature reading domain object. The class also includes two static helper methods: reading_to_dict for serialization and dict_to_reading for deserialization, which are used by the Avro serializer and deserializer.
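To make the produce path concrete, here is a minimal sketch of how these pieces fit together, assuming the confluent-kafka Python client. The inline schema string, Schema Registry URL, and bootstrap server are illustrative stand-ins; the actual application loads its configuration from the properties files and the schema from temp-reading.avsc:

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext, StringSerializer

# Illustrative Avro schema; the tutorial loads the real one from temp-reading.avsc
schema_str = """
{
  "type": "record",
  "name": "TempReading",
  "fields": [
    {"name": "device_id", "type": "string"},
    {"name": "temperature", "type": "float"}
  ]
}
"""

class TempReading:
    def __init__(self, device_id, temperature):
        self.device_id = device_id
        self.temperature = temperature

def reading_to_dict(reading, ctx):
    # Called by AvroSerializer to convert the domain object into a dict matching the schema
    return {"device_id": reading.device_id, "temperature": reading.temperature}

sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})  # placeholder; use your Schema Registry config
avro_serializer = AvroSerializer(sr_client, schema_str, to_dict=reading_to_dict)
string_serializer = StringSerializer("utf_8")

producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder; use your Kafka config
reading = TempReading(device_id="1", temperature=98.6)
producer.produce(
    topic="readings",
    key=string_serializer(reading.device_id),
    value=avro_serializer(reading, SerializationContext("readings", MessageField.VALUE)),
)
producer.flush()

The tutorial's avro_producer.py follows this same general flow, except that it reads its configuration and schema from the files passed on the command line.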
Run the producer application, passing the Kafka and Schema Registry client configuration files generated when you created Confluent Cloud resources:
python avro_producer.py \
--kafka-properties-file cloud-kafka.properties \
--sr-properties-file cloud-sr.properties \
--schema-file temp-reading.avsc
In a new terminal window, run the consumer application:
python avro_consumer.py \
--kafka-properties-file cloud-kafka.properties \
--sr-properties-file cloud-sr.properties \
--schema-file temp-reading.avsc
You will see output like:
Consumed reading: device_id = 4, temperature = 100.78572082519531
Consumed reading: device_id = 2, temperature = 93.64944458007812
Consumed reading: device_id = 1, temperature = 85.31315612792969
Consumed reading: device_id = 4, temperature = 79.18598175048828
Consumed reading: device_id = 3, temperature = 93.05386352539062
Consumed reading: device_id = 3, temperature = 106.12528991699219
Consumed reading: device_id = 3, temperature = 103.54008483886719
Consumed reading: device_id = 4, temperature = 79.39240264892578
Consumed reading: device_id = 3, temperature = 95.86831665039062
Consumed reading: device_id = 3, temperature = 106.3673095703125
When you're finished, delete the kafka-sr-env environment. First, get its environment ID (of the form env-123456):
confluent environment list
Delete the environment, including all resources created for this tutorial:
confluent environment delete <ENVIRONMENT ID>

Docker instructions

Clone the confluentinc/tutorials repository and navigate into its top-level directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Start Kafka and Schema Registry with the following command from the top-level directory of the tutorials repository:
docker compose -f ./docker/docker-compose-kafka-sr.yml up -d
Open a shell in the broker container:
docker exec -it broker /bin/bash
Create the topic for the application:
kafka-topics --bootstrap-server localhost:9092 --create --topic readings
From your local machine, navigate into the application's home directory:
cd schema-registry-python/
Create and activate a Python virtual environment to give yourself an isolated workspace for installing dependencies. To use virtualenv:
virtualenv env
source env/bin/activate
Install the application's dependencies:
pip install -r requirements.txt
The application consists of the following files:
avro_producer.py: Implements a Kafka producer that generates 10 random temperature readings and publishes them to the readings topic. The application loads Kafka and Schema Registry configuration from properties files, creates an Avro serializer configured with Schema Registry, and produces messages with device IDs as keys and serialized TempReading objects as values.
avro_consumer.py: Implements a Kafka consumer that subscribes to the readings topic and continuously polls for new messages. The application uses an Avro deserializer configured with Schema Registry to convert incoming message bytes back into TempReading domain objects, which are then printed to the console. A minimal sketch of this consume path appears after this file list.
temp-reading.avsc: Defines the Avro schema for temperature reading records with two fields: a string device_id and a float temperature.
temp_reading.py: Contains the TempReading class that represents a temperature reading domain object. The class also includes two static helper methods: reading_to_dict for serialization and dict_to_reading for deserialization, which are used by the Avro serializer and deserializer.
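To make the consume path concrete, here is a minimal sketch of the deserialization flow described above, assuming the confluent-kafka Python client. The Schema Registry URL, bootstrap server, and group ID are illustrative stand-ins; the actual application loads its configuration from the properties files passed on the command line:

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

class TempReading:
    def __init__(self, device_id, temperature):
        self.device_id = device_id
        self.temperature = temperature

def dict_to_reading(d, ctx):
    # Called by AvroDeserializer to convert the decoded dict back into a domain object
    return TempReading(d["device_id"], d["temperature"])

sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})  # placeholder; use your Schema Registry config
avro_deserializer = AvroDeserializer(sr_client, from_dict=dict_to_reading)

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder; use your Kafka config
    "group.id": "avro-consumer-sketch",     # illustrative group ID
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["readings"])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    reading = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
    print(f"Consumed reading: device_id = {reading.device_id}, temperature = {reading.temperature}")

The tutorial's avro_consumer.py follows this same general flow, except that it reads its configuration and schema from the files passed on the command line.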
Run the producer application, passing the client configuration files for connecting to the Kafka broker and Schema Registry running in Docker:
python avro_producer.py \
--kafka-properties-file local-kafka.properties \
--sr-properties-file local-sr.properties \
--schema-file temp-reading.avsc
In a new terminal window, run the consumer application:
python avro_consumer.py \
--kafka-properties-file local-kafka.properties \
--sr-properties-file local-sr.properties \
--schema-file temp-reading.avsc
You will see output like:
Consumed reading: device_id = 4, temperature = 100.78572082519531
Consumed reading: device_id = 2, temperature = 93.64944458007812
Consumed reading: device_id = 1, temperature = 85.31315612792969
Consumed reading: device_id = 4, temperature = 79.18598175048828
Consumed reading: device_id = 3, temperature = 93.05386352539062
Consumed reading: device_id = 3, temperature = 106.12528991699219
Consumed reading: device_id = 3, temperature = 103.54008483886719
Consumed reading: device_id = 4, temperature = 79.39240264892578
Consumed reading: device_id = 3, temperature = 95.86831665039062
Consumed reading: device_id = 3, temperature = 106.3673095703125
From your local machine, stop the broker and Schema Registry containers:
docker compose -f ./docker/docker-compose-kafka-sr.yml down