How can you count the number of messages in a Kafka topic?
Consume the entire Kafka topic using kcat
, and count how many messages are read.
docker exec kcat \
kcat -b broker:29092 -C -t pageviews -e -q | \
wc -l
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0
or later) or Docker Engine (version 19.03.0
or later) if you don’t already have it
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd
, via systemctl
• Verify that Docker is set up properly by ensuring no errors are output when you run docker info
and docker compose version
on the command line
Make a local directory anywhere you’d like for this project:
mkdir count-messages && cd count-messages
Next, create the following docker-compose.yml
file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):
version: '2'
services:
broker:
image: confluentinc/cp-kafka:7.4.1
container_name: broker
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
ksqldb:
image: confluentinc/ksqldb-server:0.28.2
container_name: ksqldb
depends_on:
- broker
ports:
- 8088:8088
user: root
environment:
KSQL_CONFIG_DIR: /etc/ksqldb
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: broker:29092
KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
KSQL_KSQL_HIDDEN_TOPICS: ^_.*
KSQL_KSQL_CONNECT_WORKER_CONFIG: /etc/ksqldb/connect.properties
KSQL_CONNECT_BOOTSTRAP_SERVERS: broker:29092
KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: ksqldb
KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
KSQL_CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: '[%d] %p %X{connector.context}%m
(%c:%L)%n'
KSQL_CONNECT_PLUGIN_PATH: /usr/share/java
command:
- bash
- -c
- |
echo "Installing connector plugins"
# I miss the confluent-hub client :-/
# mkdir -p /usr/share/confluent-hub-components/
# confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-datagen:0.3.3
# ------ hack to workaround absence of confluent-hub client
curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-datagen/versions/0.4.0/confluentinc-kafka-connect-datagen-0.4.0.zip -o /tmp/kafka-connect-datagen.zip
yum install -y unzip
unzip /tmp/kafka-connect-datagen.zip -d /usr/share/java/kafka-connect-datagen
# ----------------------------------------------------------
#
echo "Launching ksqlDB"
/usr/bin/docker/run &
echo "Waiting for Kafka Connect to start listening on localhost ⏳"
while : ; do
curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
if [ $$curl_status -eq 200 ] ; then
break
fi
sleep 5
done
echo -e "\n--\n+> Creating Data Generators"
curl -i -X PUT http://localhost:8083/connectors/datagen_01/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"value.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"kafka.topic": "pageviews",
"quickstart": "pageviews",
"iterations": 42,
"tasks.max": "1"
}'
curl -i -X PUT http://localhost:8083/connectors/datagen_02/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"value.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"kafka.topic": "trades",
"quickstart": "Stock_Trades",
"max.interval": 1000,
"iterations": 4242424242,
"tasks.max": "1"
}'
sleep infinity
volumes:
- ./src:/opt/app/src
- ./test:/opt/app/test
kcat:
image: edenhill/kcat:1.7.1
container_name: kcat
links:
- broker
entrypoint:
- /bin/sh
- -c
- "apk add jq; \nwhile [ 1 -eq 1 ];do sleep 60;done\n"
And bring up the stack of components by running:
docker compose up -d
Run this snippet of code which will block until the necessary components have started
docker exec -it ksqldb bash -c 'echo -e "\n\n Waiting for startup… \n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done'
You can count the number of messages in a Kafka topic simply by consuming the entire topic and counting how many messages are read.
To do this from the commandline you can use the kcat
tool which can act as a consumer (and producer) and is built around the Unix philosophy of pipelines. This means that you can pipe the output (messages) from kcat into another tool to count the number of messages.
docker exec kcat \
kcat -b broker:29092 -C -t pageviews -e -q | \
wc -l
Let’s take a close look at the commandline soup we’ve used here to count the messages.
docker exec kcat
runs the following command with its arguments in the Docker container called kcat
\
is a line continuation character
kcat
runs kcat itself, passing in arguments as follows:
-b
the location of the cluster broker(s)
-C
act as a consumer
-t
read data from the pageviews
topic
-e
exit once at the end of the topic
-q
run quietly
|
pipes the messages from kcat to the next command
wc
reads the piped messages and writes the count to screen
-l
specifies to count the number of lines in total (one message per line). Contrast this to -c
which would return the number of bytes.
Finally, the output of the command is the message count.
42
Once you’ve finished you can tear down the Docker Compose stack. This will delete all data that you’ve stored in Kafka.
docker compose down
In the example above we provision an entire stack include kcat connecting to a broker in a Docker container. You can use kcat from a Docker container to connect to Kafka clusters elsewhere, such as Confluent Cloud.
If you don’t have an account yet, sign up for Confluent Cloud.
Use the promo code CC100KTS
to receive an additional $100 free usage on Confluent Cloud (details).
Using your Confluent Cloud broker address and API key set the following environment variables
export CCLOUD_BROKER_HOST=my.cluster.gcp.confluent.cloud
export CCLOUD_API_KEY=XXXX
export CCLOUD_API_SECRET=YYYY
Now we can run kcat (passing in the necessary Confluent Cloud details from environment variables) to count the number of messages in a Kafka topic simply by consuming the entire topic and counting how many messages are read.
docker run --rm edenhill/kcat:1.7.1 \
-X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
-X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
-X sasl.username="${CCLOUD_API_KEY}" \
-X sasl.password="${CCLOUD_API_SECRET}" \
-b ${CCLOUD_BROKER_HOST}:9092 \
-t my_topic \
-C -e -q | \
wc -l