How to generate mock data to a Kafka topic using the Datagen Source Connector

Question:

How can you produce mock data to Kafka topics to test your Kafka applications?


Example use case:

In this tutorial, you will learn about testing your Kafka applications. You'll run an instance of the Kafka Connect Datagen connector to produce mock data to a Kafka cluster.

Hands-on code example:

Run it

Prerequisites


This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it.

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl.

  • Verify that Docker is set up properly by ensuring that no errors are output when you run docker info and docker compose version on the command line, as shown below.
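For example, both of the following commands should complete without errors (the details of the output will vary with your installation):

docker info
docker compose version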

Initialize the project


To get started, make a new directory anywhere you’d like for this project:

mkdir kafka-connect-datagen && cd kafka-connect-datagen

Get Confluent Platform


Create a Dockerfile that builds a custom Kafka Connect image bundled with the free and open source Kafka Connect Datagen connector, installed from Confluent Hub.

FROM confluentinc/cp-kafka-connect-base:7.3.0

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.0
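Optionally, you can sanity-check the image before launching the full stack by building it manually and listing the installed connector plugins. This is just a quick verification sketch; the localimage/kafka-connect-datagen:latest tag matches the image name used in the docker-compose.yml below.

docker build -t localimage/kafka-connect-datagen:latest .
docker run --rm --entrypoint ls localimage/kafka-connect-datagen:latest /usr/share/confluent-hub-components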

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud). Note that it is also configured to build a local image from the Dockerfile above.

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
    - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
    - broker
    ports:
    - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092
  connect:
    image: localimage/kafka-connect-datagen:latest
    build:
      context: .
      dockerfile: Dockerfile
    container_name: connect
    depends_on:
    - broker
    - schema-registry
    ports:
    - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'

Now launch Confluent Platform by running the following command. Note the --build argument, which automatically builds the Docker image for Kafka Connect with the bundled kafka-connect-datagen connector.

docker compose up -d --build
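Before moving on, you can confirm that the three containers are up and that the Kafka Connect worker’s REST API is reachable (the worker can take a minute or so to finish starting):

docker compose ps
curl -s http://localhost:8083/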

Create the connector


Create the Kafka Connect Datagen source connector. It automatically creates the Kafka topic pageviews and produces data to it using the schema specification at https://github.com/confluentinc/kafka-connect-datagen/blob/master/src/main/resources/pageviews_schema.avro.

curl -i -X PUT http://localhost:8083/connectors/datagen_local_01/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "kafka.topic": "pageviews",
            "quickstart": "pageviews",
            "max.interval": 1000,
            "iterations": 10000000,
            "tasks.max": "1"
        }'

If you run this before Kafka Connect has finished starting up, you’ll get the error curl: (52) Empty reply from server; in that case, wait a few seconds and rerun the command.
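If you’d rather script the wait than retry by hand, a minimal sketch like the following polls the Connect REST API until it responds (assumes a bash-compatible shell and the 8083 port mapping from the docker-compose.yml above):

while ! curl -s http://localhost:8083/connectors > /dev/null; do
  echo "Waiting for Kafka Connect to start..."
  sleep 5
done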

Check that the connector is running:

curl -s http://localhost:8083/connectors/datagen_local_01/status

You should see that the state is RUNNING for both the connector and tasks elements:

{"name":"datagen_local_01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}

If you get the message {"error_code":404,"message":"No status found for connector datagen_local_01"} then check that the step above in which you created the connector actually succeeded.
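You can also list every connector registered with the worker; if datagen_local_01 is missing from the response, the PUT request above did not take effect:

curl -s http://localhost:8083/connectors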

Consume events from the Kafka topic


Now that the kafka-connect-datagen connector is running, run the Kafka Avro console consumer to see the data streaming into the Kafka topic.

Note the added print.key and key.separator properties, which print each record’s key alongside its value.

docker exec -it connect kafka-avro-console-consumer \
 --bootstrap-server broker:9092 \
 --property schema.registry.url=http://schema-registry:8081 \
 --topic pageviews \
 --property print.key=true \
 --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
 --property key.separator=" : " \
 --max-messages 10

After the consumer starts, you should see the following output within a few seconds:


461 : {"viewtime":461,"userid":"User_5","pageid":"Page_43"}
471 : {"viewtime":471,"userid":"User_6","pageid":"Page_54"}
481 : {"viewtime":481,"userid":"User_3","pageid":"Page_59"}
491 : {"viewtime":491,"userid":"User_9","pageid":"Page_65"}
501 : {"viewtime":501,"userid":"User_9","pageid":"Page_80"}
511 : {"viewtime":511,"userid":"User_5","pageid":"Page_26"}
521 : {"viewtime":521,"userid":"User_2","pageid":"Page_96"}
531 : {"viewtime":531,"userid":"User_6","pageid":"Page_94"}
541 : {"viewtime":541,"userid":"User_3","pageid":"Page_67"}
551 : {"viewtime":551,"userid":"User_7","pageid":"Page_99"}
Processed a total of 10 messages
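When you’re finished, you can remove the connector and tear down the Docker containers:

curl -X DELETE http://localhost:8083/connectors/datagen_local_01
docker compose down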