Earlier we learned about Kafka’s strong storage and ordering guarantees on the server side. But when multiple events are involved in a larger process and a client fails in the middle of that process, we can still end up in an inconsistent state. In this module we’ll take a look at how Kafka transactions provide the exactly-once semantics (EOS) that solve this problem.
Databases solve this potential problem with transactions. Multiple statements can be executed against the database, and if they are in a transaction, they will either all succeed or all be rolled back.
Event streaming systems have similar transactional requirements. If a client application writes to multiple topics, or if multiple events are related, we may want them to all be written successfully or none.
An example of this might be processing a funds transfer from one account to another and maintaining the current account balances in a stream processor. Offsets will be committed back to Kafka for the topic partitions that feed the topology, there will be state in a state store to represent the current balances, and the updated account balances will be output as events into another Kafka topic. For accurate processing, all of these must succeed together, or not at all.
With transactions we can treat the entire consume-transform-produce process topology as a single atomic transaction, which is only committed if all the steps in the topology succeed. If there is a failure at any point in the topology, the entire transaction is aborted. This will prevent duplicate records or more serious corruption of our data.
To take advantage of transactions in a Kafka Streams application, we just need to set processing.guarantee=exactly_once_v2 in StreamsConfig. Then to ensure any downstream consumers only see committed data, set isolation.level=read_committed in the consumer configuration.
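Here is a minimal sketch of those two settings, assuming a Kafka Streams application plus a separate downstream consumer. The application id, group id, and bootstrap server values are just placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfigExample {
    public static void main(String[] args) {
        // Kafka Streams application: enable exactly-once (v2) processing.
        Properties streamsProps = new Properties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "funds-transfer-app"); // placeholder
        streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        // Downstream consumer: only read data from committed transactions.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "balance-reader");          // placeholder
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    }
}
```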
To better understand the purpose and value of transactions, let’s take a look at an example of how a system without transactions might handle a failure.
In our example, a funds transfer event lands in the transfers topic. This event is fetched by a consumer and leads to a producer producing a debit event to the balances topic for customer A (Alice).
Now if the funds transfer application unexpectedly fails at this point, after the debit event has been produced but before the consumer offset has been committed, a new instance is started and takes up where it left off based on the last committed offset. It re-reads the same transfer event and produces the debit again. The end result is a duplicate debit event and an unhappy Alice.
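To make that failure window concrete, here is a rough sketch of what the non-transactional loop might look like. The client setup, topic names, and record key are placeholders; the important part is that the debit is produced before the offset is committed.

```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NonTransactionalTransferLoop {
    // Assumes the consumer and producer are already configured; details omitted for brevity.
    static void run(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) {
        consumer.subscribe(List.of("transfers"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> transfer : records) {
                // 1. Produce the debit event (key "alice" is a placeholder).
                producer.send(new ProducerRecord<>("balances", "alice", "debit:" + transfer.value()));
            }
            // 2. Commit offsets only after producing.
            // If the application crashes between step 1 and step 2, a restarted instance
            // re-reads the same transfer event and produces the debit a second time.
            consumer.commitSync();
        }
    }
}
```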
Now let’s see how this would be handled with transactions. First off, let’s discuss two new pieces of the puzzle: the transactional.id and the transaction coordinator. The transactional.id is set at the producer level and allows a transactional producer to be identified across application restarts. The transaction coordinator is a broker process that keeps track of the transaction metadata and oversees the whole transaction process.
The transaction coordinator is chosen in a similar fashion to the consumer group coordinator, but instead of a hash of the group.id, we take a hash of the transactional.id and use that to determine the partition of the __transaction_state topic. The broker that hosts the leader replica of that partition is the transaction coordinator.
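Roughly speaking, the lookup works along these lines. This is an illustration rather than the broker’s exact code, and 50 is simply the default value of transaction.state.log.num.partitions.

```java
public class TransactionCoordinatorLookup {
    public static void main(String[] args) {
        String transactionalId = "funds-transfer-app-1"; // hypothetical transactional.id
        int transactionStatePartitions = 50;             // default transaction.state.log.num.partitions

        // Hash the transactional.id onto a partition of __transaction_state
        // (masking keeps the hash non-negative).
        int partition = (transactionalId.hashCode() & 0x7fffffff) % transactionStatePartitions;

        // The broker hosting the leader replica of that partition acts as
        // this producer's transaction coordinator.
        System.out.println("__transaction_state partition: " + partition);
    }
}
```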
With that in mind, there are a few more steps involved:
1. The producer sends its transactional.id to the coordinator, which maps it to a PID (producer ID) and transaction epoch and returns them to the application.
2. The application consumes the funds transfer event from the transfers topic and notifies the coordinator that a new transaction has been started.
3. Before the producer writes the debit event to the balances topic, it notifies the coordinator of the topic and partition that it is about to write to. (We’ll see how this is used later.)
4. The producer writes the debit event to the balances topic.

Now if the application fails and a new instance is started, the following steps will take place:
1. The new instance registers the same transactional.id with the transaction coordinator.
2. The coordinator bumps the transaction epoch, which fences off the old producer instance, and aborts the incomplete transaction by writing abort markers to the partitions it had touched.
3. Downstream consumers with isolation.level set to read_committed will disregard any aborted events. This effectively eliminates the chance of duplicated or corrupted data flowing downstream.

In a transaction where we successfully go through each of the steps described above, the transaction coordinator will add a commit marker to the internal __transaction_state topic and to each of the topic partitions involved in the transaction, including the __consumer_offsets topic. This informs downstream consumers that are set to read_committed that this data is consumable. It’s truly a beautiful thing!
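Here is a minimal sketch of those steps using the plain producer and consumer APIs rather than Kafka Streams. It assumes the producer has a transactional.id configured and the consumer has auto-commit disabled; the topic names and key are placeholders.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;

public class TransactionalTransferLoop {
    static void run(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) {
        // Registers the transactional.id with the coordinator and obtains the PID/epoch.
        producer.initTransactions();
        consumer.subscribe(List.of("transfers"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) continue;

            producer.beginTransaction();
            try {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> transfer : records) {
                    // The debit write and the offset commit below are part of one transaction.
                    producer.send(new ProducerRecord<>("balances", "alice", "debit:" + transfer.value()));
                    offsets.put(new TopicPartition(transfer.topic(), transfer.partition()),
                                new OffsetAndMetadata(transfer.offset() + 1));
                }
                // Writes the consumed offsets to __consumer_offsets inside the same transaction.
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                // The coordinator then writes commit markers to every partition touched above.
                producer.commitTransaction();
            } catch (Exception e) {
                // A real application would close the producer on fatal errors
                // such as ProducerFencedException instead of aborting.
                producer.abortTransaction();
            }
        }
    }
}
```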
When a consumer with isolation.level set to read_committed fetches data from the broker, it will receive events in offset order as usual, but it will only receive those events with an offset lower than the last stable offset (LSO). The LSO represents the lowest offset of any open, pending transactions. This means that only events from transactions that are either committed or aborted will be returned.
The fetch response will also include metadata to show the consumer which events have been aborted so that the consumer can discard them.
Producer idempotency, which we talked about earlier, is critical to the success of transactions, so when transactions are enabled, idempotence is also enabled automatically.
One thing to consider, specifically in Kafka Streams applications, is how to set the commit.interval.ms configuration. This determines how frequently to commit, and hence the size of our transactions. There is a bit of overhead for each transaction, so many smaller transactions could cause performance issues. However, long-running transactions will delay the availability of output, resulting in increased latency. Different applications will have different needs, so this should be considered and adjusted accordingly.
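For example, in a Kafka Streams configuration it might look like the following. The 1000 ms value is purely illustrative, and with exactly-once enabled the default is already much lower than the usual 30 seconds.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CommitIntervalExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "funds-transfer-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        // Each commit ends the current transaction, so this value is effectively the
        // transaction size/latency knob: smaller -> more per-transaction overhead,
        // larger -> output is held back longer before it becomes readable downstream.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // illustrative value
    }
}
```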
Kafka’s transaction support is only for data within Kafka. There is no support for transactions that include external systems. The recommended way to work with a transactional process that includes an external system is to write the output of the Kafka transaction to a topic and then rely on idempotence as you propagate that data to the external system.
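As a sketch of that pattern, the consumer below (assumed to be configured with isolation.level=read_committed) reads the committed output topic and applies each event to the external system as a keyed upsert, so a replayed event simply overwrites the previous write instead of creating a duplicate. A map stands in for the external system here; a real sink would issue a write keyed by a primary key or unique event id.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class IdempotentExternalSink {
    // Stand-in for the external system; last value per key wins, so replays are harmless.
    private static final Map<String, String> externalStore = new ConcurrentHashMap<>();

    static void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(List.of("balances")); // committed output of the Kafka transaction
        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                // Upsert keyed by the record key: applying the same event twice
                // leaves the external state unchanged.
                externalStore.put(record.key(), record.value());
            }
            consumer.commitSync();
        }
    }
}
```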