How can you produce mock data to Kafka topics to test your Kafka applications?
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl
• Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line, as shown below
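Both commands should complete without errors and print details about your Docker installation:
docker info
docker compose version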
To get started, make a new directory anywhere you’d like for this project:
mkdir kafka-connect-datagen && cd kafka-connect-datagen
Create a Dockerfile that builds a custom container for Kafka Connect bundled with the free and open source Kafka Connect Datagen connector, installed from Confluent Hub.
# Start from the Kafka Connect base image
FROM confluentinc/cp-kafka-connect-base:7.3.0
# Include the Confluent Hub install location on the Connect plugin path
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
# Install the Kafka Connect Datagen connector from Confluent Hub
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.0
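The Compose file in the next step is configured to build this image automatically, but if you’d like to build and inspect it by hand first, a plain docker build also works. The tag below simply matches the image name referenced in the docker-compose.yml that follows:
docker build -t localimage/kafka-connect-datagen:latest .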
Next, create the following docker-compose.yml file to run Confluent Platform (for Kafka in the cloud, see Confluent Cloud). Note that it is also configured to build the local Kafka Connect image from the Dockerfile above.
version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092
  connect:
    image: localimage/kafka-connect-datagen:latest
    build:
      context: .
      dockerfile: Dockerfile
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
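Before launching anything, you can optionally have Compose validate the file and print the resolved configuration; any YAML indentation or interpolation mistakes will surface here:
docker compose config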
Now launch Confluent Platform by running the following command. Note the --build argument, which automatically builds the Docker image for Kafka Connect and the bundled kafka-connect-datagen connector.
docker compose up -d --build
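The services can take a minute or so to come up. You can check on them by listing the containers and confirming that broker, schema-registry, and connect are all running:
docker compose ps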
Create the Kafka Connect Datagen source connector. It automatically creates the Kafka topic pageviews and produces data to it with a schema specification from https://github.com/confluentinc/kafka-connect-datagen/blob/master/src/main/resources/pageviews_schema.avro:
curl -i -X PUT http://localhost:8083/connectors/datagen_local_01/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"kafka.topic": "pageviews",
"quickstart": "pageviews",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "1"
}'
If you run this before Kafka Connect has finished starting up, you’ll get the error curl: (52) Empty reply from server, in which case rerun the above command.
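Rather than retrying by hand, a small shell loop can poll the Connect REST API until the worker responds. This is just a convenience sketch, not part of the tutorial proper; curl’s -f flag makes the request fail until the endpoint returns a successful response:
# Wait until the Kafka Connect REST API is reachable
while ! curl -s -f http://localhost:8083/ > /dev/null; do
  echo "Waiting for Kafka Connect to start..."
  sleep 5
done
echo "Kafka Connect is ready."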
Check that the connector is running:
curl -s http://localhost:8083/connectors/datagen_local_01/status
You should see that the state is RUNNING for both the connector and tasks elements:
{"name":"datagen_local_01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}
If you get the message {"error_code":404,"message":"No status found for connector datagen_local_01"}, then check that the step above in which you created the connector actually succeeded.
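A quick way to check is to list the connectors the worker knows about; if creation succeeded, datagen_local_01 will appear in the returned JSON array:
curl -s http://localhost:8083/connectors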
Now that the kafka-connect-datagen connector is running, run the Kafka Avro console consumer to see the data streaming into the Kafka topic. Note the added print.key and key.separator properties, which print each record’s key alongside its value:
docker exec -it connect kafka-avro-console-consumer \
--bootstrap-server broker:9092 \
--property schema.registry.url=http://schema-registry:8081 \
--topic pageviews \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property key.separator=" : " \
--max-messages 10
After the consumer starts you should see the following output in a few seconds:
461 : {"viewtime":461,"userid":"User_5","pageid":"Page_43"}
471 : {"viewtime":471,"userid":"User_6","pageid":"Page_54"}
481 : {"viewtime":481,"userid":"User_3","pageid":"Page_59"}
491 : {"viewtime":491,"userid":"User_9","pageid":"Page_65"}
501 : {"viewtime":501,"userid":"User_9","pageid":"Page_80"}
511 : {"viewtime":511,"userid":"User_5","pageid":"Page_26"}
521 : {"viewtime":521,"userid":"User_2","pageid":"Page_96"}
531 : {"viewtime":531,"userid":"User_6","pageid":"Page_94"}
541 : {"viewtime":541,"userid":"User_3","pageid":"Page_67"}
551 : {"viewtime":551,"userid":"User_7","pageid":"Page_99"}
Processed a total of 10 messages
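When you’re finished, you can clean up by deleting the connector so that it stops producing, and then shutting down the containers (add the -v flag to docker compose down if you also want to remove the associated volumes):
curl -X DELETE http://localhost:8083/connectors/datagen_local_01
docker compose down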