Senior Developer Advocate (Presenter)
Continuing our dive into Kafka internals, in this exercise, we’ll explore transactions! Throughout the exercise, we’ll use a Java producer client to produce transactional events to a Kafka topic in Confluent Cloud.
Just like the previous exercises in this course, there are some steps you have to go through to prepare your environment for this exercise.
Complete the following steps to set up the environment used for this exercise.
If you don’t already have a Confluent Cloud account, you can create one and receive $400 of free Confluent Cloud usage. This is enough to cover the time needed to actively complete the exercises in this course, but be sure to delete your cluster once you are finished with it or won’t be using it for a day or so.
Let’s start with creating the cluster that will be used during the exercise:
NOTE: If the Get started with Confluent Cloud tutorial appears on the right side of the console, click Leave tutorial.
You also need to create a client configuration file for the cluster; it will be needed by both the transactional producer and kafka-console-consumer during the exercise.
Since we are using a Java producer client, we need to click the Java client type.
Here we see the client connection configs which we can copy using the provided button. As you can see, one of the parameters needs to be updated so that it includes a valid cluster API key and secret value. We can create these using the option that is conveniently available on this page.
As you can now see, clicking this option opens a new window containing the new API key and secret.
After creating the API key and secret, notice that the client configuration file has been updated to include these values. We can now copy this configuration and use it to create the file on our Java client machine.
We will now create the local client configuration file containing the Confluent Cloud connection settings.
The sample client configuration file includes properties that are needed if the client is using Schema Registry as well as a couple of other properties that are needed by earlier versions of the Java client.
We won’t be using these properties so we will delete them.
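For reference, after removing those extra properties, the trimmed java.config should contain just the core connection settings and look roughly like the following (the endpoint, API key, and secret shown here are placeholders for your own values):

# Kafka connection settings copied from the Confluent Cloud console
bootstrap.servers=<transactions cluster endpoint>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<cluster API key>' password='<cluster API secret>';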
Next, we will clone the GitHub repository that contains the Java project for our transactional producer.
Run command:
git clone https://github.com/confluentinc/learn-kafka-courses.git
We will use kafka-console-consumer and the confluent CLI during the exercise, so these need to be installed on the machine you plan to run the exercise on. The Confluent Platform Community components download includes both of these.
In a browser, navigate to the following webpage: https://docs.confluent.io/platform/current/installation/installing_cp/zip-tar.html
Once on the webpage, scroll to the section that covers downloading Confluent Platform components and complete the steps to do so.
Run the following command to add the Confluent Platform bin directory to your path:
echo 'export PATH=$PATH:/home/training/confluent-<version>/bin/' >> ~/.bashrc
To update the Confluent CLI to its latest version, run the following command and complete the prompts that appear:
confluent update --major
This concludes the exercise setup steps. You should be able to proceed with the Transactions exercise steps at this time.
With that housekeeping in order, we can dive in.
As we move forward and produce events, we’ll use a console consumer and the Confluent Cloud console in order to identify when the transactional events become available to consumers.
We’ll configure the console consumer to only process transactional events that are committed and ignore those that are aborted.
In the Confluent Cloud console, we will see both committed and aborted events.
Let’s begin by verifying that the Java client configuration file that will be used by the transactional producer exists.
In a terminal window, run command:
cat ~/java.config
Notice that the bootstrap.servers property points to the cluster endpoint. We’ll need this value when we set up a console consumer in a later step, so you may want to keep it handy. Also notice that the cluster API key and secret are included in the properties that will be used to authenticate the client connection. If these aren’t set up properly, you may want to revisit the environment setup instructions in the write-up below.
Next, let's do a quick walk-through of our TransactionalProducer.java client and examine the steps it will be taking to produce the transactional events.
Run commands:
cd ~/learn-kafka-courses/transactional-producer
code .
When VS Code opens, navigate to Transactional-Producer/src/main/java/clients.
To open the client in the editor pane, click TransactionalProducer.java.
Scroll down so that line 43 is at the top of the pane.
The first thing we see is the loadConfig method on line 49 that loads the configuration file containing the Confluent Cloud client connection configs. To point to the configuration file we’ve already created, we’ll provide the name of the file as an argument when we run the producer.
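As a mental model, a loadConfig-style helper is usually only a few lines; the following is a sketch rather than the exact code in the repo:

// Sketch of a loadConfig-style helper: read the properties file passed on the
// command line (e.g., /home/training/java.config) into a Properties object.
// Assumes java.io.*, java.nio.file.*, and java.util.Properties are imported.
public static Properties loadConfig(final String configFile) throws IOException {
    if (!Files.exists(Paths.get(configFile))) {
        throw new IOException(configFile + " not found.");
    }
    final Properties props = new Properties();
    try (InputStream input = new FileInputStream(configFile)) {
        props.load(input);
    }
    return props;
}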
Next, we see the createTopic method on line 61 that will use the admin client to create the topic that we’ll send the transactional events to if it doesn't already exist.
The topic name we are using for this exercise is transaction-topic.
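Conceptually, a createTopic-style helper built on the admin client looks something like the sketch below (the repo’s code may differ; here the partition count and replication factor are left to the cluster defaults):

// Sketch of a createTopic-style helper using the Kafka admin client.
// Assumes org.apache.kafka.clients.admin.*, org.apache.kafka.common.errors.TopicExistsException,
// java.util.*, and java.util.concurrent.ExecutionException are imported.
public static void createTopic(final String topic, final Properties config) {
    final NewTopic newTopic = new NewTopic(topic, Optional.empty(), Optional.empty());
    try (final AdminClient admin = AdminClient.create(config)) {
        admin.createTopics(Collections.singletonList(newTopic)).all().get();
    } catch (final InterruptedException | ExecutionException e) {
        // Ignore TopicExistsException: the topic was already created on a previous run.
        if (!(e.getCause() instanceof TopicExistsException)) {
            throw new RuntimeException(e);
        }
    }
}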
Looking further into the code, you’ll see the loadConfig method is called on line 86 and the createTopic method is called on line 89. Starting on line 92, additional configuration properties are added to those already loaded from the java.config file. An interesting one to make note of is the transaction.timeout.ms property on line 96. The default value for this property is 60 seconds, but we’ve increased it to 120 seconds to allow more leeway in how we run the exercise. When a transaction begins, the transaction coordinator waits the amount of time specified by this parameter for additional input from the producer. If the timer expires before input is received, the coordinator bumps the epoch for the associated transactional.id. It also writes abort control messages to the topic partitions that received events as part of the current transaction up to that point. Bumping the epoch effectively fences off the current instance of the transactional producer: if it sends another request to the transaction coordinator, the coordinator will see that its epoch is less than the current epoch and will respond to the producer with an exception.
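As a rough sketch of what those additions might look like (the serializer choices and the random-value bound are illustrative assumptions; the 120-second timeout and the t01 transactional.id come from the exercise itself):

// Illustrative additions layered on top of the properties loaded from java.config.
// The transactional.id ("t01") is the second command-line argument passed when running the producer.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "t01");
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 120000);            // 120 s instead of the 60 s default
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);               // implied by transactional.id; shown for clarity
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);    // serializer choice is an assumption
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  // serializer choice is an assumption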
Now let's look at the code that actually produces transactional events. On line 100, the first thing that happens is that the producer initializes its transactional.id with the transaction coordinator. If this were a typical consume-process-produce application, we would next see the application poll for records from an input topic.
Next, we see on line 103 that the producer begins a transaction. The transaction includes a for loop that sends events with an incrementing key ranging from 0 to 4 and a random number assigned to the value. After each event is sent, the for loop waits for terminal input before continuing to send the next event. After 5 events have been sent, the producer sends a request to the transaction coordinator to commit the transaction and then closes. A catch block is included to print any exceptions that occur, including ProducerFencedException.
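Put together, this follows the standard transactional API pattern. The sketch below mirrors the flow just described; it assumes a props object built as above and the usual org.apache.kafka.clients.producer imports, and it is not the exact code from the repo:

// Sketch of the transactional send loop: init, begin, send five keyed events
// (pausing for Enter between sends), then commit. Errors fall into the catch blocks.
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Scanner stdin = new Scanner(System.in);
Random rnd = new Random();
producer.initTransactions();                  // register the transactional.id with the coordinator
try {
    producer.beginTransaction();
    for (int key = 0; key < 5; key++) {
        String value = String.valueOf(rnd.nextInt(1000));   // random value; the bound is illustrative
        producer.send(new ProducerRecord<>("transaction-topic", String.valueOf(key), value));
        producer.flush();
        stdin.nextLine();                     // each press of Enter lets the loop continue
    }
    producer.commitTransaction();             // commit markers make the events visible to read_committed consumers
} catch (ProducerFencedException e) {
    e.printStackTrace();                      // this instance's epoch has been superseded; it can no longer write
} catch (KafkaException e) {
    producer.abortTransaction();              // abort markers tell read_committed consumers to skip these events
} finally {
    producer.close();
}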
Let's now run the producer and observe the behavior.
Close VS Code.
In the terminal window, run commands:
clear
./gradlew run --args="/home/training/java.config t01"
Each press of the enter key sends another event.
Let’s take a quick look at the transaction-topic in the Confluent Cloud console to see if the events appear. Restore the Confluent Cloud console and, if you haven’t already navigated to the transactions cluster, do so now.
Navigate to the topic and use its messages tab to view events that have been written to it starting with offset 0.
We see that several messages were written to the topic.
Let’s run a kafka-console-consumer with isolation.level read_committed and see if it receives these same events in response to sending a fetch request.
First, let’s send one more event to prevent the transaction from timing out.
Now, let’s run the kafka-console-consumer in the lower terminal window.
In a second terminal window, run command:
kafka-console-consumer \
--bootstrap-server <transactions cluster endpoint> \
--consumer.config /home/training/java.config \
--consumer-property "isolation.level=read_committed" \
--topic transaction-topic \
--from-beginning
The console consumer is now polling for records from the transaction topic starting from the earliest available offset due to the --from-beginning parameter. But, as you can see here, we aren’t seeing any records. We know the records were successfully written to the topic since we saw them when we examined the topic in the Confluent Cloud console. We aren’t seeing them in the console consumer because of the isolation level. Read committed means that the console consumer will only process the transactional events after associated commit control messages have been written to the corresponding topic partitions.
Let’s now send additional events to reach a count of 5 and observe what happens when the transaction is committed.
Notice as soon as the commit occurs, the events appear in the console consumer terminal window. This illustrates the previous point made regarding the wait for the commit control message.
You might be wondering why the transactional events appeared in the Confluent Cloud console as soon as they were produced. This is because its underlying consumer is configured with an isolation.level of read_uncommitted.
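The same isolation.level setting applies to any Java consumer. As a sketch (with a hypothetical group id, the standard org.apache.kafka.clients.consumer imports, and the loadConfig helper described earlier; this is not part of the exercise code), a read_committed Java consumer would look roughly like this:

// Sketch: a Java consumer that, like the console consumer above, only sees
// transactional events once their commit markers have been written.
Properties consumerProps = loadConfig("/home/training/java.config");                 // reuse the Confluent Cloud connection settings
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "transaction-demo-group");         // hypothetical group id
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");              // comparable to --from-beginning
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");          // the default is read_uncommitted
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.subscribe(Collections.singletonList("transaction-topic"));
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
            System.out.printf("key=%s value=%s%n", record.key(), record.value());
        }
    }
}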
Let's see what happens when a transaction timeout occurs.
We’ll run the producer again, send a couple of events, and then wait long enough to cause a timeout before sending additional events.
In the first terminal window, run commands:
clear
./gradlew run --args="/home/training/java.config t01"
In this same terminal window, press the Enter key twice to send two more events.
Alright, counting the event sent when the producer started, that makes three events in the new transaction. Let’s now wait until the execution timer exceeds two minutes before continuing with the next step.
At this point the transaction timeout should have occurred, and the coordinator should have incremented the epoch for our transactional.id. Now let’s send additional events as part of this transaction and observe the result.
As you can see, the transaction timed out and the producer received the expected exception.
Looking at the console consumer window, we can see events sent as part of the aborted transaction do not appear. This might seem problematic at first, but this is totally expected. The consumer application never processes these aborted records when the isolation.level is set to read_committed.
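One practical note on fencing: a producer that has received ProducerFencedException cannot recover in place. A sketch of the recovery path, under the same assumptions as the earlier producer sketch, looks like this:

// Sketch: recovering after being fenced. The old instance must be closed;
// a new producer with the same transactional.id gets a fresh epoch, and
// initTransactions() aborts any transaction the old instance left open.
try {
    producer.commitTransaction();
} catch (ProducerFencedException fenced) {
    producer.close();                            // the fenced epoch can no longer write
    producer = new KafkaProducer<>(props);       // same transactional.id, new epoch
    producer.initTransactions();                 // also cleans up the dangling transaction
}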
To summarize, during this exercise, we produced both committed and aborted transactional events. We then confirmed that a consumer configured with an isolation.level of read_committed received the committed transactional events from the subscribed topic and did not receive those that were aborted.
If you run this exercise yourself, you’ll need to tear down the exercise environment by deleting the transactions cluster so that it doesn’t unnecessarily accrue cost and exhaust your promotional credit. Here are the steps for doing so.
First list the clusters in your Confluent Cloud environment.
List the clusters in the environment and their IDs:
confluent kafka cluster list
For this exercise environment, there is only the transactions cluster. So let’s go ahead and delete it using the cluster ID.
Delete the transactions cluster:
confluent kafka cluster delete <transactions cluster ID>
And finally, as a sanity check, we’ll list the environment clusters again and confirm the transactions cluster no longer exists.
Confirm the cluster no longer exists in the environment:
confluent kafka cluster list
No clusters are listed, so the environment teardown is complete.
As you learned in this exercise, transactions are an incredibly useful construct, especially for applications that follow a read-process-write structure. Gaining experience with the transactional API as we did today will certainly prove useful as you continue on your Kafka journey!
See you in the next exercise as we continue to delve deeper into Kafka internals.