Danica Fine

Senior Developer Advocate (Presenter)

Hands On: Consumer Group Protocol

Welcome back to another Kafka internals exercise! In this exercise, we’ll explore consumer groups.

Consumer groups are a vital aspect of Kafka that impacts how work can be parallelized over many application instances. To get a better feel for this, we’re going to dive a little deeper into the consumer group protocol and see how it controls the behavior of a Kafka consumer group as consumers join and leave the group.

As in the last exercise, there are some prerequisite environment setup steps that you need to go through before you can start.

Exercise Setup

Complete the following steps to set up the environment used for this exercise.

If you don’t already have a Confluent Cloud account, you can create one and receive $400 of free Confluent Cloud usage. This will cover the time needed to actively finish the exercises in the course, but make sure to delete your cluster when you are finished with it or won’t be using it for a day or so.

Let’s start with creating the cluster that will be used during the exercise:

  1. Open URL https://confluent.cloud and log in to the Confluent Cloud console.

  2. Navigate to the default environment.

    NOTE: If the Get started with Confluent Cloud tutorial appears on the right side of the console, click Leave tutorial.

  3. Create a basic cluster named cgp-exercise.

    NOTE: If multiple people within your organization will be going through this exercise, append a unique string, e.g., <last name>, to the cluster name to prevent your exercise from conflicting with others.

Create the topics used by the exercise:

  1. Create test-1 topic:

    a. Click cgp-exercise cluster.

    b. Click Topics.

    c. Click Create topic.

    d. Enter test-1 in the Topic name field.

    e. Enter 2 in the Number of partitions field.

    f. Click Create with defaults.

  2. Create test-2 topic:

    a. Click Topics.

    b. Click + Add topic.

    c. Enter test-2 in the Topic name field.

    d. Enter 2 in the Number of partitions field.

    e. Click Create with defaults.

You also need to create a client configuration file that is used by kafka-console-consumer.sh and kafka-consumer-groups.sh during the exercise:

  1. To navigate to the Java client configuration page, click Cluster overview and then click Configure a client.

Since we are using a Java consumer, we need to click the corresponding Java client type.

  1. Click Java.

Here we see the client connection configs which we can copy using the provided button. As you can see, one of the parameters needs to be updated so that it includes a valid cluster API key and secret value. We can create these using the option that is conveniently available on this page.

  1. Click Create Kafka cluster API key.

As you can now see, clicking this option opens a new window containing the new API key and secret.

  1. Assign a name of cgp-exercise cluster.
  2. Click the Continue button.

After creating the API key and secret, notice the client configuration file has been updated to include these values. We can now copy this configuration and use it to create a local configuration file on our Java client machine.

  1. Click the Copy button.

Minimize the Confluent Cloud console so that the Ubuntu desktop is visible.

Let's now create the local configuration file. When we run the kafka-console-consumer command, we will point to this configuration file with the --consumer.config parameter.

  1. Open a terminal window.
  2. Run command nano java.config.

We will start with the client configuration file that we copied from the Confluent Cloud console.

  1. Right-click the nano edit window and click Paste.

The sample client configuration file includes properties that are needed if the client is using Schema Registry as well as a couple of other properties that are needed by earlier versions of the Java client.

Our Java client doesn’t require these properties so we will delete them.

  1. Delete lines 6 through 15.
  2. Save and close java.config.

We will use kafka-console-consumer.sh, kafka-consumer-groups.sh, and the confluent CLI during the exercise. All of these are included when you download the Confluent Platform Community components.

  1. In a browser, navigate to the following webpage: https://docs.confluent.io/platform/current/installation/installing_cp/zip-tar.html

  2. Once on the webpage, scroll to the section that covers downloading Confluent Platform components and complete the steps to do so.

  3. Run the following command to add the Confluent Platform bin directory to your path:

    echo 'export PATH=$PATH:/home/training/confluent-<version>/bin/' >> ~/.bashrc

  4. Run the following command to set the classpath:

    echo 'export CLASSPATH=<path>/confluent-<version>/share/java/monitoring-interceptors/monitoring-interceptors-7.1.1.jar' >> ~/.bashrc

  5. To update the confluent CLI to its latest version, run the following command and complete the prompts that appear:

    confluent update --major

This concludes the exercise setup.

Exercise Steps

In part one of this exercise, we’ll start up three consumer instances and assign them the same group ID. As mentioned in the previous consumer group protocol module, setting a common group ID means that the three separate consumer instances will join the same consumer group. Another thing to note is that we’ll configure the consumer instances to use the range partition assignment protocol—this just defines how the partitions are divvied up among the consumers that join the group.

Once we start the consumer instances, we’ll confirm that they join the consumer group and observe how partitions are actually assigned using the range protocol.

Let’s start by bringing up a terminal and using the Confluent CLI to confirm that the topics exist and are ready to use.

We’ll first log in to Confluent Cloud. Note that we’re using the --save flag so that our credentials will be used for subsequent commands in this exercise. They’ll remain active until you run the logout command.

  1. In a terminal window, run command:

    confluent login --save 

    Enter email and password.

Now we can verify the exercise cluster and see if the topics exist.

  1. Run commands:

    confluent kafka cluster list
    confluent kafka topic list --cluster <cgp-cluster ID>

As you see here, the cluster and topics are present. And just a note: If you’re not seeing the cluster and/or the topics, then we recommend you check out the exercise setup instructions before continuing.

Okay, let’s also confirm that the Java client configuration file that we mentioned earlier exists. It should contain the API key and secret that we need to access our cluster.

  1. Run command:

    cat /home/training/java.config

Our file looks good—it contains the API key and secret as well as the bootstrap servers pointing to our cluster. Note that this same value will be specified in the bootstrap server property of commands that we run during this exercise so it might help to keep it handy.
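If you would rather check the file programmatically than by eye, the following is a minimal sketch of such a check. The helper names (parse_properties, missing_keys) are hypothetical, the sample content uses placeholders rather than real credentials, and the list of required keys is an assumption based on the properties a Java client typically needs for a SASL_SSL connection to Confluent Cloud.

```python
def parse_properties(text):
    """Parse Java-style .properties text into a dict (simplified: no line continuations)."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments.
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first "=" only, since values such as sasl.jaas.config contain "=".
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# Assumption: the keys a client needs to reach a Confluent Cloud cluster.
REQUIRED_KEYS = (
    "bootstrap.servers",
    "security.protocol",
    "sasl.mechanism",
    "sasl.jaas.config",
)


def missing_keys(props):
    """Return the required keys that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not props.get(key)]


# Placeholder content; the real values come from the Confluent Cloud console.
sample = """
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=<cgp-cluster endpoint>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API key>' password='<API secret>';
sasl.mechanism=PLAIN
"""

props = parse_properties(sample)
print(missing_keys(props))
```

To run the same check against the real file, pass the contents of /home/training/java.config to parse_properties instead of the sample string.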

Now we can begin part one of the exercise!

Let’s start up our three consumer instances. We can use the Kafka console consumer tool as our Java consumer client. We will run each consumer instance in a separate terminal. We’ll also give each consumer a different client ID and make sure that they’re all part of the same consumer group. Each consumer instance will also subscribe to both the test-1 and test-2 topics and use the range assignment protocol as we mentioned earlier.

  1. In a new terminal window, run the following command to start the first consumer instance:

    kafka-console-consumer \
        --bootstrap-server <cgp-cluster endpoint> \
        --consumer.config /home/training/java.config \
        --group test-group \
        --whitelist "test-1|test-2" \
        --consumer-property "interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" \
        --consumer-property "partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor" \
        --consumer-property client.id=C1
  2. In a second terminal, repeat step 1 using client.id=C2 to start a second consumer instance.

  3. In a third terminal, repeat step 1 using client.id=C3 to start a third consumer instance.

You should now have three consumer instances running, each with a unique client ID assigned.

Let’s use the kafka-consumer-groups command to see how the partitions of the test-1 and test-2 topics were assigned to these instances.

  1. Return to the initial terminal (not one that contains a running consumer instance) and run command:

    kafka-consumer-groups \
        --bootstrap-server <cgp-cluster endpoint> \
        --command-config /home/training/java.config \
        --group test-group \
        --describe \
        --members \
        --verbose

It appears that only two of the consumer instances were assigned partitions while the third consumer is idle.

This scaling limitation of the range partition assignment strategy was covered in the consumer group protocol module.

Basically, the range strategy assigns partitions one topic at a time, so the limiting factor is the number of partitions in a single subscribed, co-partitioned topic rather than the total across all topics. In this case, we have two subscribed topics, each with two partitions, so only two of the consumer instances were assigned partitions.
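To make this limitation concrete, here is a small simulation of range-style assignment. It is a simplified sketch, not the RangeAssignor source: the function name range_assign is hypothetical, and it assumes members are ordered by their client IDs, whereas a real group orders them by member ID, so which two consumers receive partitions in your run may differ.

```python
def range_assign(topics, consumers):
    """Assign partitions topic by topic, handing each member a contiguous range.

    topics: dict mapping topic name -> partition count
    consumers: iterable of member names
    Returns a dict mapping member -> list of (topic, partition) tuples.
    """
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    for topic in sorted(topics):
        # Each topic is divided independently among all members.
        per_member, extra = divmod(topics[topic], len(members))
        start = 0
        for index, member in enumerate(members):
            # The first `extra` members receive one additional partition.
            count = per_member + (1 if index < extra else 0)
            assignment[member].extend((topic, p) for p in range(start, start + count))
            start += count
    return assignment


result = range_assign({"test-1": 2, "test-2": 2}, ["C1", "C2", "C3"])
for member, partitions in result.items():
    print(member, partitions)
```

Because each topic has only two partitions, the third member in the ordering receives nothing from either topic, which matches the idle consumer we observed.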

Let’s stop these three instances and move on to section two.

  1. In each of the consumer instance terminal windows, press Ctrl-C to stop the consumer instance.

In this second section of the exercise, we’ll start three new consumer instances, but this time we’ll configure the first two consumers to support both the cooperative sticky and sticky strategies, while instance three will support only the sticky strategy.

After these instances are up and running, we’ll see which of these protocols is used by the consumer group and observe how the partitions are assigned using the selected protocol.

Let’s first start up consumers 1 and 2 with the sticky and cooperative sticky assignment strategies.

  1. Start consumer instance 1 by running the following command in one terminal:

    kafka-console-consumer \
        --bootstrap-server <cgp-cluster endpoint> \
        --consumer.config /home/training/java.config \
        --group test-group \
        --whitelist "test-1|test-2" \
        --consumer-property "interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" \
        --consumer-property "partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.StickyAssignor" \
        --consumer-property client.id=C1
  2. In a second terminal, repeat step 1 using client.id=C2 to start consumer instance 2.

Now let’s start up that third consumer with just the sticky assignment strategy.

  1. In a third terminal, run the following command to start consumer instance 3:

    kafka-console-consumer \
        --bootstrap-server <cgp-cluster endpoint> \
        --consumer.config /home/training/java.config \
        --group test-group \
        --whitelist "test-1|test-2" \
        --consumer-property "interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" \
        --consumer-property "partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor" \
        --consumer-property client.id=C3

With all three consumers up and running, let’s see what partition assignment strategy is being used by the consumer group.

  1. Return to the initial terminal and run the following command:

    kafka-consumer-groups \
        --bootstrap-server <cgp-cluster endpoint> \
        --command-config /home/training/java.config \
        --group test-group \
        --describe \
        --state

After running the describe command on the consumer group we see that the sticky strategy is being used. This is the result of an election that occurs when the consumer instances join the group.

All of the strategies that are configured for each consumer instance are candidates, but the elected strategy must be supported by every member. In this case, the election was easy since the sticky strategy was the only one configured on all three consumer instances.

The primary situation where this election process is required is when an application is updated to use a different partition assignment strategy. First, the new strategy is added to all consumer instances using a rolling upgrade process. Once this new strategy has been added to all consumer instances, a second rolling upgrade can be done where the prior strategy is removed from each consumer instance configuration.
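The election can be modeled in a few lines. This is a simplified sketch of the idea rather than the group coordinator's actual code: the function name select_strategy is hypothetical, and it assumes that only strategies supported by every member are candidates and that each member then votes for its most preferred candidate.

```python
from collections import Counter


def select_strategy(members):
    """Pick the assignment strategy for a group.

    members: dict mapping member name -> list of strategies in preference order
    """
    # Only strategies that every member supports can be elected.
    candidates = set.intersection(*(set(prefs) for prefs in members.values()))
    if not candidates:
        raise ValueError("no assignment strategy is common to all members")
    # Each member votes for the first candidate in its own preference list.
    votes = Counter(
        next(s for s in prefs if s in candidates) for prefs in members.values()
    )
    return votes.most_common(1)[0][0]


# The group from this exercise: C3 has not yet been given the new strategy.
during_upgrade = {
    "C1": ["cooperative-sticky", "sticky"],
    "C2": ["cooperative-sticky", "sticky"],
    "C3": ["sticky"],
}
# After the first rolling upgrade reaches every instance.
after_upgrade = {
    "C1": ["cooperative-sticky", "sticky"],
    "C2": ["cooperative-sticky", "sticky"],
    "C3": ["cooperative-sticky", "sticky"],
}

print(select_strategy(during_upgrade))
print(select_strategy(after_upgrade))
```

In this model the group stays on the sticky strategy until the last instance has been upgraded, which is why the old strategy is only removed in a second rolling upgrade.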

Let’s check to see how partitions were assigned to the consumer instances by the sticky strategy.

  1. In the initial terminal, run command:

    kafka-consumer-groups \
        --bootstrap-server <cgp-cluster endpoint> \
        --command-config /home/training/java.config \
        --group test-group \
        --describe \
        --members \
        --verbose

Describing the consumer group members shows that one instance was assigned two partitions while the other two instances were assigned a single partition each.

This is how the Sticky assignment strategy works. It effectively assigns partitions in a round-robin fashion, resulting in a more even distribution of partitions to all consumer instances in the group.

With this strategy, as well as with the Round Robin and Cooperative Sticky strategies, scaling of the consumer group is limited by the sum of partitions from all subscribed topics.
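The distribution we just saw can be approximated with a plain round-robin pass over all partitions. This is only a sketch of the resulting balance, not the StickyAssignor algorithm: the function name round_robin_assign is hypothetical, and which member ends up with the extra partition in a real group may differ.

```python
from itertools import cycle


def round_robin_assign(topics, consumers):
    """Deal every partition of every topic to the members in turn.

    topics: dict mapping topic name -> partition count
    consumers: iterable of member names
    """
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    # Pool the partitions of all topics before dealing them out.
    partitions = [(topic, p) for topic in sorted(topics) for p in range(topics[topic])]
    for partition, member in zip(partitions, cycle(members)):
        assignment[member].append(partition)
    return assignment


topics = {"test-1": 2, "test-2": 2}
three = round_robin_assign(topics, ["C1", "C2", "C3"])
five = round_robin_assign(topics, ["C1", "C2", "C3", "C4", "C5"])

print({member: len(parts) for member, parts in three.items()})
print({member: len(parts) for member, parts in five.items()})
```

With three members, one member holds two partitions and the others hold one each. With five members, the fifth stays idle because only four partitions exist in total.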

Let’s now move on to part three of this exercise where we’ll start a fourth instance and confirm that the partition assignment is rebalanced. We’ll start up a fourth consumer using the sticky assignment strategy.

  1. In a new terminal, run command:

    kafka-console-consumer \
        --bootstrap-server <cgp-cluster endpoint> \
        --consumer.config /home/training/java.config \
        --group test-group \
        --whitelist "test-1|test-2" \
        --consumer-property "interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" \
        --consumer-property "partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor" \
        --consumer-property client.id=C4

Recall that when a new instance starts, it will send a request to the group coordinator to join the group. In response, the group coordinator will notify existing members that a group rebalance is in progress. Each member will respond by also requesting to join the group. Partitions are then assigned to the updated group membership, and event consumption continues.

Let’s see how the partitions were assigned to the four group members.

  1. Return to the initial terminal and run command:

    kafka-consumer-groups \
        --bootstrap-server <cgp-cluster endpoint> \
        --command-config /home/training/java.config \
        --group test-group \
        --describe \
        --members \
        --verbose

Describing the consumer group, we see that it was successfully rebalanced and each consumer instance is now assigned a single partition.
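The effect of this rebalance can be sketched as follows. The function name sticky_rebalance and the starting assignment are hypothetical, and the logic is a simplification of what a sticky strategy aims for: members keep the partitions they already own, and only the excess moves to the new member.

```python
def sticky_rebalance(current, members):
    """Rebalance while moving as few partitions as possible.

    current: dict mapping member -> list of (topic, partition) owned before the rebalance
    members: non-empty iterable of all member names after the membership change
    """
    # Every member starts with what it already owns; new members start empty.
    assignment = {member: list(current.get(member, [])) for member in sorted(members)}
    # Partitions whose owner left the group need a new owner.
    orphaned = [tp for member, tps in current.items() if member not in assignment for tp in tps]
    for tp in orphaned:
        least = min(assignment, key=lambda m: len(assignment[m]))
        assignment[least].append(tp)
    # Move one partition at a time from the most loaded to the least loaded member.
    while True:
        most = max(assignment, key=lambda m: len(assignment[m]))
        least = min(assignment, key=lambda m: len(assignment[m]))
        if len(assignment[most]) - len(assignment[least]) <= 1:
            return assignment
        assignment[least].append(assignment[most].pop())


# Hypothetical assignment before C4 joins: one member owns two partitions.
before = {
    "C1": [("test-1", 0), ("test-2", 1)],
    "C2": [("test-1", 1)],
    "C3": [("test-2", 0)],
}
after = sticky_rebalance(before, ["C1", "C2", "C3", "C4"])
for member, partitions in after.items():
    print(member, partitions)
```

In this sketch C2 and C3 keep their partitions untouched, and C1 gives up only one partition to C4, so every member ends up with exactly one.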

During this exercise, we observed the consumer group protocol in action and witnessed a number of pretty cool things. We saw:

  • How consumer group scaling is limited based upon its partition assignment strategy
  • How partition assignment strategy election operates
  • How partitions are assigned by the different available strategies
  • How the consumer group rebalances when a member joins the group

Note

When you’re done running this exercise, you need to tear down the exercise environment by deleting the cluster. This will prevent it from unnecessarily accruing cost and exhausting your promotional credit.

Let’s walk through the tear down process for this exercise environment.

First, list the clusters in the Confluent Cloud environment.

  1. List the clusters in the environment and their IDs:

    confluent kafka cluster list

For this exercise environment, you should only see one cluster. Now we can delete it.

  1. Delete the cgp-exercise cluster:

    confluent kafka cluster delete <cgp-cluster ID>

And finally as a sanity check, we’ll confirm that the cluster no longer exists.

  1. Confirm the cluster no longer exists in the environment:

    confluent kafka cluster list

No clusters are listed so the environment tear down is complete.

As mentioned at the beginning of this exercise, the consumer group protocol is a pretty important part of Kafka as a whole, opening up the realm of parallelization and optimized stream processing. Be sure to check out the remaining exercises in this course as we dive deeper into Kafka internals.

Use the promo code INTERNALS101 to get $25 of free Confluent Cloud usage
