Senior Developer Advocate
In this exercise, you will define a JSON schema and then produce events using a Producer, a JSONSerializer and Schema Registry.
We will be using a topic called temp_readings. Use the Confluent Cloud Console to create this topic with defaults.
In the previous Hands On: Confluent Cloud Setup exercise, we enabled Confluent Cloud Stream Governance, which also created the Schema Registry instance that this exercise and the next one require. If you did not complete that exercise, do so now.
Open a terminal window and navigate to the kafka-python directory that you created in the previous exercise.
If the kafka-env virtual environment that you created in the first exercise is not already active, activate it with the following command:
source kafka-env/bin/activate
The JSONSerializer requires the Python packages jsonschema and requests, so install them to make sure its import statements can be satisfied. Run the following commands:
pip install jsonschema
pip install requests
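If you want to confirm that both packages are visible to the active environment before continuing, a quick check like the one below can help. This is a minimal sketch; `missing_packages` is a hypothetical helper, not part of the course code, and it only checks that a top-level module with each name can be found.

```python
import importlib.util


def missing_packages(names):
    """Return the module names that cannot be found (hypothetical helper)."""
    # find_spec returns None when no top-level module with that name is installed
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_packages(["jsonschema", "requests"])
    if missing:
        print("Still missing:", ", ".join(missing))
    else:
        print("jsonschema and requests are installed")
```

Run it with the kafka-env environment active; an empty result means pip installed the packages into that environment.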
In the kafka-python directory, open the config.py file that we’ve been using in previous exercises and add the following dictionary:
sr_config = {
    'url': '<schema.registry.url>',
    'basic.auth.user.info': '<SR_API_KEY>:<SR_API_SECRET>'
}
Replace the placeholder values for url and basic.auth.user.info with the corresponding property values from ~/.confluent/python.properties.
Note: You should have created this file during the first exercise of this course.
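Instead of copying the values by hand, you could build sr_config from the properties file. The sketch below assumes the file uses simple `key=value` lines and contains properties named `schema.registry.url` and `basic.auth.user.info`; check your own file, since those names are an assumption here. `parse_properties`, `build_sr_config`, and `load_sr_config` are hypothetical helpers.

```python
from pathlib import Path


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first '=' only, so values that contain '=' stay intact
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def build_sr_config(props):
    """Map the assumed property names onto the sr_config keys used in config.py."""
    return {
        "url": props["schema.registry.url"],
        "basic.auth.user.info": props["basic.auth.user.info"],
    }


def load_sr_config(path="~/.confluent/python.properties"):
    """Read the properties file and return an sr_config-style dictionary."""
    text = Path(path).expanduser().read_text()
    return build_sr_config(parse_properties(text))
```

In config.py this would become `sr_config = load_sr_config()`, which keeps the API key and secret out of your source file.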
Create and open a new file called json_producer.py.
Add the following import statements to the top of the json_producer.py file:
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from config import config, sr_config
import time
Add the following class declaration:
class Temperature(object):
    def __init__(self, city, reading, unit, timestamp):
        self.city = city
        self.reading = reading
        self.unit = unit
        self.timestamp = timestamp
Add the following schema_str declaration, which holds the JSON Schema for a temperature reading:
schema_str = """{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "title": "Temperature",
    "description": "Temperature sensor reading",
    "type": "object",
    "properties": {
        "city": {
            "description": "City name",
            "type": "string"
        },
        "reading": {
            "description": "Current temperature reading",
            "type": "number"
        },
        "unit": {
            "description": "Temperature unit (C/F)",
            "type": "string"
        },
        "timestamp": {
            "description": "Time of reading in ms since epoch",
            "type": "number"
        }
    }
}"""
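By default, the JSONSerializer checks each record against this schema with the jsonschema package before sending it. The stdlib-only sketch below illustrates what the "type" keywords in the schema mean for a Python dictionary; `type_errors` is a hypothetical helper and is not a substitute for real jsonschema validation. It uses a trimmed copy of the schema (descriptions omitted) so it runs on its own.

```python
import json

# Trimmed copy of the Temperature schema so this sketch is self-contained
schema_str = """{
  "type": "object",
  "properties": {
    "city": {"type": "string"},
    "reading": {"type": "number"},
    "unit": {"type": "string"},
    "timestamp": {"type": "number"}
  }
}"""


def type_errors(record, schema):
    """Return property type mismatches (hypothetical helper, not the jsonschema API)."""
    checks = {
        "string": lambda v: isinstance(v, str),
        # JSON Schema "number" accepts ints and floats, but not booleans
        "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    }
    errors = []
    for name, spec in schema["properties"].items():
        if name in record and not checks[spec["type"]](record[name]):
            errors.append(f"{name}: expected {spec['type']}, got {type(record[name]).__name__}")
    return errors


schema = json.loads(schema_str)
print(type_errors({"city": "London", "reading": 12, "unit": "C", "timestamp": 1700000000000}, schema))
print(type_errors({"city": "London", "reading": "12"}, schema))
```

Note that the schema has no "required" list, so a record that omits a property still passes; only the types of the properties that are present are constrained.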
Add this function to convert our Temperature object to a dictionary:
def temp_to_dict(temp, ctx):
    return {"city": temp.city,
            "reading": temp.reading,
            "unit": temp.unit,
            "timestamp": temp.timestamp}
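The serializer calls this function with the object being serialized and a SerializationContext, and expects a dictionary back. As an alternative design, sketched here and not the course's approach, you could declare Temperature as a dataclass and let `dataclasses.asdict` build the dictionary, so the field list lives in one place.

```python
from dataclasses import dataclass, asdict


@dataclass
class Temperature:
    city: str
    reading: float
    unit: str
    timestamp: int


def temp_to_dict(temp, ctx):
    # ctx (the SerializationContext) is unused here, but the serializer passes it in
    return asdict(temp)


print(temp_to_dict(Temperature("London", 12, "C", 1700000000000), None))
```

The trade-off is that the dictionary keys now follow the dataclass field names automatically, so renaming a field silently changes the JSON unless the schema is updated too.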
Add the following list of Temperature objects containing some sample temperature readings:
data = [Temperature('London', 12, 'C', round(time.time()*1000)),
        Temperature('Chicago', 63, 'F', round(time.time()*1000)),
        Temperature('Berlin', 14, 'C', round(time.time()*1000)),
        Temperature('Madrid', 18, 'C', round(time.time()*1000)),
        Temperature('Phoenix', 78, 'F', round(time.time()*1000))]
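Each timestamp is the current time in milliseconds since the Unix epoch, matching the schema's description of the timestamp field. The sketch below shows that calculation and how to turn such a value back into a readable UTC time; `now_ms` and `ms_to_utc` are hypothetical helper names.

```python
import time
from datetime import datetime, timezone


def now_ms():
    """Current time in milliseconds since the Unix epoch, as in the sample data."""
    return round(time.time() * 1000)


def ms_to_utc(ms):
    """Convert a millisecond timestamp back to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


print(ms_to_utc(0))  # 1970-01-01 00:00:00+00:00
print(ms_to_utc(now_ms()))
```

Because the sample list calls time.time() once per reading, the five timestamps can differ slightly; capturing `now_ms()` once and reusing it would give all readings the same value.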
Add a delivery callback function to pass to the produce() call so we can see whether each event was delivered successfully:
def delivery_report(err, event):
    if err is not None:
        print(f'Delivery failed on reading for {event.key().decode("utf8")}: {err}')
    else:
        print(f'Temp reading for {event.key().decode("utf8")} produced to {event.topic()}')
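The producer invokes this callback with an error (or None on success) and the delivered message, whose key() returns bytes, which is why the callback decodes it. The sketch below exercises the same logic without a Kafka cluster by using a hypothetical `FakeMessage` stand-in that exposes only key() and topic(); splitting the message text into a hypothetical `format_report` function makes it easy to test.

```python
class FakeMessage:
    """Hypothetical stand-in for a Kafka message, exposing only key() and topic()."""

    def __init__(self, key, topic):
        self._key = key
        self._topic = topic

    def key(self):
        return self._key

    def topic(self):
        return self._topic


def format_report(err, event):
    """Build the same text that delivery_report prints."""
    key = event.key().decode("utf8")
    if err is not None:
        return f"Delivery failed on reading for {key}: {err}"
    return f"Temp reading for {key} produced to {event.topic()}"


def delivery_report(err, event):
    print(format_report(err, event))


delivery_report(None, FakeMessage(b"London", "temp_readings"))
delivery_report("broker unavailable", FakeMessage(b"Berlin", "temp_readings"))
```

In the real callback, err is a KafkaError rather than a string, but it is interpolated into the message the same way.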
Add the following main block.
if __name__ == '__main__':
    topic = 'temp_readings'
    schema_registry_client = SchemaRegistryClient(sr_config)

    json_serializer = JSONSerializer(schema_str,
                                     schema_registry_client,
                                     temp_to_dict)

    producer = Producer(config)
    for temp in data:
        producer.produce(topic=topic, key=str(temp.city),
                         value=json_serializer(temp,
                                               SerializationContext(topic, MessageField.VALUE)),
                         on_delivery=delivery_report)

    producer.flush()
After setting a variable for our topic name, we create a SchemaRegistryClient instance using our new sr_config dictionary. Then we create a JSONSerializer instance with the Temperature schema string, the SchemaRegistryClient, and the temp_to_dict function. Next, we create a Producer instance with the config that we’ve used in earlier exercises. We then iterate over our test data and produce an event for each reading. Note that the event key is a plain string, which the Producer encodes as UTF-8, while the event value is serialized by the JSONSerializer. Finally, we call producer.flush() to make sure all of the events are sent and their delivery callbacks are executed.
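The JSONSerializer returns bytes rather than a plain JSON string. By default it uses the Confluent wire format: a zero "magic byte", then the 4-byte big-endian ID of the schema registered in Schema Registry (under the temp_readings-value subject with the default naming strategy), then the JSON payload. The stdlib sketch below illustrates that framing; `frame` and `unframe` are hypothetical helpers, the schema ID 100001 is made up, and the exact JSON spacing the real serializer emits may differ.

```python
import json
import struct

MAGIC_BYTE = 0


def frame(schema_id, record):
    """Prefix a JSON payload with the magic byte and a 4-byte big-endian schema ID."""
    return struct.pack(">BI", MAGIC_BYTE, schema_id) + json.dumps(record).encode("utf-8")


def unframe(data):
    """Split framed bytes back into (schema_id, record)."""
    magic, schema_id = struct.unpack(">BI", data[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, json.loads(data[5:].decode("utf-8"))


data = frame(100001, {"city": "London", "reading": 12, "unit": "C"})
print(data[:5])
print(unframe(data))
```

This header is what lets a consumer look up the right schema in Schema Registry before deserializing the value.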
Execute the program by running the following command:
python json_producer.py
You should see output similar to this (the lines may appear in a different order from the one in which the readings were produced):
Temp reading for Phoenix produced to temp_readings
Temp reading for Berlin produced to temp_readings
Temp reading for Madrid produced to temp_readings
Temp reading for Chicago produced to temp_readings
Temp reading for London produced to temp_readings
In the next exercise we’ll see how to consume events that have been serialized with JSON Schema.