This tutorial walks you through producing messages to Kafka from a Python web service using the Confluent Python Client for Apache Kafka®. It focuses on the library's asynchronous Kafka API to show how to interact with Kafka in asyncio-based applications, and it uses the Quart web framework because Quart also natively supports asyncio.
The following steps use Confluent Cloud. To run the tutorial locally with Docker, skip to the Docker instructions section at the bottom.
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Log in to your Confluent Cloud account:
confluent login --prompt --save
Install a CLI plugin that will streamline the creation of resources in Confluent Cloud:
confluent plugin install confluent-quickstart
Run the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial. Note that you may specify a different cloud provider (gcp or azure) or region. You can list the supported regions for a given cloud provider by running confluent kafka region list --cloud <CLOUD>.
confluent quickstart \
--environment-name kafka-python-async-env \
--kafka-cluster-name kafka-python-async-cluster \
--create-kafka-key \
--kafka-librdkafka-properties-file ./kafka-python-async/cloud-kafka.properties \
--create-sr-key \
--schema-registry-properties-file ./kafka-python-async/cloud-sr.properties
The plugin should complete in under a minute.
Create the topics for the application: a readings topic for temperature readings without an associated schema, and a readings_schematized topic that will have an associated schema.
confluent kafka topic create readings
confluent kafka topic create readings_schematized
Navigate into the application's home directory:
cd kafka-python-async/
Create and activate a Python virtual environment to give yourself an isolated workspace for installing dependencies. To use virtualenv:
virtualenv env
source env/bin/activate
Install the application's dependencies:
pip install -r requirements.txt
This installs the application's dependencies, including the confluent-kafka client library and Quart.
The application source code lives in app.py. Open this file in a text editor or IDE.
The first two functions, create_app and cli_wrapper, define the Quart application and handle command-line parsing. The application takes as input files containing Kafka and Schema Registry endpoints and credentials (if applicable) so that the same source code can be used against any Kafka environment.
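As a rough illustration of what that configuration input looks like (this is a hypothetical helper, not the tutorial's actual cli_wrapper), a librdkafka-style .properties file can be parsed into the dict of client configuration properties like so:
def load_properties(path: str) -> dict:
    # Hypothetical helper: parse a librdkafka-style .properties file into the
    # configuration dict that the Kafka and Schema Registry clients expect.
    properties = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):  # skip blanks and comments
                continue
            key, _, value = line.partition("=")
            properties[key.strip()] = value.strip()
    return properties
In the snippets below, the parsed Kafka and Schema Registry properties appear as producer_conf.properties and schema_registry_conf.properties.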
The next three functions are Quart application lifecycle coroutines that create a Kafka producer client and a Schema Registry client before the application begins serving requests (dictated by the @app.before_serving Quart decorator), and delete them when the application exits (dictated by the @app.after_serving decorator).
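The pattern looks roughly like the following skeleton (the hook names here are illustrative; the real coroutines are in app.py):
from quart import Quart

app = Quart(__name__)
clients = {}  # will hold the producer and the Schema Registry client

@app.before_serving
async def create_clients():
    # Runs once, before Quart serves its first request; this is where app.py
    # creates the AIOProducer and AsyncSchemaRegistryClient shown below.
    clients["producer"] = ...          # e.g. AIOProducer(...)
    clients["schema_registry"] = ...   # e.g. AsyncSchemaRegistryClient(...)

@app.after_serving
async def close_clients():
    # Runs once, after the application stops serving: flush and close the producer.
    ...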
To create Kafka and Schema Registry clients:
producer = AIOProducer(producer_conf.properties)
schema_registry_client = AsyncSchemaRegistryClient(schema_registry_conf.properties)
To close the clients, only the producer needs to be flushed (optionally) and closed via coroutines that you await:
await producer.flush()
await producer.close()
Finally, the record_temp coroutine defines an HTTP API for posting temperature readings. The Kafka API for producing events asynchronously is straightforward: await produce, which returns a delivery Future; then await that Future to get the delivered Message, including any delivery errors.
@app.post("/record_temp/<temp>")
async def record_temp(temp):
    try:
        delivery_future = await producer.produce("readings", value=temp)
        delivered_msg = await delivery_future
        if delivered_msg.error() is not None:
            return jsonify({"status": "error", "message": f"{delivered_msg.error().str()}"}), 500
        else:
            return jsonify({"status": "success"}), 200
    except KafkaException as ke:
        return jsonify({"status": "error", "message": f"{ke.args[0].str()}"}), 500
    except:
        return jsonify({"status": "error", "message": "Unknown Error"}), 500
The record_temp_schematized coroutine looks very similar, with the main difference being that we first serialize the value with the AsyncAvroSerializer instantiated during application initialization:
value = {"temp": temp}
serialized_value = await avro_serializer(value,
                                         SerializationContext("readings_schematized",
                                                              MessageField.VALUE))
delivery_future = await producer.produce("readings_schematized", value=serialized_value)
delivered_msg = await delivery_future
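For reference, the avro_serializer used above is instantiated during application startup. A rough sketch of how that might look follows; the import path, schema definition, and constructor arguments here are assumptions, so consult app.py for the authoritative code:
# Assumed import path; adjust to match the confluent-kafka version pinned in requirements.txt.
from confluent_kafka.schema_registry.avro import AsyncAvroSerializer

# Illustrative Avro schema for a temperature reading; the real schema is defined in app.py.
reading_schema_str = """
{
  "type": "record",
  "name": "Reading",
  "fields": [{"name": "temp", "type": "double"}]
}
"""

# schema_registry_client is the AsyncSchemaRegistryClient created during application startup.
avro_serializer = AsyncAvroSerializer(schema_registry_client, reading_schema_str)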
Run the application, passing the Kafka and Schema Registry client configuration files generated when you created Confluent Cloud resources:
quart serve ./cloud-kafka.properties ./cloud-sr.properties
In a new terminal window, start a console consumer:
confluent kafka topic consume readings
Next, send events to the /record_temp/ route, which will, in turn, produce temperature readings to the readings topic:
curl -X POST localhost:5000/record_temp/78.2
You will see a success response:
{"status":"success"}If you pass a -I option to curl you would see an HTTP 200 response code indicating success.
Feel free to send more readings and verify that they show up in the console consumer.
Producing events that have an associated schema works similarly. First, start a console consumer configured to interpret Avro-formatted message values:
confluent kafka topic consume readings_schematized \
--value-format avro \
--from-beginning
Next, send events to the /record_temp_schematized/ route, which will, in turn, produce temperature readings to the readings_schematized topic:
curl -X POST localhost:5000/record_temp_schematized/92.31
You will see a success response:
{"status":"success"}In the console consumer, you'll see the schematized reading:
{"temp":92.31}When you're finished, delete the kafka-python-async-env environment. First, get its environment ID (of the form env-123456):
confluent environment list
Delete the environment, including all resources created for this tutorial:
confluent environment delete <ENVIRONMENT ID>
To run the tutorial locally with Docker, first clone the tutorials repository (if you haven't already) and navigate into it:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Start Kafka and Schema Registry with the following command from the top level of the tutorials repository:
docker compose -f ./docker/docker-compose-kafka-sr.yml up -d
Open a shell in the broker container:
docker exec -it broker /bin/bash
Create the topics for the application: a readings topic for temperature readings without an associated schema, and a readings_schematized topic that will have an associated schema.
kafka-topics --bootstrap-server localhost:9092 --create --topic readings
kafka-topics --bootstrap-server localhost:9092 --create --topic readings_schematized
Exit the broker container shell and, back on your host machine, navigate into the application's home directory:
cd kafka-python-async/
Create and activate a Python virtual environment to give yourself an isolated workspace for installing dependencies. To use virtualenv:
virtualenv env
source env/bin/activate
Install the application's dependencies:
pip install -r requirements.txt
This installs the application's dependencies, including the confluent-kafka client library and Quart.
The application source code lives in app.py. Open this file in a text editor or IDE.
The first two functions, create_app and cli_wrapper, define the Quart application and handle command-line parsing. The application takes as input files containing Kafka and Schema Registry endpoints and credentials (if applicable) so that the same source code can be used against any Kafka environment.
The next three functions are Quart application lifecycle coroutines that create a Kafka producer client and a Schema Registry client before the application begins serving requests (dictated by the @app.before_serving Quart decorator), and delete them when the application exits (dictated by the @app.after_serving decorator).
To create Kafka and Schema Registry clients:
producer = AIOProducer(producer_conf.properties)
schema_registry_client = AsyncSchemaRegistryClient(schema_registry_conf.properties)
To close the clients, only the producer needs to be flushed (optionally) and closed via coroutines that you await:
await producer.flush()
await producer.close()
Finally, the record_temp coroutine defines an HTTP API for posting temperature readings. The Kafka API for producing events asynchronously is straightforward: await produce, which returns a delivery Future; then await that Future to get the delivered Message, including any delivery errors.
@app.post("/record_temp/<temp>")
async def record_temp(temp):
    try:
        delivery_future = await producer.produce("readings", value=temp)
        delivered_msg = await delivery_future
        if delivered_msg.error() is not None:
            return jsonify({"status": "error", "message": f"{delivered_msg.error().str()}"}), 500
        else:
            return jsonify({"status": "success"}), 200
    except KafkaException as ke:
        return jsonify({"status": "error", "message": f"{ke.args[0].str()}"}), 500
    except:
        return jsonify({"status": "error", "message": "Unknown Error"}), 500
The record_temp_schematized coroutine looks very similar, with the main difference being that we first serialize the value with the AsyncAvroSerializer instantiated during application initialization:
value = {"temp": temp}
serialized_value = await avro_serializer(value,
                                         SerializationContext("readings_schematized",
                                                              MessageField.VALUE))
delivery_future = await producer.produce("readings_schematized", value=serialized_value)
delivered_msg = await delivery_future
Run the application, passing the Kafka and Schema Registry client configuration files for the local Docker environment:
quart serve ./local-kafka.properties ./local-sr.properties
In a new terminal window, open a shell in the broker container and start a console consumer:
docker exec -it broker /bin/bash
kafka-console-consumer --bootstrap-server localhost:9092 --topic readings --from-beginning
Next, send events to the /record_temp/ route, which will, in turn, produce temperature readings to the readings topic:
curl -X POST localhost:5000/record_temp/78.2
You will see a success response:
{"status":"success"}If you pass a -I option to curl you would see an HTTP 200 response code indicating success.
Feel free to send more readings and verify that they show up in the console consumer.
Producing events that have an associated schema works similarly. First, start a console consumer for Avro-formatted message values by opening a shell in the Schema Registry container and running the included kafka-avro-console-consumer command-line utility:
docker exec -it schema-registry /bin/bash
kafka-avro-console-consumer \
--topic readings_schematized \
--bootstrap-server broker:29092 \
--property schema.registry.url=http://localhost:8081 \
--from-beginning
Next, send events to the /record_temp_schematized/ route, which will, in turn, produce temperature readings to the readings_schematized topic:
curl -X POST localhost:5000/record_temp_schematized/92.31
You will see a success response:
{"status":"success"}In the console consumer, you'll see the schematized reading:
{"temp":92.31}From your local machine, stop the broker and Schema Registry containers:
docker compose -f ./docker/docker-compose-kafka-sr.yml down