In this exercise you will use the AdminClient class to create a new Kafka topic and alter one of its configuration properties.
Open a terminal window and navigate to the kafka-python directory that you created in the previous exercise.
If you are not currently using the kafka-env environment that was created in the last exercise, switch to it with the following command:
source kafka-env/bin/activate
Create a file called admin.py.
Open admin.py in an IDE of your choice.
Add the following import statements to the top of the admin.py file:
from confluent_kafka.admin import (AdminClient, NewTopic,
                                   ConfigResource)
from config import config
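The config module comes from the previous exercise and is not reproduced here. As a rough reminder of the shape AdminClient expects, and assuming a Confluent Cloud cluster that authenticates with SASL/PLAIN, config.py might look something like the sketch below. Every value in angle brackets is a placeholder, and your actual file may contain different or additional settings:

```python
# Hypothetical config.py: a plain dictionary of client settings.
# All values in angle brackets are placeholders, not real endpoints or keys.
config = {
    'bootstrap.servers': '<bootstrap-server-endpoint>',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '<cluster-api-key>',
    'sasl.password': '<cluster-api-secret>',
}
```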
Add a function that uses list_topics() to check whether a specific topic already exists:
# return True if topic exists and False if not
def topic_exists(admin, topic):
    metadata = admin.list_topics()
    for t in metadata.topics.values():
        if t.topic == topic:
            return True
    return False
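Because the topics attribute of the returned metadata is a dictionary keyed by topic name, the same check can also be written as a membership test. The sketch below assumes that shape and uses a hypothetical stand-in object (fake_admin) in place of a real AdminClient, so it runs without a cluster:

```python
from types import SimpleNamespace


def topic_exists_by_key(admin, topic):
    """Return True if `topic` is a key of the metadata's topics dictionary."""
    metadata = admin.list_topics()
    return topic in metadata.topics


# Hypothetical stand-in for AdminClient: list_topics() returns an object
# whose .topics attribute maps topic names to per-topic metadata.
fake_metadata = SimpleNamespace(
    topics={'my_topic': SimpleNamespace(topic='my_topic')}
)
fake_admin = SimpleNamespace(list_topics=lambda: fake_metadata)

print(topic_exists_by_key(fake_admin, 'my_topic'))
print(topic_exists_by_key(fake_admin, 'other_topic'))
```

Either form works for this exercise; the loop version in admin.py makes the TopicMetadata objects more visible, while the membership test is shorter.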
Add a function using a NewTopic instance and the create_topics() function to create a topic:
# create new topic and report whether creation succeeded
def create_topic(admin, topic):
    new_topic = NewTopic(topic, num_partitions=6, replication_factor=3)
    result_dict = admin.create_topics([new_topic])
    for topic, future in result_dict.items():
        try:
            future.result()  # The result itself is None
            print("Topic {} created".format(topic))
        except Exception as e:
            print("Failed to create topic {}: {}".format(topic, e))
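The try/except around future.result() is what turns an asynchronous admin request into a success or failure message. To see the pattern in isolation, the sketch below builds two standard-library Future objects by hand as stand-ins for the values create_topics() returns; the helper name report_results and the topic names are made up for illustration:

```python
from concurrent.futures import Future


def report_results(result_dict):
    """Wait on each future and return a {topic: outcome} summary."""
    outcomes = {}
    for topic, future in result_dict.items():
        try:
            future.result()  # blocks until done; re-raises a stored exception
            outcomes[topic] = 'created'
        except Exception as e:
            outcomes[topic] = 'failed: {}'.format(e)
    return outcomes


# Hypothetical futures: one completed successfully, one failed.
ok = Future()
ok.set_result(None)
bad = Future()
bad.set_exception(RuntimeError('topic already exists'))

print(report_results({'my_topic': ok, 'old_topic': bad}))
```

Calling result() is the step that surfaces errors; if it is skipped, a failed request can go unnoticed.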
Create a function that uses a ConfigResource instance and the describe_configs() function to read the topic's max.message.bytes property:
# get max.message.bytes property
def get_max_size(admin, topic):
    resource = ConfigResource('topic', topic)
    result_dict = admin.describe_configs([resource])
    config_entries = result_dict[resource].result()
    max_size = config_entries['max.message.bytes']
    return max_size.value
Now add a function using the alter_configs() function to set the max.message.bytes property:
# set max.message.bytes for topic
def set_max_size(admin, topic, max_k):
    config_dict = {'max.message.bytes': str(max_k*1024)}
    resource = ConfigResource('topic', topic, config_dict)
    result_dict = admin.alter_configs([resource])
    result_dict[resource].result()
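Note that the size is given in kibibytes and converted to a byte count as a string, which is also the form in which the exercise compares values read back from the topic. A small sketch of that conversion and comparison, using hypothetical helper names that are not part of admin.py:

```python
def kib_to_config_value(max_k):
    """Convert a size in KiB to the string byte count used as a config value."""
    return str(max_k * 1024)


def needs_update(current_value, max_k):
    """Compare the reported (string) value against the desired size."""
    return current_value != kib_to_config_value(max_k)


print(kib_to_config_value(50))
print(needs_update('2097164', 50))
print(needs_update('51200', 50))
```

Comparing strings to strings avoids a subtle mismatch: an integer 51200 would never equal the string '51200', so the setting would be reapplied on every run.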
Add a main block to put the new functions to work:
if __name__ == '__main__':
    # Create Admin client
    admin = AdminClient(config)
    topic_name = 'my_topic'
    max_msg_k = 50

    # Create topic if it doesn't exist
    if not topic_exists(admin, topic_name):
        create_topic(admin, topic_name)

    # Check max.message.bytes config and set if needed
    current_max = get_max_size(admin, topic_name)
    if current_max != str(max_msg_k * 1024):
        print(f'Topic, {topic_name} max.message.bytes is currently {current_max}.')
        set_max_size(admin, topic_name, max_msg_k)

    # Verify config was set
    new_max = get_max_size(admin, topic_name)
    print(f'Now max.message.bytes for topic {topic_name} is {new_max}')
Save the admin.py file.
Run admin.py from the command line with this command:
python admin.py
Verify that the output is something like this:
Topic my_topic created
Topic, my_topic max.message.bytes is currently 2097164.
Now max.message.bytes for topic my_topic is 51200
After completing the course exercises, tear down the learn-kafka-python environment so that it does not keep accruing charges and exhaust your promotional credits.
In the Confluent Cloud console, navigate to Environments.
Click on the learn-kafka-python environment.
Click the Delete Environment link at the bottom right.
Confirm the delete request and click Continue.