Get Started Free
course: Apache Kafka® for Python Developers

Hands On: Use the Python AdminClient Class

dave-klein-headshot

Dave Klein

Developer Advocate

Use AdminClient to Create a Topic and Alter its Configuration

In this exercise you will use the AdminClient class to create a new Kafka topic and alter one of its configuration properties.

Project Setup

  1. Open a terminal window and navigate to the kafka-python directory that you created in the previous exercise.

  2. If you are not currently using the kafka-env environment that was created in the last exercise, switch to it with the following command:

    source kafka-env/bin/activate
  3. Create a file called admin.py.

  4. Open admin.py in an IDE of your choice.

Add Required Imports

  1. Add the following import statements to the top of the admin.py file:

    from confluent_kafka.admin import (AdminClient, NewTopic, 
                                       ConfigResource)
    from config import config

Check if Kafka Topic Exists

  1. Add a function that uses list_topics() to check to see if a specific topic already exists:

    # return True if topic exists and False if not
    def topic_exists(admin, topic):
        metadata = admin.list_topics()
        for t in iter(metadata.topics.values()):
            if t.topic == topic:
                return True
        return False

Create a New Kafka Topic

  1. Add a function using a NewTopic instance and the create_topics() function to create a topic:

    # create new topic and return results dictionary
    def create_topic(admin, topic):
        new_topic = NewTopic(topic, num_partitions=6, replication_factor=3) 
        result_dict = admin.create_topics([new_topic])
        for topic, future in result_dict.items():
            try:
                future.result()  # The result itself is None
                print("Topic {} created".format(topic))
            except Exception as e:
                print("Failed to create topic {}: {}".format(topic, e))

Describe the New Kafka Topic

  1. Create a function that uses a ConfigResource instance and the describe_configs() function:

    # get max.message.bytes property
    def get_max_size(admin, topic):
        resource = ConfigResource('topic', topic)
        result_dict = admin.describe_configs([resource])
        config_entries = result_dict[resource].result()
        max_size = config_entries['max.message.bytes']
        return max_size.value

Set a Kafka Topic Configuration Property Value

  1. Now add a function using the alter_configs() function to set the max.message.bytes property:

    # set max.message.bytes for topic
    def set_max_size(admin, topic, max_k):
        config_dict = {'max.message.bytes': str(max_k*1024)}
        resource = ConfigResource('topic', topic, config_dict)
        result_dict = admin.alter_configs([resource])
        result_dict[resource].result()

Add Main Block

  1. Add a main block to put the new functions to work:

    if __name__ == '__main__':
    
        # Create Admin client
        admin = AdminClient(config)
        topic_name = 'my_topic'
        max_msg_k = 50
    
        # Create topic if it doesn't exist
        if not topic_exists(admin, topic_name):
            create_topic(admin, topic_name)
    
        # Check max.message.bytes config and set if needed
        current_max = get_max_size(admin, topic_name)
        if current_max != str(max_msg_k * 1024):
            print(f'Topic, {topic_name} max.message.bytes is {current_max}.')
            set_max_size(admin, topic_name, max_msg_k)
    
        # Verify config was set 
        new_max = get_max_size(admin, topic_name)
        print(f'Now max.message.bytes for topic {topic_name} is {new_max}')

Test admin.py

  1. Save the admin.py file.

  2. Run admin.py from the command line with this command:

    python admin.py
  3. Verify that the output is something like this:

    Topic my_topic created
    Topic, my_topic max.message.bytes is currently 2097164.
    Now max.message.bytes for topic my_topic is 51200

Exercise Environment Teardown

After completing the course exercises, you need to tear down the learn-kafka-python environment to avoid unnecessarily accruing cost to the point your promotional credits are exhausted.

  1. In the Confluent Cloud console, navigate to Environments.

  2. Click on the learn-kafka-python environment.

  3. Click the Delete Environment link at the bottom right.

  4. Confirm the delete request and click Continue.

Use the promo code PYTHONKAFKA101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.