Producing messages to a Kafka topic isn't much use unless you can consume them later. That's the job of a Kafka Consumer. The consumer is responsible for handling the messages, but it doesn't control their format. Instead, it reads the data according to whatever schema it was published with, deserializing the incoming data to a C# class or struct where it can be further processed. In this video, we'll see how to create and configure a consumer, as well as how to build a simple message-processing loop.
Hi, I'm Wade from Confluent. Let's take a look at how we can consume messages from a Kafka topic in a .NET application. Once we've published our messages to Kafka, we need to be able to get them out again. We do this by creating a Kafka Consumer. The job of the consumer is to process each message in a topic. Usually, they're processed one at a time in the order they're received, but the consumer has very little control over the messages themselves. It can control how it processes them, but not what it receives.

You can think about this like traditional publishing. If I publish a book, the reader has very little control over what goes into the book. However, that doesn't mean the reader has no control. The reader gets to decide which books they read and how they read them. Some people might read a book end-to-end, while others might jump around, only looking at the sections that interest them. In the same way, a Kafka consumer can decide which topics it will pay attention to. Within each topic, it often looks at every message, performs some processing, and potentially updates a database or some other state. However, it may decide that some messages aren't relevant and discard them rather than wasting time processing them. And even within an individual message, it can choose which data is important and ignore the rest.

However, consuming all of the messages in a single consumer isn't very scalable. Remember that when messages are produced to a topic, they're assigned a key. That key is used to separate the topic into multiple partitions. Within each partition, the order of the messages is guaranteed. However, it's not guaranteed across multiple partitions. When we create a consumer, we assign it a group ID. This ID indicates that multiple instances of the consumer all belong to the same group. Kafka will distribute the partitions among the group members. This means that in a topic with 30 partitions, you could have up to 30 instances of your consumer all working in parallel.

The first step to creating a consumer is to create the configuration. Some of the configuration, such as the BootstrapServers and authentication settings, is identical to the producer's. However, there are some unique configuration values that are worth paying attention to. The GroupId is used to identify multiple consumers that all belong to a single group. This is commonly used to identify multiple instances of the same microservice. It allows the system to parallelize consumption by distributing topic partitions among the members of the group.

AutoOffsetReset determines where the consumer should start processing messages if it has no record of a previously committed offset. In other words, if the consumer is unsure where to start processing messages, it will use this setting. Setting it to Earliest will cause the consumer to start from the earliest message it hasn't seen yet. This is a good value to use if you want to ensure that all messages get processed. Setting it to Latest will cause the consumer to start processing from the latest message. This means that if any messages were published before the consumer connected, those messages would be ignored. You would use Latest if you only care about messages that have been published after the consumer has started. Keep in mind that once the consumer starts committing offsets, it will use those offsets to determine its position in the topic rather than using the AutoOffsetReset setting.
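As a rough illustration of those settings, here is a minimal ConsumerConfig sketch using the Confluent.Kafka client. The broker address and group id are placeholders rather than values from this course, and any authentication settings would simply mirror what the producer already uses.

```csharp
using Confluent.Kafka;

// Minimal sketch of the consumer configuration. The broker address and
// group id are placeholders; authentication settings (SASL username,
// password, etc.) would match whatever the producer already uses.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",        // same as the producer
    GroupId = "order-processor",                // instances sharing this id split the partitions
    AutoOffsetReset = AutoOffsetReset.Earliest  // start from the earliest unseen message;
                                                // use Latest to ignore older messages
};
```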
EnableAutoCommit determines whether or not the consumer will automatically commit its offsets. Committing an offset indicates that the consumer has finished processing a message and doesn't want to receive it again. If we choose to auto-commit, then the offsets are automatically managed by the consumer. We'll go into more detail on committing offsets later in the course.

As with the producer, these settings can be loaded from a configuration file using the Configure Method in the Dependency Injection Framework.

Once we have our configuration, the next step is to create our consumer. Again, as with our producer, we create the consumer using a builder. The consumer requires two types, one for the Key and one for the Value of the message. If those types represent complex objects, then we'll need to specify an appropriate deserializer. If you're using an asynchronous deserializer, you'll need to wrap it with a synchronous one using the AsSyncOverAsync Method.

When we are ready to begin consuming messages, we need to subscribe to a topic. This is done using the Subscribe Method on the consumer. It takes either a single topic name or a collection of multiple topic names. This informs the client which topics we want to consume, but it doesn't actually start processing messages.

To start processing messages, we use the Consume Method. It will grab the next available message from the topic and return it as the result. This method takes either a CancellationToken or a timeout. The Consume Method only returns a single message. As a result, it's common to wrap it in a while loop using a CancellationToken to terminate the loop if necessary. Once we've received something from the topic, we can extract the full Message from the result, including both the Key and the Value.

Finally, once we have finished consuming all of our messages and don't intend to consume more, we should make sure to Close our consumer. This will terminate any connections it has to the Kafka cluster. If auto-commits are being used, it will also commit any remaining offsets. The Close Method would typically be executed outside of the while loop that we use to consume the messages.

This gives us enough of the basics that we should be able to start consuming messages from our Kafka topic in a .NET application. If you aren't already on Confluent Developer, head there now using the link in the video description to access the rest of this course and its hands-on exercises.
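To tie those steps together, here is a minimal sketch of the build/subscribe/consume/close flow. It assumes the Confluent.Kafka and Confluent.SchemaRegistry.Serdes.Json NuGet packages; the Order record and the "orders" topic are hypothetical stand-ins for your own message type and topic, and the config parameter is the ConsumerConfig shown earlier.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;

// Hypothetical message type for illustration.
public record Order(string OrderId, decimal Total);

public static class OrderConsumerLoop
{
    public static void Run(ConsumerConfig config, CancellationToken cancellationToken)
    {
        // The builder needs a Key type and a Value type. The JSON deserializer
        // is asynchronous, so it's wrapped with AsSyncOverAsync() to fit the
        // synchronous consumer API. String keys use the built-in deserializer.
        using var consumer = new ConsumerBuilder<string, Order>(config)
            .SetValueDeserializer(new JsonDeserializer<Order>().AsSyncOverAsync())
            .Build();

        // Tell the client which topic(s) we care about; nothing is consumed yet.
        consumer.Subscribe("orders");

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Blocks until the next message arrives or the token is cancelled.
                var result = consumer.Consume(cancellationToken);

                // The full Message carries both the Key and the deserialized Value.
                Console.WriteLine($"Order {result.Message.Key}: {result.Message.Value.Total}");
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the CancellationToken terminates the loop.
        }
        finally
        {
            // Outside the loop: disconnects from the cluster and, with
            // auto-commit enabled, commits any remaining offsets.
            consumer.Close();
        }
    }
}
```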