In this exercise, you will use the Consumer class to read events from a Kafka topic.
We will be reading events from the hello_topic topic that we created in the Use Producer to Send Events to Kafka exercise. If you did not complete that exercise, either do so now, or use the Confluent Cloud Console to manually create the topic and send some events to it.
Open a terminal window and navigate to the kafka-python directory that you created in the previous exercise.
If you are not currently using the kafka-env environment that was created in the last exercise, switch to it with the following command:
source kafka-env/bin/activate
Create a file called consumer.py.
Open consumer.py in an IDE of your choice.
Add the following import statements to the top of the consumer.py file:
from confluent_kafka import Consumer, KafkaException
from config import config
Create a function called set_consumer_configs(). We will call this function in the main block to add some consumer-specific configuration properties.
def set_consumer_configs():
    config['group.id'] = 'hello_group'
    config['auto.offset.reset'] = 'earliest'
    config['enable.auto.commit'] = False
Along with setting the group.id, we are also setting auto.offset.reset to earliest so that we can consume events that are already in our target topic. Finally, we are setting enable.auto.commit to False so that we can control the committing of offsets for our consumer.
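Since config is a plain Python dictionary imported from config.py, set_consumer_configs() works by simply adding keys to it. Here is a minimal, broker-free sketch of that merge, using a placeholder base config in place of your real config.py values:

```python
# Placeholder base config -- in the exercise this dictionary comes from
# config.py and also carries your cluster endpoint and credentials.
config = {'bootstrap.servers': 'localhost:9092'}

def set_consumer_configs():
    # Mutates the shared dictionary in place.
    config['group.id'] = 'hello_group'
    config['auto.offset.reset'] = 'earliest'
    config['enable.auto.commit'] = False

set_consumer_configs()
print(sorted(config))
# → ['auto.offset.reset', 'bootstrap.servers', 'enable.auto.commit', 'group.id']
```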
Create a function called assignment_callback() that takes a consumer and a list of partitions.
def assignment_callback(consumer, partitions):
    for p in partitions:
        print(f'Assigned to {p.topic}, partition {p.partition}')
This callback will be passed into the consumer.subscribe() call and it will be invoked whenever topic partitions are assigned to this consumer. This includes during the subscribe() call, as well as any subsequent rebalancing.
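The partitions argument is a list of TopicPartition objects, each carrying topic and partition attributes. To see the shape of what the callback receives without connecting to a broker, you can invoke it directly with a simple stand-in (the namedtuple below is our own illustration, not part of confluent_kafka):

```python
from collections import namedtuple

# Stand-in for confluent_kafka.TopicPartition, for illustration only.
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])

def assignment_callback(consumer, partitions):
    for p in partitions:
        print(f'Assigned to {p.topic}, partition {p.partition}')

# Simulate the client invoking the callback after an assignment.
assignment_callback(None, [TopicPartition('hello_topic', 0),
                           TopicPartition('hello_topic', 1)])
# Assigned to hello_topic, partition 0
# Assigned to hello_topic, partition 1
```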
Add the following beginnings of the main block.
if __name__ == '__main__':
    set_consumer_configs()
    consumer = Consumer(config)
    consumer.subscribe(['hello_topic'], on_assign=assignment_callback)
Here we update the config dictionary, create our Consumer, and then call the subscribe() method. This method takes a list of topic names, which is often a list of one. We also pass a function for the on_assign callback. We could pass functions for on_revoke and on_lost as well, but these are less commonly used.
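If you do want visibility into the revoke side of a rebalance, subscribe() also accepts an on_revoke callback with the same (consumer, partitions) signature. A sketch (the callback name here is our own choice):

```python
def revocation_callback(consumer, partitions):
    # Invoked before partitions are taken away during a rebalance.
    for p in partitions:
        print(f'Revoked: {p.topic}, partition {p.partition}')

# Passed alongside on_assign in the subscribe() call:
# consumer.subscribe(['hello_topic'],
#                    on_assign=assignment_callback,
#                    on_revoke=revocation_callback)
```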
Add a try/except/finally block.
try:
except KeyboardInterrupt:
    print('Canceled by user.')
finally:
    consumer.close()
We're going to be adding an endless loop in the next step, so we'll use the except KeyboardInterrupt to catch a Ctrl+C and stop the program, and then we'll call consumer.close() in the finally block to make sure we clean up after ourselves.
To add the poll loop, add the following while loop between the try: and except.
while True:
    event = consumer.poll(1.0)
    if event is None:
        continue
    if event.error():
        raise KafkaException(event.error())
    else:
        val = event.value().decode('utf8')
        partition = event.partition()
        print(f'Received: {val} from partition {partition}')
        # consumer.commit(event)
In our loop, we call consumer.poll() repeatedly, printing the value of each event we receive along with the partition it came from. Notice that the consumer.commit() call is commented out for now. (By default, the Python client's commit() is asynchronous; you can pass asynchronous=False if you want to block until the commit completes.)
Execute the program by running the following command:
python consumer.py
If you ran the producer exercise earlier, you should see output like this:
Assigned to hello_topic, partition 0
Assigned to hello_topic, partition 1
Assigned to hello_topic, partition 2
Assigned to hello_topic, partition 3
Assigned to hello_topic, partition 4
Assigned to hello_topic, partition 5
Received: Hello Fred! from partition 2
Received: Hello Cindy! from partition 0
Received: Hello Amy! from partition 1
Received: Hello Elaine! from partition 5
Received: Hello Derrick! from partition 4
Received: Hello Brenda! from partition 3
The first six lines are the partition assignments, which arrive in order. The next six are the events that are in our hello_topic. Let's run the consumer again and observe the result.
To stop the consumer, press Ctrl+C.
Run the consumer again:
python consumer.py
Notice we get the same result. This is because we have enable.auto.commit set to False, auto.offset.reset set to earliest, and we are not committing our offsets.
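Conceptually, each run resolves its starting position per partition like this (a plain-Python sketch of the rule, not actual client code; the example offsets are made up):

```python
def starting_offset(committed, auto_offset_reset, earliest=0, latest=100):
    # If the group has a committed offset for the partition, resume there;
    # otherwise fall back to the auto.offset.reset policy.
    if committed is not None:
        return committed
    return earliest if auto_offset_reset == 'earliest' else latest

# No commits yet, so every run starts from the beginning:
print(starting_offset(None, 'earliest'))   # 0
# Once an offset has been committed, a restart resumes from it:
print(starting_offset(6, 'earliest'))      # 6
```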
To stop the consumer, press Ctrl+C.
Uncomment the consumer.commit(event) call.
Run the consumer, stop it by pressing Ctrl+C, and run the consumer again.
You should still see the same result on the first run, but notice that subsequent runs just show the assignment lines and wait for more events.
Let's leave the consumer running and confirm we see the new events when they are produced to hello_topic.
Note: This section depends upon the producer that was created in the Use Producer to Send Events to Kafka exercise.
Open a new terminal and run the producer:
source kafka-env/bin/activate && \
python producer.py
Return to the terminal where the consumer is running; you will see the newly produced events appear.
Let's see what happens when we add a consumer instance to the 'hello_group' consumer group.
Open a new terminal and run the consumer:
source kafka-env/bin/activate && \
python consumer.py
Notice how you only see three lines of partition assignments. If you go back to the terminal window that already had the consumer running, you will see the other three partition assignments. Starting the second instance of our consumer caused it to be added to the consumer group with the existing instance, triggering a rebalance. If you launch a third instance, they will each have two partition assignments.
Now let's see how the Kafka Consumer Group Protocol handles a failed consumer instance.
Stop the second consumer instance.
Return to the terminal window where the first consumer instance is running.
Notice that a fresh set of partition assignments appear for all six partitions. When the other consumer instance was stopped it removed itself from the group triggering a consumer group rebalance. This rebalance would have also occurred if the consumer instance had failed. For more details about consumer groups and rebalances, check out the Consumer module of the Kafka 101 course, or the Consumer Group Protocol module in the Kafka Internals course on Confluent Developer.