How to generate mock data to a Kafka topic using the Datagen Source Connector

Question:

How can you produce mock data to Kafka topics to test your Kafka applications?


Example use case:

In this tutorial, you will learn about testing your Kafka applications. You'll run an instance of the Kafka Connect Datagen connector to produce mock data to a Kafka cluster.

Hands-on code example:

Prerequisites

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it.

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl.

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line, as shown below.
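
For example, both of the following commands should complete without errors (this is just a quick sanity check, not part of the tutorial itself):

docker info
docker compose version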

Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir kafka-connect-datagen && cd kafka-connect-datagen

Get Confluent Platform

Create a Dockerfile that builds a custom Kafka Connect image bundled with the free and open source Kafka Connect Datagen connector, installed from Confluent Hub.

FROM confluentinc/cp-kafka-connect-base:7.3.0

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.0
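
If you’d like to confirm that the connector installs cleanly before bringing up the full stack, you can build the image on its own. This standalone build is optional, since docker compose up -d --build (used below) performs it for you; the tag matches the image name referenced in the docker-compose.yml that follows:

docker build -t localimage/kafka-connect-datagen:latest .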

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud). Note that it is also configured to build the Kafka Connect image locally from the Dockerfile above.

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

  connect:
    image: localimage/kafka-connect-datagen:latest
    build:
      context: .
      dockerfile: Dockerfile
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"

Now launch Confluent Platform by running the following command. Note the --build argument, which builds the Docker image for Kafka Connect with the bundled kafka-connect-datagen connector.

docker compose up -d --build
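
Once the command returns, you can confirm that all four containers (zookeeper, broker, schema-registry, and connect) are up. This is just a quick sanity check; the exact output columns vary by Docker version:

docker compose ps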

Create the connector

Create the Kafka Connect Datagen source connector. It automatically creates the Kafka topic pageviews and produces data to it using the schema specification at https://github.com/confluentinc/kafka-connect-datagen/blob/master/src/main/resources/pageviews_schema.avro

curl -i -X PUT http://localhost:8083/connectors/datagen_local_01/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "kafka.topic": "pageviews",
            "quickstart": "pageviews",
            "max.interval": 1000,
            "iterations": 10000000,
            "tasks.max": "1"
        }'

If you run this before Kafka Connect has finished starting up, you’ll get the error curl: (52) Empty reply from server. In that case, wait a few seconds and rerun the above command.
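
If you’d rather not retry by hand, one way to wait is to poll the Kafka Connect REST API until it responds. This is only a convenience sketch; the port and endpoint come from the docker-compose.yml above:

while ! curl -s http://localhost:8083/connectors > /dev/null ; do
  echo "Waiting for Kafka Connect to start ..."
  sleep 5
done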

Check that the connector is running:

curl -s http://localhost:8083/connectors/datagen_local_01/status

You should see that the state is RUNNING for both the connector and tasks elements:

{"name":"datagen_local_01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}

If you get the message {"error_code":404,"message":"No status found for connector datagen_local_01"} then check that the step above in which you created the connector actually succeeded.
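
You can also list every connector registered on the worker; an empty JSON array means the PUT request above did not take effect:

curl -s http://localhost:8083/connectors

If the connector was created successfully, the response includes its name, e.g. ["datagen_local_01"].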

Consume events from the Kafka topic

Now that the kafka-connect-datagen connector is running, run the Kafka Avro console consumer to see the data streaming into the Kafka topic.

Note the additional print.key and key.separator properties, which make the consumer print each record’s key as well as its value.

docker exec -it connect kafka-avro-console-consumer \
 --bootstrap-server broker:9092 \
 --property schema.registry.url=http://schema-registry:8081 \
 --topic pageviews \
 --property print.key=true \
 --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
 --property key.separator=" : " \
 --max-messages 10

After the consumer starts, you should see output like the following within a few seconds (the generated values will differ from run to run):

461 : {"viewtime":461,"userid":"User_5","pageid":"Page_43"}
471 : {"viewtime":471,"userid":"User_6","pageid":"Page_54"}
481 : {"viewtime":481,"userid":"User_3","pageid":"Page_59"}
491 : {"viewtime":491,"userid":"User_9","pageid":"Page_65"}
501 : {"viewtime":501,"userid":"User_9","pageid":"Page_80"}
511 : {"viewtime":511,"userid":"User_5","pageid":"Page_26"}
521 : {"viewtime":521,"userid":"User_2","pageid":"Page_96"}
531 : {"viewtime":531,"userid":"User_6","pageid":"Page_94"}
541 : {"viewtime":541,"userid":"User_3","pageid":"Page_67"}
551 : {"viewtime":551,"userid":"User_7","pageid":"Page_99"}
Processed a total of 10 messages
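
Because the connector produces Avro, the record schemas are also registered in Schema Registry. Assuming the default TopicNameStrategy, the value schema lives under the subject pageviews-value, and you can inspect it over the Schema Registry REST API (the subject name here is an assumption based on those defaults):

curl -s http://localhost:8081/subjects

curl -s http://localhost:8081/subjects/pageviews-value/versions/latest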


Clean up Docker containers

When you’re finished, shut down the stack by running:

docker compose down
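
If you want a deeper cleanup, you can optionally delete the connector before shutting the stack down, and remove the locally built Connect image afterwards. The image tag is the one defined in docker-compose.yml:

curl -X DELETE http://localhost:8083/connectors/datagen_local_01

docker image rm localimage/kafka-connect-datagen:latest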