In this lecture, you will learn how to send messages to Kafka topics using the Python Producer class. Follow along as Dave Klein (Senior Developer Advocate, Confluent) covers all of this in detail.
https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-producer
https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/producer.py
Hi, Dave Klein here again as we continue the Apache Kafka for Python Developers course. In this module, we'll learn how to produce events to Kafka with Python. Let's get started.

The producer does most of the heavy lifting in the client library. It's what we use to write events to Kafka topics. As we think about that process, it's important to remember that topics are made up of one or more partitions, sometimes many partitions. We'll soon see where this comes into play with the producer. As we mentioned, sending events to Kafka is the main purpose of the producer, but there's more to that task than there might seem on the surface. Let's drill down into a few of the many responsibilities of this hardworking class.

Determining which partition an event should be written to is important for a number of reasons, including ordering guarantees and avoiding overloaded partitions. The partition for an event can be specified at the application level, but if it's not, the producer will make that determination based on a configurable algorithm. We can choose the algorithm by setting the partitioner property. The default partitioner ensures that events with the same key go to the same partition, while events without keys are assigned randomly. This works well for most use cases.

When a producer is asked to send a steady stream of events, it can group them into batches. This can increase producer throughput by reducing network calls and by allowing more efficient compression. Speaking of compression, this is another responsibility of the producer. Depending on the value of the compression.type configuration property, the producer can compress data before sending it to the broker. This can also increase throughput as well as save space on disk. We'll see how to turn compression on shortly.

Producers will, by default, retry send requests that fail due to transient errors. In just a bit, we'll look at some configuration settings that we can use to affect this behavior. On-delivery (response) callbacks are optional, but if they are used, the producer is responsible for calling them. And last but not least, for our partial list, producers are the key player in Kafka transactions, which let developers achieve strong event processing guarantees like exactly-once semantics and all-or-none processing of multiple events, both of which are critical for some use cases. For example, a debit event and a corresponding credit event can be guaranteed to be processed exactly once, atomically, as a pair using the transactional API.

Now that we have a better understanding of just how important a role the producer plays in building Kafka applications, let's take a look at how to construct and configure a producer instance. The producer constructor just needs a dictionary of configuration details. At the very least, the configuration dictionary must contain the bootstrap.servers property, which gives the producer a Kafka broker to connect to. This dictionary also contains any security information needed to connect to that broker. Beyond that, there is a rather large list of optional configuration properties. We'll cover a few of the more common ones here.
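As a rough sketch, constructing a producer with the confluent-kafka-python client can be as simple as the following. The broker address is just a placeholder, and any security properties (SASL or SSL credentials, for example) would go in the same dictionary.

```python
from confluent_kafka import Producer

# Minimal configuration: the bootstrap server below is a placeholder.
# Security settings for a secured cluster would be added to this same dictionary.
config = {
    "bootstrap.servers": "localhost:9092",
}

producer = Producer(config)
```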
One of the most important configurations for a Kafka producer is acks, or acknowledgements. This setting determines the durability of your data. With acks set to zero, producer requests are fire-and-forget: there is no guarantee that the data was successfully stored on the broker. This is the fastest setting, but not a very safe one. Setting acks to one ensures that the lead broker has received the data and stored it to disk, but there's no guarantee that the data has been replicated. The safest setting, and the default, is all (or negative one). With this setting, we can guarantee that the data has been written to the lead broker and to all of the follower replicas in the ISR, or in-sync replica set. For more information on Kafka replication, refer back to the Kafka 101 course on Confluent Developer.

batch.size is the target size for a single produce request. By increasing this setting, we can reduce the number of network calls by combining several events into one request. This can dramatically increase our throughput, but it can also increase latency. There are always trade-offs, and you'll need to do some testing to determine the right balance for your use case. linger.ms determines how long a produce request will wait for the batch to reach the target batch size. With the default of zero, a large batch size won't have much effect, so for best results we need to adjust these two properties in tandem.

Compression can reduce the number of bytes sent in the produce request, which can improve both throughput and latency. We can set compression.type to gzip, snappy, lz4, or zstd. Experimentation is usually in order when using this setting.

By default, a producer will retry potentially transient failures. The retries configuration setting determines how many times it will retry before giving up. The default is quite large, but that's because the recommended way of limiting retries is to set delivery.timeout.ms. This configuration limits the total time of a produce request, including retries. If a request has not succeeded by the time this limit is reached, it fails for good.

The transactional.id is used to ensure the integrity of transactions across multiple producer sessions. It's also required for a producer to participate in a transaction at all.

There are many more configuration options for producers, but the last one we'll talk about here is enable.idempotence. When set to true, which is the default, the producer will ensure that exactly one copy of each message is written to the stream. It does this by adding a sequence number to each produced message. This sequence number, combined with the producer ID, uniquely identifies the message, allowing the broker to recognize when it receives a duplicate message from the producer. If this happens, the broker responds with a duplicate-message error, which lets the producer know the message has already been successfully written to the topic.

Now let's look at some of the key methods of the producer class. The workhorse method of the producer class is produce. This is what we use to send events to topics. The first argument, the topic name, is required. We can also pass the value, key, partition, a callback function, timestamp, and headers. We'll most often let the producer determine the correct partition, and the current time, which is the default for the timestamp, is usually what we want. A common call to produce might look something like the example below. Here we're skipping the partition parameter and using named arguments for the on_delivery callback and headers. Since we're not specifying a partition but we are specifying a key, the producer will use that key to determine the appropriate partition.
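Here's a sketch of that kind of call. The topic name, key, value, and header are made-up examples; the on_delivery callback receives either an error or the delivered message.

```python
# A hypothetical delivery callback: err is None when the write succeeded.
def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# Topic, key, value, and headers here are illustrative placeholders.
producer.produce(
    "orders",
    key="order-123",
    value='{"item": "widget", "quantity": 2}',
    on_delivery=delivery_report,
    headers={"source": "web"},
)
```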
It will also perform compression if we've configured it, and if there's a temporary network hiccup, it will retry the produce request for us. The produce method is asynchronous, so to make sure that all outstanding produce requests and their corresponding callbacks are complete, we need to call the flush method.

While produce and flush are probably the two producer methods we'll use the most, there are other methods on this class that can be very useful. Let's get a brief overview of the producer methods that are used in Kafka transactions. We should note that full coverage of these methods is out of scope for this course; the goal of this overview is to set you in the right direction as you continue to explore this powerful API.

The first method we'll look at, and the one that needs to be called before any transactions are begun, is init_transactions. This prepares the producer instance to participate in one or more transactions. Then, to start a new transaction, we use the begin_transaction method, which takes no arguments. After beginning our transaction, we can make whatever produce calls we need to whichever topics we need to reach, and they will either all succeed or all be aborted. If everything is successful, we call producer.commit_transaction. This flushes any in-flight requests and then marks the entire transaction as successful. If something goes wrong somewhere in the process, we need to call producer.abort_transaction. This purges any in-flight requests and marks the transaction as aborted. We all hate it when this happens, but sometimes it's just the right thing to do.

The final transaction-related method we'll look at is send_offsets_to_transaction. This is used when transactions include both producing and consuming events. Consumers, as we'll see in a later module, operate in consumer groups, and this method lets us commit the consumer group's offsets as part of the transaction, keeping the producers and consumers involved in the transaction in sync. That's all we'll say about transactions in this course, but there's an excellent example of transactions in action in the repo shown at the bottom of the screen, and a rough sketch of how these methods fit together appears at the end of this page.

And now let's head to the next module, where we'll get some hands-on experience with the Python producer class. If you're not already on Confluent Developer, head there now using the link in the video description to access the rest of this course and its hands-on exercises.
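To tie the transaction methods together, here is a minimal sketch under some assumptions: a local broker, a made-up transactional.id, and illustrative topic names, keys, and values. A production application would also handle fatal versus retriable errors rather than simply aborting.

```python
from confluent_kafka import Producer, KafkaException

# The broker address and transactional.id below are placeholders.
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "payments-producer-1",
})

producer.init_transactions()   # prepare this producer for transactions (call once)
producer.begin_transaction()

try:
    # Both writes succeed or neither is visible to transactional consumers.
    producer.produce("debits", key="acct-1", value="100.00")
    producer.produce("credits", key="acct-2", value="100.00")
    producer.commit_transaction()   # flushes in-flight requests and commits atomically
except KafkaException:
    producer.abort_transaction()    # purges in-flight requests and marks the transaction aborted
```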