
Dave Klein

Senior Developer Advocate

Use Consumer to Read Events from Kafka

In this exercise, you will use the Consumer class to read events from a Kafka topic.

Topics

We will be using the hello_topic topic that we created in the Use Producer to Send Events to Kafka exercise. We will be reading those events during this exercise, so if you did not complete the previous exercise, either do so now or use the Confluent Cloud Console to manually create the topic and send some events to it.

Project Setup

  1. Open a terminal window and navigate to the kafka-python directory that you created in the previous exercise.

  2. If you are not currently using the kafka-env environment that was created in the last exercise, switch to it with the following command:

    source kafka-env/bin/activate
  3. Create a file called consumer.py.

  4. Open consumer.py in an IDE of your choice.

Add Required Imports

  1. Add the following import statements to the top of the consumer.py file:

    from confluent_kafka import Consumer, KafkaException
    from config import config

Create function to update configuration

  1. Create a function called set_consumer_configs(). We will call this function in the main block to add some consumer-specific configuration properties.

    def set_consumer_configs():
        config['group.id'] = 'hello_group'
        config['auto.offset.reset'] = 'earliest'
        config['enable.auto.commit'] = False

    Along with setting the group.id, we are also setting auto.offset.reset to earliest so that we can consume events that are already in our target topic. Finally, we are setting enable.auto.commit to False so that we can control the committing of offsets for our consumer.
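    As a quick sanity check of what the merged dictionary ends up looking like, here is a runnable sketch; the bootstrap.servers value is a placeholder standing in for whatever your config.py actually provides:

    ```python
    # Hypothetical stand-in for the config dict imported from config.py
    config = {
        'bootstrap.servers': '<your-bootstrap-server>',  # placeholder value
    }

    def set_consumer_configs():
        config['group.id'] = 'hello_group'
        config['auto.offset.reset'] = 'earliest'
        config['enable.auto.commit'] = False

    set_consumer_configs()
    print(config['group.id'])            # hello_group
    print(config['enable.auto.commit'])  # False
    ```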

Create callback function for partition assignment

  1. Create a function called assignment_callback() that takes a consumer and a list of partitions.

    def assignment_callback(consumer, partitions):
        for p in partitions:
            print(f'Assigned to {p.topic}, partition {p.partition}')

    This callback will be passed into the consumer.subscribe() call and it will be invoked whenever topic partitions are assigned to this consumer. This includes during the subscribe() call, as well as any subsequent rebalancing.

Add Main Block

  1. Add the following beginnings of the main block.

    if __name__ == '__main__':
        set_consumer_configs()
        consumer = Consumer(config)
        consumer.subscribe(['hello_topic'], on_assign=assignment_callback)

    Here we update the config dictionary, create our Consumer, and then call the subscribe() method, which takes a list of topic names, often a list of one. We pass a function for the on_assign callback. We could also pass functions for on_revoke and on_lost, but these are less commonly used.

  2. Add a try/except/finally block.

        try:
            pass  # placeholder; replaced by the poll loop in the next step
        except KeyboardInterrupt:
            print('Canceled by user.')
        finally:
            consumer.close()

    We’re going to add an endless loop in the next step, so we use except KeyboardInterrupt to catch a Ctrl+C and stop the program, and then we call consumer.close() in the finally block to make sure we clean up after ourselves.

  3. To add the poll loop, add the following while loop between the try: and except.

            while True:
                event = consumer.poll(1.0)
                if event is None:
                    continue
                if event.error():
                    raise KafkaException(event.error())
                else:
                    val = event.value().decode('utf8')
                    partition = event.partition()
                    print(f'Received: {val} from partition {partition}')
                    # consumer.commit(event)

    In our loop, we call consumer.poll() repeatedly, printing the value of each received event along with the partition it came from. Notice that the consumer.commit() call is commented out for now.
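    As mentioned above, subscribe() also accepts on_revoke and on_lost callbacks. Here is a sketch of what they might look like; the function bodies and the stand-in partition objects are illustrative only (the real consumer passes confluent_kafka.TopicPartition instances):

    ```python
    from types import SimpleNamespace

    rebalance_log = []

    def revocation_callback(consumer, partitions):
        # Invoked before partitions are taken away during a rebalance --
        # a common place to commit any outstanding offsets.
        for p in partitions:
            rebalance_log.append(f'Revoked: {p.topic}, partition {p.partition}')

    def loss_callback(consumer, partitions):
        # Invoked when partitions were lost outright (e.g. after a session
        # timeout); offsets can no longer be committed for them.
        for p in partitions:
            rebalance_log.append(f'Lost: {p.topic}, partition {p.partition}')

    # Exercising the revocation callback with stand-in partition objects:
    fake_partitions = [SimpleNamespace(topic='hello_topic', partition=i)
                       for i in range(2)]
    revocation_callback(None, fake_partitions)
    print(rebalance_log)
    ```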

Run the Consumer

  1. Execute the program by running the following command:

    python consumer.py

    If you ran the producer exercise earlier, you should see output like this:

    Assigned to hello_topic, partition 0
    Assigned to hello_topic, partition 1
    Assigned to hello_topic, partition 2
    Assigned to hello_topic, partition 3
    Assigned to hello_topic, partition 4
    Assigned to hello_topic, partition 5
    Received: Hello Fred! from partition 2
    Received: Hello Cindy! from partition 0
    Received: Hello Amy! from partition 1
    Received: Hello Elaine! from partition 5
    Received: Hello Derrick! from partition 4
    Received: Hello Brenda! from partition 3

    The first six lines are the partition assignments, which are all done in order. The next six are the events that are in our hello_topic. Let's run the consumer again and observe the result.

  2. To stop the consumer, press Ctrl+C.

  3. Run the consumer again:

    python consumer.py

    Notice we get the same result. This is because we have enable.auto.commit set to False, auto.offset.reset set to earliest, and we are not committing our offsets.

  4. To stop the consumer, press Ctrl+C.

  5. Uncomment the consumer.commit(event) call.

  6. Run the consumer, stop it by pressing Ctrl+C, and run the consumer again.

    You should still see the same result from the first run, but notice that subsequent runs just show the assignment lines and wait for more events.
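    The offset behavior in the steps above can be sketched without a broker. This toy model (not the confluent_kafka API) shows why a restarted consumer re-reads everything until offsets are committed:

    ```python
    def consume(events, committed_offset, commit=False):
        """Toy model of one consumer run: start from the committed offset
        (or the beginning if none, like auto.offset.reset=earliest),
        optionally committing what was read."""
        start = committed_offset if committed_offset is not None else 0
        delivered = events[start:]
        new_committed = len(events) if commit and delivered else committed_offset
        return delivered, new_committed

    events = ['Hello Fred!', 'Hello Cindy!', 'Hello Amy!']

    # Without committing: every run re-reads everything
    run1, committed = consume(events, None, commit=False)
    run2, committed = consume(events, committed, commit=False)
    print(run1 == run2 == events)  # True

    # With committing: the second run sees nothing new
    run3, committed = consume(events, None, commit=True)
    run4, committed = consume(events, committed, commit=True)
    print(run4)  # []
    ```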

Produce New Events

Let's leave the consumer running and confirm that we see new events as they are produced to hello_topic.

Note: This section depends upon the producer that was created in the Use Producer to Send Events to Kafka exercise.

  1. Open a new terminal and run the producer:

    source kafka-env/bin/activate && \
    python producer.py

    Return to the terminal where the consumer is running, and you will see the newly produced events appear.

Observe Consumer Group Rebalance Behavior

Let's see what happens when we add a consumer instance to the 'hello_group' consumer group.

  1. Open a new terminal and run the consumer:

    source kafka-env/bin/activate && \
    python consumer.py

    Notice how you only see three lines of partition assignments. If you go back to the terminal window that already had the consumer running, you will see the other three partition assignments. Starting the second instance of our consumer caused it to be added to the consumer group with the existing instance, triggering a rebalance. If you launch a third instance, they will each have two partition assignments.

Now let's see how the Kafka Consumer Group Protocol handles a failed consumer instance.

  1. Stop the second consumer instance.

  2. Return to the terminal window where the first consumer instance is running.

    Notice that a fresh set of partition assignments appears for all six partitions. When the other consumer instance was stopped, it removed itself from the group, triggering a consumer group rebalance. The same rebalance would have occurred if the consumer instance had failed. For more details about consumer groups and rebalances, check out the Consumer module of the Kafka 101 course, or the Consumer Group Protocol module in the Kafka Internals course on Confluent Developer.
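The partition distribution described above can be sketched with a toy round-robin assignment; the real distribution is decided by the group coordinator using the configured partition.assignment.strategy, so treat this as an illustration only:

```python
def assign(partitions, consumers):
    """Toy round-robin assignment of partitions to consumer instances."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

partitions = list(range(6))
print(assign(partitions, ['c1']))              # one consumer: all six partitions
print(assign(partitions, ['c1', 'c2']))        # two consumers: three each
print(assign(partitions, ['c1', 'c2', 'c3']))  # three consumers: two each
```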

