In this lecture, you will learn how to read events from Kafka topics using the Python Consumer class. Follow along as Dave Klein (Senior Developer Advocate, Confluent) covers all of this in detail.
https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-consumer
https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/consumer.py
Hi, Dave Klein here again with the Apache Kafka for Python Developers course. In this module, we'll learn how to read and process events in our Python applications using the Consumer class. Let's get started.

Once we have one or more producers sending events to Kafka, we need to get those events back out into other parts of our system. This is where the Consumer class comes into play. Before we go further, I want to stress that consumers, despite their name, don't actually consume events from the topics. They read them so that we can process them, but the events remain in the topics, available to be re-read or to be read by other consumer applications. Kafka topics are immutable and durable. Older events can be removed from topics based on retention settings that we control, but they're not removed as they're consumed.

The consumer subscribes to one or more topics and then reads events from those topics, starting at a determined point and reading each partition from oldest to newest. By including the Consumer class in our applications, we can receive a steady stream of events and act on them in real time. The consumer keeps track of the events that it has successfully read and processed by updating its committed offset. Offsets can be committed explicitly in our code or automatically, depending on a configuration property. By committing the offsets ourselves, we can ensure that any necessary processing is completed first.

A single consumer can read the events from all the partitions of a topic, or we can run multiple instances of a consumer application; they will participate in a consumer group, which balances the workload among all running instances. The number of instances that can do useful work is limited to the number of partitions in the topic, because each partition is read by only one instance in a consumer group at a time; any additional instances sit idle. This is something to keep in mind when determining the number of partitions in a new topic.
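To make the scaling limit concrete, here is a toy model in plain Python (no Kafka client involved) that hands a topic's partitions to group members round-robin. It only illustrates the one-owner-per-partition rule: Kafka's group coordination actually uses a configurable assignor (the partition.assignment.strategy property), and the function name assign_round_robin is hypothetical.

```python
def assign_round_robin(num_partitions: int, instances: list[str]) -> dict[str, list[int]]:
    """Toy model: give each partition to exactly one instance, round-robin.

    This mirrors the rule that, within one consumer group, a partition is
    read by at most one instance. It is not Kafka's real assignor.
    """
    assignment: dict[str, list[int]] = {name: [] for name in instances}
    for partition in range(num_partitions):
        owner = instances[partition % len(instances)]
        assignment[owner].append(partition)
    return assignment


if __name__ == "__main__":
    # Six partitions, three instances: every instance gets two partitions.
    print(assign_round_robin(6, ["app-1", "app-2", "app-3"]))
    # Six partitions, eight instances: app-7 and app-8 get nothing and sit idle.
    print(assign_round_robin(6, [f"app-{i}" for i in range(1, 9)]))
```

With six partitions and eight instances, two instances receive no partitions at all, which is the idle capacity described above; adding partitions is what raises the ceiling.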
Partitions are a key unit of scale in Kafka.

While consumers are not directly involved in transactions, they can be used in conjunction with transactional producers by setting the isolation.level property. With isolation.level set to read_committed, a consumer will not read any events that are part of an open or aborted transaction.

Now let's get to some code. To start with, we'll look at creating an instance of a consumer. Just like the producer, the consumer constructor takes a dictionary of configuration properties. Also like the producer, the consumer configuration will include bootstrap.servers, the Kafka broker(s) to connect to, as well as any required security configuration. There are also several consumer-specific properties that we can set. Here are a few important examples.

group.id is a required property for consumers. It can be any string value but should be unique to this application. The group.id is used for consumer group coordination as well as other behind-the-scenes processes.

When starting a consumer application for the first time, the auto.offset.reset property determines whether we begin consuming from the beginning of the topic or only consume new events as they arrive. Once our application is running and offsets are being committed, the committed offsets are used to determine the starting point. However, if the committed offset is no longer valid, due to retention expiration or some other reason, auto.offset.reset comes into play again. The default value for this property is latest, which consumes only new events. Setting it to earliest causes consumption to start at the beginning of the topic.

Speaking of committed offsets, enable.auto.commit determines whether we decide in our application code when to commit offsets or whether the Consumer class does it for us automatically.
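As a sketch of the configuration dictionary described above, the function below collects the properties discussed so far. The broker address localhost:9092, the group name, and the helper name build_consumer_config are placeholders, and security settings are omitted because they depend on your cluster. The values chosen here (earliest, manual commits, read_committed) are one reasonable combination, not required settings.

```python
def build_consumer_config(bootstrap_servers: str, group_id: str) -> dict:
    """Return a configuration dictionary for confluent_kafka's Consumer."""
    return {
        # Broker(s) used for the initial connection, same key the Producer uses.
        "bootstrap.servers": bootstrap_servers,
        # Required: identifies this application's consumer group.
        "group.id": group_id,
        # Used when there is no valid committed offset:
        # "earliest" = beginning of the topic, "latest" (the default) = new events only.
        "auto.offset.reset": "earliest",
        # We will commit offsets ourselves after processing each event.
        "enable.auto.commit": False,
        # Skip events from open or aborted transactions (also the default).
        "isolation.level": "read_committed",
    }


if __name__ == "__main__":
    config = build_consumer_config("localhost:9092", "order-audit-app")  # placeholder values
    # With the confluent-kafka package installed, the consumer is created like a Producer:
    # from confluent_kafka import Consumer
    # consumer = Consumer(config)
    print(config)
```

An auditing application that should see every event regardless of transaction state would change isolation.level to read_uncommitted and leave the rest as is.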
There are always trade-offs to consider, but the most common recommendation is to set this property to false and commit offsets intentionally in our code.

The isolation.level property determines the behavior of our consumer when reading events that were produced as part of a transaction. If this property is set to read_uncommitted, we will consume all events, even those that are in an incomplete transaction or that have been aborted. This might make sense if our application is not participating directly in the transaction; for example, an auditing application that wants to see all events regardless of transaction state. But if our application is part of a larger transactional process, isolation.level should be set to read_committed, which is the default.

Now let's look at how to put the Consumer class to use. After setting up the configuration dictionary and constructing a consumer instance, the next thing we need to do is subscribe to one or more topics. More often than not, we'll only subscribe to one topic, but it still needs to be passed in a list. When we subscribe to a topic, one or more partitions in that topic will be assigned to this consumer instance. We can pass in a callback that will be executed when this assignment happens initially, as well as any time the assignment changes due to rebalancing.

Let's take a look at an example of this callback. The on_assign callback function receives as parameters the consumer on whose behalf it is executing and a list of TopicPartition instances. TopicPartition is a class that holds the topic name, partition, and offset. The on_assign callback can be helpful for generating logs to keep an eye on what's happening with our consumers. This is especially useful when we have a large number of consumers in a group. There are two other callback functions that we can pass to the subscribe method: on_revoke and on_lost.
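A minimal sketch of subscribing with an on_assign callback that logs each assignment. The helpers are written against any object exposing subscribe(topics, on_assign=...), such as a confluent_kafka Consumer, so they can be exercised without a broker; the topic name "orders", the logger name, and the helper names format_partitions, log_assignment, and subscribe_and_log are hypothetical.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("orders-consumer")


def format_partitions(partitions) -> list[str]:
    """Render TopicPartition-like objects (with .topic, .partition, .offset) as strings."""
    return [f"{tp.topic}[{tp.partition}]@{tp.offset}" for tp in partitions]


def log_assignment(consumer, partitions) -> None:
    """on_assign callback: receives the consumer and its list of assigned TopicPartitions."""
    logger.info("Assigned partitions: %s", ", ".join(format_partitions(partitions)))


def subscribe_and_log(consumer, topic: str) -> None:
    # subscribe() expects a list of topics, even when there is only one.
    consumer.subscribe([topic], on_assign=log_assignment)
```

With a real client this would be called as subscribe_and_log(consumer, "orders") right after constructing the consumer; the log line then appears on the first assignment and again after every rebalance.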
These are more specialized and out of scope for this course, but they receive the same arguments as on_assign. You can learn more about them in the Confluent Kafka Python documentation.

Once our consumer has subscribed to one or more topics, we can begin to receive events from them. This is where the poll method comes into play. The poll method returns one event at a time and is usually called in a loop. Its return value is either None or an instance of the Message class, which we'll look at shortly. The timeout parameter, in this case one second, determines how long to wait for an event to become available before returning None.

If an event is returned instead of None, that doesn't necessarily mean that all went well. The event, or Message object, might be letting us know that something went wrong. To find out, we call its error method, which returns a KafkaError or None.

Now let's take a closer look at the Message class to see what is available to us as we process incoming events. Along with the error method that we already discussed, the Message class has methods that return the key, value, headers, and timestamp information. These are the items most commonly needed when processing data in our consumer applications. We can also retrieve the name of the topic that the event came from, as well as its partition and offset. The topic is especially helpful when a consumer is subscribed to more than one topic.

Now let's see how some of these methods might be used to process events in our polling loop. In a real Kafka consumer application, the processing logic would be a bit more complex than this, but it gives us an idea of how we can extract the necessary information from our events and put it to use. In this case, as is common, the key and value come back as bytes, so we need to decode them.
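The polling loop described above might look roughly like the following sketch. It assumes the consumer has already been configured and subscribed, bounds the loop with a hypothetical max_polls argument so it terminates (a long-running service would usually loop until shut down), and decodes keys and values as UTF-8, which is an assumption about how the producer encoded them. handle_message and consume_events are illustrative names, not part of the library.

```python
from typing import Optional


def handle_message(msg) -> Optional[dict]:
    """Turn a polled Message into a plain dict, or return None if there is nothing to process."""
    if msg is None:
        # poll() timed out before an event became available.
        return None
    if msg.error() is not None:
        # The Message carries a KafkaError instead of an event.
        print(f"Consumer error: {msg.error()}")
        return None
    key, value = msg.key(), msg.value()
    return {
        "topic": msg.topic(),
        "partition": msg.partition(),
        "offset": msg.offset(),
        # timestamp() returns a (timestamp_type, timestamp_ms) tuple.
        "timestamp": msg.timestamp()[1],
        # Keys and values arrive as bytes (or None); UTF-8 is assumed here.
        "key": key.decode("utf-8") if key is not None else None,
        "value": value.decode("utf-8") if value is not None else None,
    }


def consume_events(consumer, max_polls: int = 100, timeout: float = 1.0) -> list:
    """Poll up to max_polls times and collect the successfully read events."""
    events = []
    try:
        for _ in range(max_polls):
            event = handle_message(consumer.poll(timeout))
            if event is not None:
                print(f"{event['topic']}[{event['partition']}]@{event['offset']}: "
                      f"{event['key']} -> {event['value']}")
                events.append(event)
    finally:
        # close() lets the consumer leave its group cleanly.
        consumer.close()
    return events
```

Keeping the per-message extraction in its own function makes it easy to test without a broker and keeps the loop itself focused on polling and shutdown.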
In a later module, we'll talk about how we can serialize and deserialize event data using schema definition frameworks like Protobuf and Avro.

If we have enable.auto.commit set to false, which is recommended in most situations, we will need to commit offsets ourselves. This can be done after each event or at some interval that makes sense for our application. One way to do this is to pass the most recently processed event to the commit method. Since the event can provide the topic, partition, and offset, we have everything we need to commit the offset. Alternatively, we can pass a list of TopicPartition objects to the commit method. The consumer also has another means of committing offsets: the store_offsets method, which can be more efficient than commit but is a bit more involved to use. You can find more information on store_offsets in the Confluent Kafka Python documentation.

And now let's head to the next module, where we'll get some hands-on experience with the Python Consumer class. If you are not already on Confluent Developer, head there now using the link in the video description to access the rest of this course and its hands-on exercises.
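For the manual-commit approach described in this module, a sketch of committing each event only after it has been processed might look like this. It assumes enable.auto.commit is false and that process is your own (hypothetical) processing function; commit(message=..., asynchronous=False) is confluent_kafka's Consumer.commit, which commits the message's offset plus one, the position of the next event to read. The name process_and_commit and the max_polls bound are illustrative.

```python
def process_and_commit(consumer, process, max_polls: int = 100, timeout: float = 1.0) -> int:
    """Process each event, then synchronously commit its offset; return the count processed.

    Assumes the consumer was created with "enable.auto.commit": False.
    """
    processed = 0
    for _ in range(max_polls):
        msg = consumer.poll(timeout)
        if msg is None:
            continue  # no event within the timeout
        if msg.error() is not None:
            print(f"Consumer error: {msg.error()}")
            continue
        process(msg)  # finish all processing before committing
        processed += 1
        # Commits this message's topic/partition at offset + 1 (the next event to read).
        consumer.commit(message=msg, asynchronous=False)
    return processed
```

Committing synchronously after every event is the simplest option but adds a broker round-trip per event; committing less often reduces that overhead at the cost of reprocessing more events after a crash.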