course: Apache Kafka® for Python Developers

Hands On: Use the Python Producer Class with Schemas


Dave Klein

Senior Developer Advocate

Produce Events with JSON Schema

In this exercise, you will define a JSON schema and then produce events using a Producer, a JSONSerializer and Schema Registry.

Prerequisites

  1. We will be using a topic called temp_readings. Use the Confluent Cloud Console to create this topic with defaults (or see the AdminClient sketch that follows this list).

  2. We enabled Confluent Cloud Stream Governance in the previous Hands On: Confluent Cloud Setup exercise, which also provisioned the Schema Registry instance that this exercise and the next one require. If you did not complete that exercise, do so now.
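
If you prefer to create the topic from code rather than the Console, a minimal sketch using the AdminClient from confluent_kafka could look like the following. It reuses the config dictionary from previous exercises and assumes your API key is allowed to create topics; 6 partitions matches the Cloud Console default:

    from confluent_kafka.admin import AdminClient, NewTopic
    from config import config

    admin = AdminClient(config)
    # Confluent Cloud manages replication itself; 6 partitions is the Console default
    futures = admin.create_topics([NewTopic('temp_readings', num_partitions=6)])
    for topic, future in futures.items():
        try:
            future.result()  # block until the request completes
            print(f'Created topic {topic}')
        except Exception as e:
            print(f'Failed to create topic {topic}: {e}')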

Project Setup

  1. Open a terminal window and navigate to the kafka-python directory that you created in the previous exercise.

  2. If you are not currently using the kafka-env environment that was created in the first exercise, switch to it with the following command:

    source kafka-env/bin/activate
  3. The JSONSerializer requires the Python packages jsonschema and requests, so we need to install them to satisfy the corresponding import statements. Run the following commands:

    pip install jsonschema
    pip install requests
  4. Change back to the kafka-python directory, open the config.py file that we’ve been using in previous exercises, and add the following dictionary:

    sr_config = {
        'url': '<schema.registry.url>',
        'basic.auth.user.info':'<SR_API_KEY>:<SR_API_SECRET>'
    }
  5. Update the values for url and basic.auth.user.info using the related property values in ~/.confluent/python.properties. (A filled-in example appears after this list.)

    Note: You should have created this file during the first exercise of this course.

  6. Create and open a new file called json_producer.py.
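
For reference, a completed sr_config from steps 4 and 5 might look like the following. The endpoint and credentials here are hypothetical placeholders; use the real values from your ~/.confluent/python.properties file:

    sr_config = {
        'url': 'https://psrc-xxxxx.us-east-2.aws.confluent.cloud',  # hypothetical endpoint
        'basic.auth.user.info': 'SR_KEY_123:SR_SECRET_abc'          # hypothetical <key>:<secret>
    }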

Add Required Imports

  1. Add the following import statements to the top of the json_producer.py file:

    from confluent_kafka import Producer
    from confluent_kafka.serialization import SerializationContext, MessageField
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.json_schema import JSONSerializer
    from config import config, sr_config
    import time

Define our class and schema

  1. Add the following class declaration:

    class Temperature(object):
        def __init__(self, city, reading, unit, timestamp):
            self.city = city
            self.reading = reading
            self.unit = unit
            self.timestamp = timestamp
  2. Add the schema_str declaration (you can sanity-check the schema with the sketch that follows this list):

    schema_str = """{
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "Temperature",
        "description": "Temperature sensor reading",
        "type": "object",
        "properties": {
          "city": {
            "description": "City name",
            "type": "string"
          },
          "reading": {
            "description": "Current temperature reading",
            "type": "number"
          },
          "unit": {
            "description": "Temperature unit (C/F)",
            "type": "string"
          },
          "timestamp": {
            "description": "Time of reading in ms since epoch",
            "type": "number"
          }
        }
      }"""
  3. Add this function to convert our Temperature object to a dictionary:

    def temp_to_dict(temp, ctx):
        # ctx is a SerializationContext supplied by the serializer;
        # it isn't needed for this simple conversion.
        return {"city": temp.city,
                "reading": temp.reading,
                "unit": temp.unit,
                "timestamp": temp.timestamp}

Create some test data

  1. Add the following list of Temperature objects containing some sample temperature readings:

    data = [Temperature('London', 12, 'C', round(time.time()*1000)),
            Temperature('Chicago', 63, 'F', round(time.time()*1000)),
            Temperature('Berlin', 14, 'C', round(time.time()*1000)),
            Temperature('Madrid', 18, 'C', round(time.time()*1000)),
            Temperature('Phoenix', 78, 'F', round(time.time()*1000))]

Create a producer callback function

  1. Add a callback function to pass to the produce() call so we can see whether each event was delivered successfully:

    def delivery_report(err, event):
        # err is None on success; event is the produced Message
        if err is not None:
            print(f'Delivery failed on reading for {event.key().decode("utf8")}: {err}')
        else:
            print(f'Temp reading for {event.key().decode("utf8")} produced to {event.topic()}')

Add Main Block

  1. Add the following main block:

    if __name__ == '__main__':
        topic = 'temp_readings'
        schema_registry_client = SchemaRegistryClient(sr_config)
    
        json_serializer = JSONSerializer(schema_str,
                                         schema_registry_client,
                                         temp_to_dict)
    
        producer = Producer(config)
        for temp in data:
            producer.produce(topic=topic, key=str(temp.city),
                             value=json_serializer(temp, 
                             SerializationContext(topic, MessageField.VALUE)),
                             on_delivery=delivery_report)
    
        producer.flush()

    After setting a variable for our topic name, we create a SchemaRegistryClient instance using our new sr_config dictionary. Then we create a JSONSerializer instance with the Temperature schema string, the SchemaRegistryClient, and the temp_to_dict function. Next, we create a Producer instance with the config that we’ve used in earlier exercises. We then iterate over our test data and produce events. Note that the event key is a plain string, while the event value is serialized by the JSONSerializer. Finally, we call producer.flush() to make sure all of the events are sent and the callbacks are executed. On the first produce call, the serializer also registers the value schema with Schema Registry automatically (see the sketch after the sample output below).

  2. Execute the program by running the following command:

    python json_producer.py

    You should see output similar to this:

    Temp reading for Phoenix produced to temp_readings
    Temp reading for Berlin produced to temp_readings
    Temp reading for Madrid produced to temp_readings
    Temp reading for Chicago produced to temp_readings
    Temp reading for London produced to temp_readings
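
After a successful run, the serializer will have registered the value schema under the subject temp_readings-value (the default topic-name subject strategy). If you'd like to confirm this from code, here is a short sketch using the SchemaRegistryClient with the sr_config from earlier:

    from confluent_kafka.schema_registry import SchemaRegistryClient
    from config import sr_config

    client = SchemaRegistryClient(sr_config)
    registered = client.get_latest_version('temp_readings-value')
    print(f'Schema ID: {registered.schema_id}')
    print(registered.schema.schema_str)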

In the next exercise we’ll see how to consume events that have been serialized with JSON Schema.

