How can you read from a specific offset and partition of a Kafka topic?
Use the kafka-console-consumer
command with the --partition
and --offset
flags to read from a specific partition and offset.
kafka-console-consumer --topic example-topic --bootstrap-server broker:9092 \
--property print.key=true \
--property key.separator="-" \
--partition 1 \
--offset 6
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0
or later) or Docker Engine (version 19.03.0
or later) if you don’t already have it
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd
, via systemctl
• Verify that Docker is set up properly by ensuring no errors are output when you run docker info
and docker compose version
on the command line
Make a local directory anywhere you’d like for this project:
mkdir console-consumer-read-specific-offsets-partition && cd console-consumer-read-specific-offsets-partition
Next, create the following docker-compose.yml
file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud).
version: '2'
services:
broker:
image: confluentinc/cp-kafka:7.4.1
hostname: broker
container_name: broker
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
Now launch Confluent Platform by running:
docker compose up -d
Your first step is to create a topic to produce to and consume from. This time you’ll add more than one partition so you can see how the keys end up on different partitions.
Your first step is to open a shell on the broker container:
docker exec broker bash
Then use the following command to create the topic:
kafka-topics --create --topic example-topic --bootstrap-server broker:9092 --replication-factor 1 --partitions 2
Keep the container shell you just started open, as you’ll use it in the next step.
To get started, lets produce some records to your new topic.
Since you’ve created a topic with more than one partition, you’ll send full key-value pairs so you’ll be able to see how different keys end up on different partitions.
To send full key-value pairs you’ll specify the parse.key
and key.separator
options to the console producer command.
Let’s run the following command in the broker container shell from the previous step to start a new console producer:
kafka-console-producer --topic example-topic --bootstrap-server broker:9092 \
--property parse.key=true \
--property key.separator=":"
Then enter these records either one at time or copy-paste all of them into the terminal and hit enter:
key1:the lazy
key2:fox jumped
key3:over the
key4:brown cow
key1:All
key2:streams
key3:lead
key4:to
key1:Kafka
key2:Go to
key3:Kafka
key4:summit
After you’ve sent the records, you can close the producer with Ctrl-C
, but keep the broker container shell open as you’ll still need it for the next few steps.
Next let’s open up a console consumer to read records sent to the topic in the previous step, but you’ll only read from the first partition. Kafka partitions
are zero based so your two partitions are numbered 0
, and 1
respectively.
Using the broker container shell, lets start a console consumer to read only records from the first partition, 0
kafka-console-consumer --topic example-topic --bootstrap-server broker:9092 \
--from-beginning \
--property print.key=true \
--property key.separator="-" \
--partition 0
After a few seconds you should see something like this (your output will vary depending on the hashing algorithm):
key1-the lazy
key1-All
key1-Kafka
You’ll notice you sent 12 records, but only 3 went to the first partition. The reason for this is the way Kafka calculates the partition assignment for a given record. Kafka calculates the partition by taking the hash of the key modulo the number of partitions. So, even though you have 2 partitions, depending on what the key hash value is, you aren’t guaranteed an even distribution of records across partitions.
Go ahead and shut down the current consumer with Ctrl-C
.
In the previous step, you consumed records from the first partition of your topic. In this step you’ll consume the rest of your records from the second partition 1
.
If you haven’t done so already, close the previous console consumer with Ctrl-C
.
Then start a new console consumer to read only records from the second partition:
kafka-console-consumer --topic example-topic --bootstrap-server broker:9092 \
--from-beginning \
--property print.key=true \
--property key.separator="-" \
--partition 1
After a few seconds you should see something like this
key2-fox jumped
key3-over the
key4-brown cow
key2-streams
key3-lead
key4-to
key2-Go to
key3-Kafka
key4-summit
As you’d expect, the remaining 9 records are on the second partition.
Go ahead and shut down the current consumer with Ctrl-C
.
So far you’ve learned how to consume records from a specific partition. When you specify the partition, you can optionally specify the offset to start consuming from. Specifying a specific offset can be helpful when debugging an issue, in that you can skip consuming records that you know aren’t a potential problem.
If you haven’t done so already, close the previous console consumer with Ctrl-C
.
From the previous step you know there are 9 records in the second partition. In this step you’ll only consume records starting from offset 6, so you should only see the last 3 records on the screen. The changes in this command include removing the --from-beginning
property and adding an --offset
flag
Here’s the command to read records from the second partition starting at offset 6:
kafka-console-consumer --topic example-topic --bootstrap-server broker:9092 \
--property print.key=true \
--property key.separator="-" \
--partition 1 \
--offset 6
After a few seconds you should see something like this
key2-Go to
key3-Kafka
key4-summit
As you can see, you’ve consumed records starting from offset 6
to the end of the log.
Go ahead and shut down the current consumer with Ctrl-C
.
You’re all done now!
Go back to your open windows and stop any console consumers with Ctrl-C
then close the container shells with Ctrl-D
.
Then you can shut down the docker container by running:
docker compose down
Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.
Sign up for Confluent Cloud, a fully managed Apache Kafka service.
After you log in to Confluent Cloud Console, click Environments
in the lefthand navigation, click on Add cloud environment
, and name the environment learn-kafka
. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.
From the Billing & payment
section in the menu, apply the promo code CC100KTS
to receive an additional $100 free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1
. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.
Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.
Next, from the Confluent Cloud Console, click on Clients
to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.
Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.