This tutorial demonstrates how to produce a high volume of messages to Kafka and then compare consumption throughput using regular consumers versus share consumers. The steps below show how to set up a cluster for share consumers, run the provided producer and consumer applications, and compare the performance results of classic Kafka consumer instances and share consumers. For a deeper look at the application source code, refer to the Code explanation section at the bottom.
The following steps use Confluent Cloud. To run the tutorial locally with Docker, skip to the Docker instructions section at the bottom.
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
First, create a Dedicated 1-CKU cluster in Confluent Cloud by following the instructions here.
Since Queues for Kafka is currently a Closed Preview feature, you must open a support request to enable it on your cluster. In the Confluent Support Portal, open a ticket requesting that Queues for Kafka be enabled, and include your cluster ID, which you can find in the Confluent Cloud Console by navigating to Cluster Settings from your Dedicated cluster overview page.
Run the following series of commands to log in and set the active Confluent Cloud environment and cluster.
confluent login --prompt --save
confluent environment list
confluent environment use <ENVIRONMENT_ID>
confluent kafka cluster list
confluent kafka cluster use <CLUSTER_ID>
Generate a Kafka API key by substituting the cluster ID from the previous command:
confluent api-key create --resource <CLUSTER_ID>
Copy the API key into the file queues-for-kafka/src/main/resources/cloud.properties where you see the <API_KEY> placeholder, and copy the API secret where you see the <API_SECRET> placeholder.
Run this command to get your cluster's bootstrap servers endpoint:
confluent kafka cluster describe
Copy the endpoint (of the form pkc-<ID>.<REGION>.<CLOUD>.confluent.cloud:9092) into the same cloud.properties file where you see the <BOOTSTRAP_SERVERS> placeholder. Do not copy the leading SASL_SSL://.
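With the placeholders filled in, the client configuration in cloud.properties should look roughly like this minimal sketch of a typical Confluent Cloud SASL/PLAIN setup (the repository's file may contain additional settings):

bootstrap.servers=pkc-<ID>.<REGION>.<CLOUD>.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username='<API_KEY>' password='<API_SECRET>';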
Create a 6-partition topic called strings that we will use to test consumption throughput.
confluent kafka topic create strings --partitions 6
Compile the application from the top-level tutorials repository directory:
./gradlew queues-for-kafka:shadowJar
Navigate into the application's home directory:
cd queues-for-kafka
Run the producer application, passing the cloud.properties Kafka client configuration file that you populated with your Dedicated cluster's bootstrap servers endpoint and credentials:
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
io.confluent.developer.ProducerApp \
--properties-file ./src/main/resources/cloud.properties
In a separate shell, run the regular KafkaConsumer-based application. This runs 16 concurrent consumers, but only 6 will actively consume because, within a consumer group, each partition can be assigned to at most one consumer instance. The app simulates a 500-millisecond workload per event and reports throughput after consuming 1,000 events.
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
io.confluent.developer.ConsumerApp \
--properties-file ./src/main/resources/cloud.properties \
--consumer-type consumer \
--num-consumers 16 \
--wait-ms 500 \
--total-events 1000
The app will exit once 1,000 events have been consumed, which should take around a minute and a half: with 6 active consumers each spending 500 ms per event, 1,000 events take roughly (1,000 × 0.5 s) / 6 ≈ 83 seconds, plus polling overhead. You will see a log message like this reporting the duration:
Completed consuming 1000 messages in 89.61 seconds.
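To make the partition-assignment ceiling concrete, here is a minimal sketch of the kind of poll loop each classic consumer thread runs, assuming standard Kafka client APIs. The class name, group name, and hard-coded values are illustrative, not the tutorial app's exact source:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ClassicConsumerSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // or your Cloud endpoint
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");          // hypothetical group name
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        long consumed = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // With 6 partitions, at most 6 consumers in the group receive an
            // assignment; any additional instances get no partitions and sit idle.
            consumer.subscribe(List.of("strings"));
            while (consumed < 1_000) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                    Thread.sleep(500); // simulate the --wait-ms per-event workload
                    consumed++;
                }
            }
        }
    }
}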
Next, run the consumer application using share consumers.
First, alter the share-consumer-group share group to begin consuming from the earliest offset. This uses the kafka-configs.sh tool from a local Apache Kafka installation, with --command-config passing the credentials in cloud.properties:
<KAFKA_HOME>/bin/kafka-configs.sh --bootstrap-server <BOOTSTRAP_SERVER> \
--group share-consumer-group --alter --add-config 'share.auto.offset.reset=earliest' \
--command-config ./src/main/resources/cloud.properties
Run the consumer app again with the same number of threads and the same simulated event processing time, but this time pass the share_consumer consumer type:
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
io.confluent.developer.ConsumerApp \
--properties-file ./src/main/resources/cloud.properties \
--consumer-type share_consumer \
--num-consumers 16 \
--wait-ms 500 \
--total-events 1000
This time, the app should take closer to 30 seconds to complete, since consumption scales to all 16 threads: (1,000 × 0.5 s) / 16 ≈ 31 seconds. You will see a log message like this reporting the duration:
Completed consuming 1000 messages in 31.42 seconds.
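For comparison, here is a minimal sketch of the share consumer equivalent, assuming the KafkaShareConsumer API introduced by KIP-932 in Apache Kafka 4.x (again illustrative, not the app's exact source). Because records in a share group are handed out under short-lived, per-record acquisition locks rather than static partition assignment, many consumers can make progress on the same 6 partitions at once:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;

public class ShareConsumerSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // or your Cloud endpoint
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "share-consumer-group");    // names the share group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        long consumed = 0;
        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
            // Records are leased to individual consumers, so all 16 threads
            // can consume concurrently from the same 6 partitions.
            consumer.subscribe(List.of("strings"));
            while (consumed < 1_000) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                    Thread.sleep(500);                                    // simulate the --wait-ms workload
                    consumer.acknowledge(record, AcknowledgeType.ACCEPT); // mark the record as processed
                    consumed++;
                }
                consumer.commitSync(); // commit the acknowledgements
            }
        }
    }
}

The explicit acknowledge call marks each record as successfully processed; a share consumer can instead release a record for redelivery or reject it outright.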
Try different application configurations to see how consumption throughput is impacted. For example, vary --num-consumers and --wait-ms to see how throughput scales with more workers and different per-event wait times. Also try a different number of topic partitions. How does it impact consumption throughput?
When you are finished, delete the Confluent Cloud resources created for this tutorial. For example, if you are using an isolated environment, delete it by first getting the environment ID in the form env-123456:
confluent environment list
Delete the environment, including all resources created for this tutorial:
confluent environment delete <ENVIRONMENT_ID>
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Start Apache Kafka 4.1 with the following command:
docker compose -f ./queues-for-kafka/docker-compose.yml up -d
Open a shell in the broker container:
docker exec --workdir /opt/kafka/bin/ -it broker /bin/bash
Enable share consumers:
./kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature share.version=1
Alter the share-consumer-group share group to begin consuming from the earliest offset:
./kafka-configs.sh --bootstrap-server localhost:9092 \
--group share-consumer-group --alter \
--add-config 'share.auto.offset.reset=earliest'
In the broker container, create a topic called strings with 6 partitions:
./kafka-topics.sh --bootstrap-server localhost:9092 --create \
--partitions 6 --topic strings
Press Ctrl+D to exit the container shell.
On your local machine, compile the app:
./gradlew queues-for-kafka:shadowJar
Navigate into the application's home directory:
cd queues-for-kafka
Run the producer application, passing the local.properties Kafka client configuration file that points to the broker's bootstrap servers endpoint at localhost:9092:
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
io.confluent.developer.ProducerApp \
--properties-file ./src/main/resources/local.properties
In a separate shell, run the regular KafkaConsumer-based application. This runs 16 concurrent consumers, but only 6 will actively consume because, within a consumer group, each partition can be assigned to at most one consumer instance. The app simulates a 500-millisecond workload per event and reports throughput after consuming 1,000 events.
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
io.confluent.developer.ConsumerApp \
--properties-file ./src/main/resources/local.properties \
--consumer-type consumer \
--num-consumers 16 \
--wait-ms 500 \
--total-events 1000
The app will exit once 1,000 events have been consumed, which should take around a minute and a half: with 6 active consumers each spending 500 ms per event, 1,000 events take roughly (1,000 × 0.5 s) / 6 ≈ 83 seconds, plus polling overhead. You will see a log message like this reporting the duration:
Completed consuming 1000 messages in 89.61 seconds.
Next, run the consumer app again with the same number of threads and the same simulated event processing time, but this time pass the share_consumer consumer type:
java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \
io.confluent.developer.ConsumerApp \
--properties-file ./src/main/resources/local.properties \
--consumer-type share_consumer \
--num-consumers 16 \
--wait-ms 500 \
--total-events 1000
This time, the app should take closer to 30 seconds to complete, since consumption scales to all 16 threads: (1,000 × 0.5 s) / 16 ≈ 31 seconds. You will see a log message like this reporting the duration:
Completed consuming 1000 messages in 31.42 seconds.
Try different application configurations to see how consumption throughput is impacted. For example, vary --num-consumers and --wait-ms to see how throughput scales with more workers and different per-event wait times. Also try a different number of topic partitions. How does it impact consumption throughput?
From your local machine, stop the broker container:
docker compose -f ./queues-for-kafka/docker-compose.yml down
This section summarizes the key application source files under src/main/java/io/confluent/developer.
ProducerApp.java: Standalone producer that sends a high volume of string messages to the strings topic (a minimal sketch of this pattern appears after this list).
ConsumerApp.java: Orchestrates multi-threaded consumption to compare regular KafkaConsumer-based and KafkaShareConsumer-based throughput (see the orchestration sketch after this list).
ConsumerThread.java: A runnable worker used by ConsumerApp that encapsulates the consumption loop for either consumer type, similar in spirit to the poll-loop sketches shown earlier in this tutorial.
ConsumerAppArgParser.java: Command-line parsing and validation for the consumer app using Apache Commons CLI (a short parsing sketch appears after this list).
ProducerAppArgParser.java: Minimal command-line parsing for the producer app.
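For readers who want a feel for these patterns without opening the repository, the following sketches are rough approximations that assume only the standard Kafka client and Commons CLI APIs; class names, message volumes, and configuration values are illustrative, and the actual source files will differ. First, a producer along the lines of ProducerApp:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // or your Cloud endpoint
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10_000; i++) { // 10,000 is an arbitrary volume for illustration
                // A null key lets the partitioner spread records across all 6 partitions
                producer.send(new ProducerRecord<>("strings", "message-" + i));
            }
            producer.flush();
        }
    }
}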
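Next, an orchestration skeleton in the spirit of ConsumerApp: it fans out worker threads, tracks a shared counter, and reports elapsed time in the same shape as the log messages above. The worker body here is a stand-in that only sleeps and counts (and may slightly overshoot the target near the end); in the real app each worker runs a poll loop like the earlier sketches:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class OrchestrationSketch {
    public static void main(String[] args) throws InterruptedException {
        int numConsumers = 16;         // mirrors --num-consumers
        long totalEvents = 1_000;      // mirrors --total-events
        AtomicLong consumed = new AtomicLong(); // shared across all worker threads

        ExecutorService pool = Executors.newFixedThreadPool(numConsumers);
        long start = System.nanoTime();
        for (int i = 0; i < numConsumers; i++) {
            pool.submit(() -> {
                // Stand-in for a real poll loop: each simulated event takes 500 ms
                while (consumed.get() < totalEvents) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    consumed.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.MINUTES);
        double seconds = (System.nanoTime() - start) / 1_000_000_000.0;
        System.out.printf("Completed consuming %d messages in %.2f seconds.%n",
                consumed.get(), seconds);
    }
}

Running this with numConsumers set to 6 versus 16 reproduces the rough shape of the two timings measured above.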
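Finally, a minimal Apache Commons CLI parsing sketch of the kind ConsumerAppArgParser performs (the option names match the flags used in this tutorial; the default value and error handling here are assumptions):

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public class ArgParserSketch {
    public static void main(String[] args) throws Exception {
        Options options = new Options();
        options.addOption(Option.builder()
                .longOpt("consumer-type")
                .hasArg()
                .desc("consumer or share_consumer")
                .required()
                .build());
        options.addOption(Option.builder().longOpt("num-consumers").hasArg().build());

        CommandLineParser parser = new DefaultParser();
        CommandLine cmd = parser.parse(options, args); // throws ParseException on bad input
        int numConsumers = Integer.parseInt(cmd.getOptionValue("num-consumers", "16"));
        System.out.println(cmd.getOptionValue("consumer-type") + " x " + numConsumers);
    }
}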