How to count messages in a Kafka topic

Question:

How can you count the number of messages in a Kafka topic?


Example use case:

It can be useful to know how many messages are currently in a topic, but you cannot calculate this directly from the offsets alone, because you must account for the topic's retention policy, log compaction, and potential duplicate messages. In this example, we'll take a topic of pageview data and see how we can count all of the messages in the topic. Note that the time complexity for this tutorial is O(n) (linear): processing time depends on the number of messages in the topic, and large data sets will require long running times.
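To see why offsets alone give only an upper bound, here is a minimal sketch. The sample `topic:partition:offset` lines stand in for the real output of a tool such as Kafka's GetOffsetShell (shown only in a comment; broker address and offsets are placeholders):

```shell
# Sketch: summing per-partition offset deltas gives only an UPPER BOUND on the
# message count, because log compaction and retention can remove records
# without closing the gap between the earliest and latest offsets.
# Real offset lines would come from something like:
#   kafka-run-class kafka.tools.GetOffsetShell --broker-list $BROKER --topic test-topic --time -1   # latest
#   kafka-run-class kafka.tools.GetOffsetShell --broker-list $BROKER --topic test-topic --time -2   # earliest
# which print one topic:partition:offset line per partition.

# Sum the offset column across partitions.
sum_offsets() {
  awk -F: '{sum += $3} END {print sum+0}'
}

# Placeholder output for a hypothetical two-partition topic:
end=$(printf 'test-topic:0:10\ntest-topic:1:5\n' | sum_offsets)
begin=$(printf 'test-topic:0:2\ntest-topic:1:1\n' | sum_offsets)

echo $((end - begin))   # upper bound on the count; the true count may be lower
```

With the placeholder offsets above this prints 12, but after compaction the topic could hold fewer messages, which is why this tutorial counts by consuming the topic instead.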

Hands-on code example:

New to Confluent Cloud? Get started here.




Short Answer

If your Kafka topic is in Confluent Cloud, consume the entire topic using kafkacat and count how many messages are read.

docker run -it --network=host \
    -v ${PWD}/configuration/ccloud.properties:/tmp/configuration/ccloud.properties \
    edenhill/kcat:1.7.0 kafkacat \
         -F /tmp/configuration/ccloud.properties \
         -C -t test-topic \
         -e -q \
         | grep -v "Reading configuration from file" | wc -l

With the Confluent Cloud Metrics API, you could also sum up the values of the metric io.confluent.kafka.server/received_records, which is "The delta count of records received. Each sample is the number of records received since the previous data sample. The count is sampled every 60 seconds." See the documentation for details.
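As a rough sketch, a query for that metric posted to the Metrics API query endpoint might look like the body below. The cluster ID and interval are placeholders, and the exact request schema should be checked against the Metrics API reference:

```json
{
  "aggregations": [
    { "metric": "io.confluent.kafka.server/received_records" }
  ],
  "filter": {
    "field": "resource.kafka.id",
    "op": "EQ",
    "value": "lkc-xxxxx"
  },
  "granularity": "PT1M",
  "intervals": [ "2022-01-01T00:00:00Z/PT1H" ]
}
```

Summing the returned data points over your interval approximates the number of records received, which is not necessarily the number currently retained in the topic.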

Run it

1. Provision your Kafka cluster

This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started free is on Confluent Cloud, which provides Kafka as a fully managed service.

  1. After you log in to Confluent Cloud, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  2. From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

  3. Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.


2. Initialize the project

Make a local directory anywhere you’d like for this project:

mkdir count-messages && cd count-messages

3. Write the cluster information into a local file

From the Confluent Cloud UI, navigate to your Kafka cluster. From the Clients view, get the connection information customized to your cluster (select C/C++).

Create new credentials for your Kafka cluster, and then Confluent Cloud will show a configuration similar to below with your new credentials automatically populated (make sure show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.

# Kafka
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username={{ CLUSTER_API_KEY }}
sasl.password={{ CLUSTER_API_SECRET }}
Do not directly copy and paste the above configuration. You must copy it from the UI so that it includes your Confluent Cloud information and credentials.

4. Download and set up the Confluent CLI

This tutorial has some steps for Kafka topic management and for reading from or writing to Kafka topics, for which you can use the Confluent Cloud Console or install the Confluent CLI. Instructions for installing the Confluent CLI and configuring it for your Confluent Cloud environment are available from within the Confluent Cloud Console: navigate to your Kafka cluster, click on the CLI and tools link, and run through the steps in the Confluent CLI tab.

The CLI clients for Confluent Cloud (ccloud) and Confluent Platform (confluent v1.0) have been unified into a single Confluent CLI client, confluent v2.0. This tutorial uses the unified Confluent CLI v2.0 (the ccloud client will continue to work until it is sunset on May 9, 2022; migration instructions for the unified confluent CLI are at https://docs.confluent.io/confluent-cli/current/migrate.html).

5. Create the Kafka topic

In this step we’re going to create a topic for use during this tutorial. Use the following command to create the topic:

confluent kafka topic create test-topic

6. Produce messages to the topic

Produce some messages to the Kafka topic.

confluent kafka topic produce test-topic

Enter a few records and press <ctrl-c> when finished.

Apache
Kafka
Is
The
Best
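Instead of typing records interactively, you could also put them in a file and pipe it to the producer. This is just a sketch; the commented produce command assumes the Confluent CLI is logged in and pointed at your cluster, as set up in the earlier steps:

```shell
# Write the sample records to a local file.
cat > records.txt <<'EOF'
Apache
Kafka
Is
The
Best
EOF

# Sanity-check how many records will be produced:
wc -l < records.txt

# Then pipe the file to the producer (requires a configured Confluent CLI):
# confluent kafka topic produce test-topic < records.txt
```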

7. Count the messages

You can count the number of messages in a Kafka topic simply by consuming the entire topic and counting how many messages are read.

To do this from the command line, you can use the kafkacat tool, which is built around the Unix philosophy of pipelines. This means that you can pipe the output (messages) from kafkacat into another tool, such as wc, to count the number of messages.

As input, pass in the configuration/ccloud.properties file that you created in an earlier step.

docker run -it --network=host \
    -v ${PWD}/configuration/ccloud.properties:/tmp/configuration/ccloud.properties \
    edenhill/kcat:1.7.0 kafkacat \
         -F /tmp/configuration/ccloud.properties \
         -C -t test-topic \
         -e -q \
         | grep -v "Reading configuration from file" | wc -l

Let’s take a close look at the command-line soup we’ve used here to count the messages.

  • docker run runs the edenhill/kcat:1.7.0 image in a container, mounting in your local ccloud.properties file and passing the remaining arguments to kafkacat

  • \ is a line continuation character

    • kafkacat runs kafkacat itself, passing in arguments as follows:

      • -F Kafka cluster connection information

      • -C act as a consumer

      • -t read data from the test-topic topic

      • -e exit once at the end of the topic

      • -q run quietly

    • | pipes the messages from kafkacat to the next command

    • grep -v "Reading configuration from file" skips the startup log message so that it is not counted

    • wc -l reads the piped messages and writes the number of lines in total (one message per line) to screen

Finally, the output of the command is the message count.

      5
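To see why the grep -v step matters, you can simulate the tail end of the pipeline locally, with printf standing in for kafkacat's output of one startup log line plus the five messages:

```shell
# Simulate the consumer's stdout: one startup log line plus five messages.
# grep -v drops the log line so wc -l counts only the messages.
printf '%s\n' \
  'Reading configuration from file /tmp/configuration/ccloud.properties' \
  'Apache' 'Kafka' 'Is' 'The' 'Best' \
  | grep -v "Reading configuration from file" \
  | wc -l
```

One caveat of this filtering approach: any message whose payload happens to contain the string "Reading configuration from file" would also be dropped and left out of the count.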

8. Tear down Confluent Cloud resources

You may move on to another tutorial, but if you don’t plan on doing more, use the Confluent Cloud Console or CLI to destroy all the resources you created, and verify that they are destroyed to avoid unexpected charges.