In this exercise, we will consume the events we just produced and use the JSONDeserializer to turn those events into objects we can work with in our Python application.
We will be using the temp_readings topic that we created in the previous exercise.
Open a terminal window and navigate to the kafka-python directory.
If you are not currently using the kafka-env environment that was created in the first exercise, switch to it with the following command:
source kafka-env/bin/activate
Create and open a new file called json_consumer.py.
Add the following import statements to the top of the json_consumer.py file:
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
from config import config
Create the following function to set group.id and auto.offset.reset.
def set_consumer_configs():
    config['group.id'] = 'temp_group'
    config['auto.offset.reset'] = 'earliest'
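The group.id setting names the consumer group that this consumer joins, and auto.offset.reset set to 'earliest' tells the consumer to start from the beginning of the topic when the group has no committed offset. The function above mutates the shared config dictionary in place. As a minimal sketch of an alternative, the hypothetical helper build_consumer_config below (not part of the exercise) returns a copy instead, so the original settings stay untouched. The bootstrap.servers value is a placeholder.

```python
def build_consumer_config(base, group_id='temp_group', offset_reset='earliest'):
    """Return a copy of base with the consumer-only settings added."""
    consumer_config = dict(base)  # shallow copy, so base is left unchanged
    consumer_config['group.id'] = group_id
    consumer_config['auto.offset.reset'] = offset_reset
    return consumer_config


# Placeholder connection settings; the real values come from config.py.
base = {'bootstrap.servers': 'localhost:9092'}
merged = build_consumer_config(base)

print(merged['group.id'])
print('group.id' in base)
```

Keeping the shared dictionary free of consumer-only keys may help if the same config module is also imported by a producer script.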
Add the following class declaration:
class Temperature(object):
    def __init__(self, city, reading, unit, timestamp):
        self.city = city
        self.reading = reading
        self.unit = unit
        self.timestamp = timestamp
Add the schema_str declaration:
schema_str = """{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "title": "Temperature",
    "description": "Temperature sensor reading",
    "type": "object",
    "properties": {
        "city": {
            "description": "City name",
            "type": "string"
        },
        "reading": {
            "description": "Current temperature reading",
            "type": "number"
        },
        "unit": {
            "description": "Temperature unit (C/F)",
            "type": "string"
        },
        "timestamp": {
            "description": "Time of reading in ms since epoch",
            "type": "number"
        }
    }
}"""
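To make the role of the schema more concrete, here is a rough sketch that parses a trimmed copy of the schema with the standard json module and checks a record's property types by hand. This is not how JSONDeserializer validates events, and it covers only the two type names used in this exercise. The helper name type_mismatches and the sample records are made up for illustration.

```python
import json

# Trimmed copy of the exercise's schema, kept short for illustration.
schema_str = """{
  "type": "object",
  "properties": {
    "city": {"type": "string"},
    "reading": {"type": "number"},
    "unit": {"type": "string"},
    "timestamp": {"type": "number"}
  }
}"""

# Map JSON Schema type names to Python types (only the two used here).
PY_TYPES = {'string': str, 'number': (int, float)}


def type_mismatches(record, schema):
    """Return the names of properties whose value has the wrong type."""
    mismatched = []
    for name, spec in schema['properties'].items():
        if name not in record:
            continue  # the schema lists no required properties
        value = record[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, PY_TYPES[spec['type']]):
            mismatched.append(name)
    return mismatched


schema = json.loads(schema_str)
good_record = {'city': 'Berlin', 'reading': 14, 'unit': 'C', 'timestamp': 1700000000000}
bad_record = {'city': 'Berlin', 'reading': 'warm', 'unit': 'C', 'timestamp': 1700000000000}

print(type_mismatches(good_record, schema))  # []
print(type_mismatches(bad_record, schema))   # ['reading']
```

Because the schema has no "required" list, a record with a missing property still passes this check, which is worth keeping in mind when experimenting with the schema.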
Add this function to convert a dictionary into a Temperature object:
def dict_to_temp(data, ctx):
    # ctx is the SerializationContext passed in by the deserializer; it is unused here.
    return Temperature(data['city'], data['reading'], data['unit'], data['timestamp'])
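The conversion function can be tried on its own, without a broker or Schema Registry. The sketch below repeats the Temperature class and the conversion function so that it runs standalone. The sample dictionary is invented for illustration, and None stands in for the context argument, which the function ignores.

```python
class Temperature(object):
    def __init__(self, city, reading, unit, timestamp):
        self.city = city
        self.reading = reading
        self.unit = unit
        self.timestamp = timestamp


def dict_to_temp(data, ctx):
    # ctx is accepted to match the from_dict callback shape but is not used.
    return Temperature(data['city'], data['reading'], data['unit'], data['timestamp'])


# Made-up sample standing in for a decoded event value.
sample = {'city': 'Berlin', 'reading': 14, 'unit': 'C', 'timestamp': 1700000000000}
temp = dict_to_temp(sample, None)

print(f'Latest temp in {temp.city} is {temp.reading} {temp.unit}.')
# Latest temp in Berlin is 14 C.
```

A dictionary that lacks one of the four keys raises a KeyError here, so events that omit a property need separate handling.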
Begin the main block as follows.
if __name__ == '__main__':
    topic = 'temp_readings'
    json_deserializer = JSONDeserializer(schema_str, from_dict=dict_to_temp)
First we declare a variable for the topic name and then create an instance of the JSONDeserializer based on our temperature schema.
Set up Consumer instance.
Now add the following lines to update the consumer configs, create the consumer instance and subscribe to the temp_readings topic.
    set_consumer_configs()
    consumer = Consumer(config)
    consumer.subscribe([topic])
Finally, add the poll loop to receive and process events.
In the poll loop, we will use the JSONDeserializer instance to turn the binary event data into usable data objects.
    while True:
        try:
            event = consumer.poll(1.0)
            if event is None:
                continue
            if event.error():
                # Skip events that carry an error instead of a payload.
                print(f'Consumer error: {event.error()}')
                continue
            temp = json_deserializer(event.value(),
                SerializationContext(topic, MessageField.VALUE))
            if temp is not None:
                print(f'Latest temp in {temp.city} is {temp.reading} {temp.unit}.')
        except KeyboardInterrupt:
            break

    consumer.close()
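To give a feel for the binary event data that the deserializer receives, the sketch below decodes a value framed in the Confluent wire format: one magic byte (0), a 4-byte big-endian schema ID, then the JSON payload. It uses only the standard library and skips the schema validation and from_dict conversion that JSONDeserializer performs. The function name decode_framed_json, the schema ID 42, and the sample reading are assumptions made for illustration.

```python
import json
import struct


def decode_framed_json(payload):
    """Split Confluent-framed bytes into (schema_id, decoded dict)."""
    if len(payload) <= 5:
        raise ValueError('payload too short to contain a header and a body')
    # '>bI' reads a signed byte and an unsigned 32-bit int, big-endian (5 bytes).
    magic, schema_id = struct.unpack('>bI', payload[:5])
    if magic != 0:
        raise ValueError(f'unexpected magic byte: {magic}')
    return schema_id, json.loads(payload[5:].decode('utf-8'))


# Build a framed value by hand; schema ID 42 is an arbitrary example.
sample = {'city': 'Berlin', 'reading': 14, 'unit': 'C', 'timestamp': 1700000000000}
framed = struct.pack('>bI', 0, 42) + json.dumps(sample).encode('utf-8')

schema_id, decoded = decode_framed_json(framed)
print(schema_id, decoded['city'])  # 42 Berlin
```

This framing is why the raw bytes from event.value() cannot simply be passed to json.loads: the first five bytes are a header, not JSON.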
Execute the program by running the following command:
python json_consumer.py
You should see output similar to this:
Latest temp in Berlin is 14 C.
Latest temp in Madrid is 18 C.
Latest temp in Phoenix is 78 F.
Latest temp in London is 12 C.
Latest temp in Chicago is 63 F.
You’ve now successfully consumed data that was produced using JSON Schema. Now try experimenting with that schema or creating a custom schema for your own use case.