course: Apache Kafka® for Python Developers

Hands On: Use the Python Consumer Class with Schemas

Dave Klein

Senior Developer Advocate

Consume Events with JSON Schema

In this exercise, we will consume the events we just produced and use the JSONDeserializer to turn those events into objects we can work with in our Python application.

Topics

We will be using the temp_readings topic that we created in the previous exercise.

Project Setup

  1. Open a terminal window and navigate to the kafka-python directory.

  2. If you are not currently using the kafka-env environment that was created in the first exercise, switch to it with the following command:

    source kafka-env/bin/activate
  3. Create and open a new file called json_consumer.py.

Add Required Imports

  1. Add the following import statements to the top of the json_consumer.py file:

    from confluent_kafka import Consumer
    from confluent_kafka.serialization import SerializationContext, MessageField
    from confluent_kafka.schema_registry.json_schema import JSONDeserializer
    from config import config

Add Consumer-Specific Configuration Properties

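  1. Create the following function to set group.id, which names the consumer group this consumer joins, and auto.offset.reset, which tells the consumer to start from the earliest available offset when the group has no committed offset: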

    def set_consumer_configs():
        config['group.id'] = 'temp_group'
        config['auto.offset.reset'] = 'earliest'
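
Note that set_consumer_configs() updates the shared config dictionary in place, so anything else that imports config afterward sees the consumer settings too. If that matters in your project, one alternative is to build a separate dictionary. The following is a minimal sketch, not part of the course code: the helper name build_consumer_config and the base_config placeholder values are assumptions made for illustration.

```python
def build_consumer_config(base, group_id='temp_group', offset_reset='earliest'):
    """Return a new dict with consumer settings layered over base."""
    consumer_config = dict(base)  # shallow copy, so base stays untouched
    consumer_config['group.id'] = group_id
    consumer_config['auto.offset.reset'] = offset_reset
    return consumer_config


# Stand-in for the dictionary imported from config.py (placeholder value).
base_config = {'bootstrap.servers': 'localhost:9092'}
consumer_config = build_consumer_config(base_config)
```

The resulting consumer_config could then be passed to Consumer() in place of config.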

Define Our Class and Schema

  1. Add the following class declaration:

    class Temperature(object):
        def __init__(self, city, reading, unit, timestamp):
            self.city = city
            self.reading = reading
            self.unit = unit
            self.timestamp = timestamp
  2. Add the schema_str declaration:

    schema_str = """{
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "Temperature",
        "description": "Temperature sensor reading",
        "type": "object",
        "properties": {
          "city": {
            "description": "City name",
            "type": "string"
          },
          "reading": {
            "description": "Current temperature reading",
            "type": "number"
          },
          "unit": {
            "description": "Temperature unit (C/F)",
            "type": "string"
          },
          "timestamp": {
            "description": "Time of reading in ms since epoch",
            "type": "number"
          }
        }
      }"""
  3. Add this function to convert a dictionary into a Temperature object:

    def dict_to_temp(data, ctx):
        # Named data rather than dict to avoid shadowing the built-in type.
        return Temperature(data['city'], data['reading'], data['unit'], data['timestamp'])
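
To see what the conversion function does on its own, the sketch below runs it against a dictionary parsed from a JSON string, with no Kafka connection involved. The sample payload is made up for illustration, and the parameter is called data here so that it does not shadow the built-in dict type.

```python
import json


class Temperature(object):
    def __init__(self, city, reading, unit, timestamp):
        self.city = city
        self.reading = reading
        self.unit = unit
        self.timestamp = timestamp


def dict_to_temp(data, ctx):
    # ctx is the context object the deserializer passes in; it is unused here.
    return Temperature(data['city'], data['reading'], data['unit'], data['timestamp'])


# Hypothetical payload shaped like the schema above.
sample = json.loads(
    '{"city": "Berlin", "reading": 14, "unit": "C", "timestamp": 1726670570000}'
)
temp = dict_to_temp(sample, None)
print(f'{temp.city}: {temp.reading} {temp.unit}')
```

Because the schema does not declare any required properties, an event missing one of these keys could still pass schema validation, and dict_to_temp would then raise a KeyError. Using data.get(...) with a default is one way to tolerate that, if it suits your use case.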

Add Main Block

  1. Begin the main block as follows.

    if __name__ == '__main__':
        topic = 'temp_readings'
    
        json_deserializer = JSONDeserializer(schema_str, from_dict=dict_to_temp)

    First we declare a variable for the topic name and then create an instance of the JSONDeserializer based on our temperature schema.

  2. Set up the Consumer instance.
    Now add the following lines to update the consumer configs, create the consumer instance, and subscribe to the temp_readings topic.

        set_consumer_configs()
        consumer = Consumer(config)
        consumer.subscribe([topic])
  3. Finally, add the poll loop to receive and process events.
    In the poll loop, we will use the JSONDeserializer instance to turn the binary event data into usable data objects.

        while True:
            try:
                event = consumer.poll(1.0)
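                if event is None:
                    continue
                # Skip events that carry an error instead of a payload.
                if event.error():
                    print(f'Consumer error: {event.error()}')
                    continue
                temp = json_deserializer(event.value(),
                    SerializationContext(topic, MessageField.VALUE))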
                if temp is not None:
                    print(f'Latest temp in {temp.city} is {temp.reading} {temp.unit}.')
    
            except KeyboardInterrupt:
                break
    
        consumer.close()
  4. Execute the program by running the following command:

    python json_consumer.py

    You should see output similar to this:

    Latest temp in Berlin is 14 C.
    Latest temp in Madrid is 18 C.
    Latest temp in Phoenix is 78 F.
    Latest temp in London is 12 C.
    Latest temp in Chicago is 63 F.
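
If you would like to exercise the per-event logic without a running broker, the body of the poll loop can be pulled into a small function. The sketch below is only an illustration under stated assumptions: FakeMessage, fake_deserializer, and format_event are hypothetical names, not part of confluent_kafka. The stand-in deserializer assumes the events were written by a Schema Registry serializer, which prefixes each payload with a magic byte and a 4-byte schema ID, and it returns a plain dictionary rather than a Temperature object.

```python
import json
import struct


class FakeMessage:
    """Hypothetical stand-in for a consumed message; not a confluent_kafka class."""

    def __init__(self, value, error=None):
        self._value = value
        self._error = error

    def value(self):
        return self._value

    def error(self):
        return self._error


def fake_deserializer(payload, ctx=None):
    """Stand-in for JSONDeserializer: strip the 5-byte header, then parse JSON."""
    if payload is None:
        return None
    # Assumed framing: 1 magic byte followed by a big-endian 4-byte schema ID.
    magic, schema_id = struct.unpack('>bI', payload[:5])
    if magic != 0:
        raise ValueError('unexpected magic byte')
    return json.loads(payload[5:])


def format_event(event, deserializer):
    """Return the line to print for one poll result, or None to skip it."""
    if event is None or event.error():
        return None
    temp = deserializer(event.value())
    if temp is None:
        return None
    return f"Latest temp in {temp['city']} is {temp['reading']} {temp['unit']}."


body = json.dumps({'city': 'Berlin', 'reading': 14, 'unit': 'C', 'timestamp': 0}).encode()
framed = struct.pack('>bI', 0, 1) + body  # schema ID 1 is a made-up value
line = format_event(FakeMessage(framed), fake_deserializer)
print(line)
```

Keeping the formatting logic in its own function means the real loop only has to call consumer.poll() and print whatever the function returns.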

You’ve now successfully consumed data that was produced using JSON Schema. Next, try experimenting with that schema or creating a custom schema for your own use case.
