How to count messages in a Kafka topic

Question:

How can you count the number of messages in a Kafka topic?

Example use case:

It can be useful to know how many messages are currently in a topic, but you cannot calculate this directly from the offsets alone: the topic's retention policy, log compaction, and potential duplicate messages mean that the difference between the latest and earliest offsets is only an upper bound on the count. In this example, we'll take a topic of pageview data and count all of the messages in it. Note that the time complexity for this tutorial is O(n) (linear): the consumer reads the entire topic, so the running time grows with the number of messages, and large topics will take a long time to count.
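
To see concretely why offsets only give an upper bound, you can compare each partition's earliest and latest offset once the Docker stack defined below is running. This is a minimal sketch, assuming the GetOffsetShell tool bundled with the broker image:

# Earliest offset per partition (output format: topic:partition:offset)
docker exec broker kafka-run-class kafka.tools.GetOffsetShell \
    --bootstrap-server broker:29092 --topic pageviews --time -2

# Latest offset per partition
docker exec broker kafka-run-class kafka.tools.GetOffsetShell \
    --bootstrap-server broker:29092 --topic pageviews --time -1

Summing the per-partition differences (latest minus earliest) gives the size of the offset range, which can exceed the number of messages you can actually read once compaction or transaction markers come into play.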

Hands-on code example:

Short Answer

Consume the entire Kafka topic using kcat, and count how many messages are read.

docker exec kcat \
    kcat -b broker:29092 -C -t pageviews -e -q | \
    wc -l

Prerequisites

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line

Initialize the project

Make a local directory anywhere you’d like for this project:

mkdir count-messages && cd count-messages

Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    depends_on:
      - zookeeper
    ports:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use broker:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI
    #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.28.2
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      - "8088:8088"
    user: root
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
      # Setting KSQL_KSQL_CONNECT_WORKER_CONFIG enables embedded Kafka Connect
      KSQL_KSQL_CONNECT_WORKER_CONFIG: "/etc/ksqldb/connect.properties"
      # Kafka Connect config below
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: 'ksqldb'
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      KSQL_CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      KSQL_CONNECT_PLUGIN_PATH: '/usr/share/java'

    command:
      # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        # I miss the confluent-hub client :-/
        # mkdir -p /usr/share/confluent-hub-components/
        # confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-datagen:0.3.3
        # ------ hack to workaround absence of confluent-hub client
        curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-datagen/versions/0.4.0/confluentinc-kafka-connect-datagen-0.4.0.zip -o /tmp/kafka-connect-datagen.zip
        yum install -y unzip
        unzip /tmp/kafka-connect-datagen.zip -d /usr/share/java/kafka-connect-datagen
        # ----------------------------------------------------------
        #
        echo "Launching ksqlDB"
        /usr/bin/docker/run &

         echo "Waiting for Kafka Connect to start listening on localhost ⏳"
         while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
          echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
            break
          fi
          sleep 5
        done

        echo -e "\n--\n+> Creating Data Generators"
        curl -i -X PUT http://localhost:8083/connectors/datagen_01/config \
             -H "Content-Type: application/json" \
             -d '{
                    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                    "value.converter.schemas.enable":false,
                    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                    "kafka.topic": "pageviews",
                    "quickstart": "pageviews",
                    "iterations": 42,
                    "tasks.max": "1"
                }'

        curl -i -X PUT http://localhost:8083/connectors/datagen_02/config \
             -H "Content-Type: application/json" \
              -d '{
                    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                    "value.converter.schemas.enable":false,
                    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                    "kafka.topic": "trades",
                    "quickstart": "Stock_Trades",
                    "max.interval": 1000,
                    "iterations": 4242424242,
                    "tasks.max": "1"
                }'

        sleep infinity
    volumes:
      - ./src:/opt/app/src
      - ./test:/opt/app/test

  kcat:
    image: edenhill/kcat:1.7.1
    container_name: kcat
    links:
      - broker
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq;
        while : ; do sleep 60 ; done

And bring up the stack of components by running:

docker compose up -d

Run this snippet of code, which will block until the necessary components have started:

docker exec -it ksqldb bash -c 'echo -e "\n\n  Waiting for startup… \n"
while : ; do
  curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info)
  echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)"
  if [ $curl_status -eq 200 ] ; then break ; fi
  sleep 5
done'
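
Optionally, before counting you can confirm that the datagen connector has created the pageviews topic. A quick check, assuming the kafka-topics CLI that ships in the cp-kafka image (the topic may take a few moments to appear while the connector starts up):

docker exec broker kafka-topics --bootstrap-server broker:29092 --list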

Run kcat to count the messages

You can count the number of messages in a Kafka topic simply by consuming the entire topic and counting how many messages are read.

To do this from the command line you can use the kcat tool, which can act as a consumer (and producer) and is built around the Unix philosophy of pipelines. This means that you can pipe the output (messages) from kcat into another tool to count the number of messages.

docker exec kcat \
    kcat -b broker:29092 -C -t pageviews -e -q | \
    wc -l

Let’s take a closer look at the command line soup we’ve used here to count the messages.

  • docker exec kcat runs the following command with its arguments in the Docker container called kcat

  • \ is a line continuation character

  • kcat runs kcat itself, passing in arguments as follows:

    • -b the location of the cluster broker(s)

    • -C act as a consumer

    • -t read data from the pageviews topic

    • -e exit once at the end of the topic

    • -q run quietly

  • | pipes the messages from kcat to the next command

  • wc reads the piped messages and writes the count to screen

    • -l specifies to count the number of lines in total (one message per line). Contrast this with -c, which would return the number of bytes.

Finally, the output of the command is the message count.

      42
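
One caveat: wc -l counts newline-delimited lines, so a message whose payload itself contains newline characters would be counted more than once. A sketch of a workaround, assuming kcat's -f format-string option, which here prints a single literal dot per message instead of the payload:

docker exec kcat \
    kcat -b broker:29092 -C -t pageviews -e -q -f '.' | \
    wc -c

Because each message contributes exactly one byte, wc -c returns the message count regardless of the payload contents. Similarly, -f '%p\n' piped through sort | uniq -c would give a per-partition breakdown.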

Clean up

Once you’ve finished, you can tear down the Docker Compose stack. Note that this will delete all data that you’ve stored in Kafka.

docker compose down

Use kcat with Confluent Cloud

In the example above we provisioned an entire stack, including kcat, connecting to a broker in a Docker container. You can also use kcat from a Docker container to connect to Kafka clusters elsewhere, such as Confluent Cloud.

If you don’t have an account yet, sign up for Confluent Cloud. Use the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud.

Using your Confluent Cloud broker address, API key, and API secret, set the following environment variables:

export CCLOUD_BROKER_HOST=my.cluster.gcp.confluent.cloud
export CCLOUD_API_KEY=XXXX
export CCLOUD_API_SECRET=YYYY

Now we can run kcat, passing in the necessary Confluent Cloud details from the environment variables, to count the number of messages in the topic, again by consuming it in full and counting how many messages are read.

docker run --rm edenhill/kcat:1.7.1 \
    -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
    -X ssl.ca.location=/etc/ssl/cert.pem -X api.version.request=true \
    -X sasl.username="${CCLOUD_API_KEY}" \
    -X sasl.password="${CCLOUD_API_SECRET}" \
    -b ${CCLOUD_BROKER_HOST}:9092 \
    -t my_topic \
    -C -e -q | \
    wc -l
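
Rather than repeating the -X flags on every invocation, kcat can also load connection details from a properties file via its -F option. A sketch, assuming a local file named ccloud.properties containing the same values as the environment variables above (the property names are standard librdkafka settings):

# ccloud.properties
bootstrap.servers=my.cluster.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=XXXX
sasl.password=YYYY
ssl.ca.location=/etc/ssl/cert.pem

docker run --rm -v "${PWD}/ccloud.properties:/ccloud.properties" \
    edenhill/kcat:1.7.1 \
    -F /ccloud.properties \
    -t my_topic -C -e -q | \
    wc -l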