How can you generate realistic test data in Kafka?
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl
• Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line, as shown below
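For example, both of these commands should complete without errors before you continue:

docker info
docker compose version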
To get started, make a new directory anywhere you’d like for this project:
mkdir generate-test-data-streams && cd generate-test-data-streams
Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud). Create it in the project directory you just made.
version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092

  connect:
    image: confluentinc/cp-kafka-connect-base:7.3.0
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.IntegerConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: '[%d] %p %X{connector.context}%m (%c:%L)%n'
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components/
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        mkdir -p /usr/share/confluent-hub-components/
        confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-datagen:0.6.0
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  kcat:
    image: edenhill/kcat:1.7.1
    container_name: kcat
    links:
      - broker
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq
        while [ 1 -eq 1 ]; do sleep 60; done
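Because YAML is whitespace-sensitive, it can be worth validating the file before launching anything. From the project directory, docker compose config parses the file and prints the resolved configuration (or an error if the YAML is malformed):

docker compose config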
Now launch Confluent Platform by running:
docker compose up -d
Before continuing, you need to wait for all the containers to fully start up. Run this script, which waits for the final dependencies in the chain (Schema Registry and Kafka Connect) to be ready:
#!/bin/bash

# Wait for Schema Registry to become available
while :
do
  curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8081)
  echo -e $(date) " Component: Schema Registry\t\tHTTP state: " $curl_status "\t(waiting for 200)"
  if [ $curl_status -eq 200 ]
  then
    echo "✅ Schema Registry is ready"
    break
  fi
  sleep 5
done

# Wait for Kafka Connect to become available
while :
do
  curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/)
  echo -e $(date) " Component: Kafka Connect \t\tHTTP state: " $curl_status "\t(waiting for 200)"
  if [ $curl_status -eq 200 ]
  then
    echo "✅ Kafka Connect is ready"
    break
  fi
  sleep 5
done
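You can paste the script straight into your terminal, or save it to a file and run it with bash (the filename wait-for-stack.sh here is just an example):

bash wait-for-stack.sh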
Once everything is ready, you should see this and get control back at your terminal:
Wed 1 Apr 2020 11:58:49 BST Component: Schema Registry HTTP state: 200 (waiting for 200)
✅ Schema Registry is ready
Wed 1 Apr 2020 11:58:49 BST Component: Kafka Connect HTTP state: 200 (waiting for 200)
✅ Kafka Connect is ready
Once the stack has started, run the following command to ensure that the connector plugin that we’re going to be using has been loaded.
docker exec -i connect curl -s localhost:8083/connector-plugins|jq '.[].class'|grep Datagen
You should see this output:
"io.confluent.kafka.connect.datagen.DatagenConnector"
We’re going to use Kafka Connect to run the data generator, which is available as a source connector.
To start with, let’s create a connector populating a single topic with some random pizza order data:
curl -X PUT http://localhost:8083/connectors/pizza_orders/config \
-i -H "Content-Type: application/json" -d '{
"connector.class" : "io.confluent.kafka.connect.datagen.DatagenConnector",
"quickstart" : "pizza_orders",
"schema.keyfield" : "store_id",
"kafka.topic" : "pizza_orders",
"key.converter" : "org.apache.kafka.connect.converters.IntegerConverter",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url" : "http://schema-registry:8081",
"value.converter.schemas.enable" : "false",
"max.interval" : 1000,
"tasks.max" : "1"
}'
Check that the connector is running:
curl -s http://localhost:8083/connectors/pizza_orders/status
You should see that the state is RUNNING:
{"name":"pizza_orders","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}
With the connector running, let’s now inspect the data that’s being generated to the Kafka topic. Here we’ll use kcat, but you could use any Kafka consumer if you wanted.
docker exec -i kcat kcat -b broker:9092 -t pizza_orders \
-s value=avro -r http://schema-registry:8081 \
-u -f 'Key (%K bytes):\t%k\nPayload (%S bytes):\t%s\n--\n' \
-C -c5 -o end
You should see 5 messages, and then the command will exit. Observe that roughly one message is being generated per second, which follows from the max.interval setting of 1000 (milliseconds).
Key (4 bytes):
Payload (122 bytes): {"store_id": 5, "store_order_id": 1179, "coupon_code": 1046, "date": 18486, "status": "accepted", "order_lines": [{"product_id": 79, "category": "pizza", "quantity": 3, "unit_price": 19.609999999999999, "net_price": 58.829999999999998}, {"product_id": 6, "category": "pizza", "quantity": 3, "unit_price": 13.640000000000001, "net_price": 40.920000000000002}, {"product_id": 95, "category": "wings", "quantity": 1, "unit_price": 24.899999999999999, "net_price": 24.899999999999999}, {"product_id": 37, "category": "pizza", "quantity": 3, "unit_price": 14.619999999999999, "net_price": 43.859999999999999}]}
--
% Reached end of topic pizza_orders [0] at offset 180
Key (4 bytes):
Payload (50 bytes): {"store_id": 3, "store_order_id": 1180, "coupon_code": 1898, "date": 18811, "status": "accepted", "order_lines": [{"product_id": 22, "category": "calzone", "quantity": 5, "unit_price": 2.79, "net_price": 13.949999999999999}]}
--
% Reached end of topic pizza_orders [0] at offset 181
Key (4 bytes):
Payload (99 bytes): {"store_id": 1, "store_order_id": 1181, "coupon_code": 1991, "date": 18840, "status": "accepted", "order_lines": [{"product_id": 6, "category": "pizza", "quantity": 3, "unit_price": 13.640000000000001, "net_price": 40.920000000000002}, {"product_id": 80, "category": "dessert", "quantity": 5, "unit_price": 12.69, "net_price": 63.450000000000003}, {"product_id": 27, "category": "salad", "quantity": 4, "unit_price": 9.2200000000000006, "net_price": 36.880000000000003}]}
--
Key (4 bytes):
Payload (98 bytes): {"store_id": 3, "store_order_id": 1182, "coupon_code": 1268, "date": 18075, "status": "accepted", "order_lines": [{"product_id": 52, "category": "wings", "quantity": 1, "unit_price": 5.6799999999999997, "net_price": 5.6799999999999997}, {"product_id": 33, "category": "salad", "quantity": 3, "unit_price": 10.880000000000001, "net_price": 32.640000000000001}, {"product_id": 4, "category": "calzone", "quantity": 1, "unit_price": 19.039999999999999, "net_price": 19.039999999999999}]}
--
Key (4 bytes):
Payload (126 bytes): {"store_id": 8, "store_order_id": 1183, "coupon_code": 1125, "date": 18781, "status": "accepted", "order_lines": [{"product_id": 59, "category": "calzone", "quantity": 3, "unit_price": 5.7400000000000002, "net_price": 17.219999999999999}, {"product_id": 14, "category": "dessert", "quantity": 2, "unit_price": 18.32, "net_price": 36.640000000000001}, {"product_id": 42, "category": "dessert", "quantity": 1, "unit_price": 6.6100000000000003, "net_price": 6.6100000000000003}, {"product_id": 7, "category": "salad", "quantity": 2, "unit_price": 18.25, "net_price": 36.5}]}
You can also run this without the -c5 argument to see a continuous stream of messages. If you do this, press Ctrl-C to return to the command prompt.
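The generation rate is governed by the connector’s max.interval setting, the maximum number of milliseconds between messages. To speed the stream up or slow it down, you can PUT a revised configuration to the same endpoint. Note that PUT replaces the entire configuration, so include all of the original settings; this sketch slows generation to at most one message every five seconds:

curl -X PUT http://localhost:8083/connectors/pizza_orders/config \
    -i -H "Content-Type: application/json" -d '{
        "connector.class"                     : "io.confluent.kafka.connect.datagen.DatagenConnector",
        "quickstart"                          : "pizza_orders",
        "schema.keyfield"                     : "store_id",
        "kafka.topic"                         : "pizza_orders",
        "key.converter"                       : "org.apache.kafka.connect.converters.IntegerConverter",
        "value.converter"                     : "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url" : "http://schema-registry:8081",
        "value.converter.schemas.enable"      : "false",
        "max.interval"                        : 5000,
        "tasks.max"                           : "1"
    }'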
Now let’s see how you can generate streams of data that are related and consistent. You are going to use bundled schemas available in the Kafka Connect DatagenConnector.
Consider an example where we want to generate some dummy data for tracking sensors on a fleet of delivery trucks. There will be two streams of data: (1) the current location of each truck, and (2) truck sensor information from each truck. The first stream representing the location of the trucks would contain the following fields:
→ Vehicle id (primary key / unique identifier)
→ Current location of the truck expressed in longitude and latitude
The second stream will contain sensor readings attached to the truck. Now instead of generating lots of random sensor data, we want a limited set so that it’s more realistic, tying it back to a fixed set of imaginary trucks. Each sensor reading will provide the following information:
→ Vehicle id (primary key / unique identifier)
→ Engine Temperature
→ Average RPM
For building the two related streams you’ll need to create two datagen connectors.
Run the following to create the fleet management location stream:
curl -X PUT http://localhost:8083/connectors/fleet_location/config \
-i -H "Content-Type: application/json" -d '{
"connector.class" : "io.confluent.kafka.connect.datagen.DatagenConnector",
"quickstart" : "fleet_mgmt_location",
"schema.keyfield" : "vehicle_id",
"kafka.topic" : "fleet_mgmt_location",
"key.converter" : "org.apache.kafka.connect.converters.IntegerConverter",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url" : "http://schema-registry:8081",
"value.converter.schemas.enable" : "false",
"max.interval" : 500,
"tasks.max" : "1"
}'
Check that the fleet connector is running:
curl -s http://localhost:8083/connectors/fleet_location/status
You should see that the state is RUNNING:
{"name":"fleet_location","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}
Then, to create the fleet management sensor reading stream, execute the following:
curl -X PUT http://localhost:8083/connectors/fleet_sensor/config \
-i -H "Content-Type: application/json" -d '{
"connector.class" : "io.confluent.kafka.connect.datagen.DatagenConnector",
"quickstart" : "fleet_mgmt_sensors",
"schema.keyfield" : "vehicle_id",
"kafka.topic" : "fleet_mgmt_sensors",
"key.converter" : "org.apache.kafka.connect.converters.IntegerConverter",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url" : "http://schema-registry:8081",
"value.converter.schemas.enable" : "false",
"max.interval" : 500,
"tasks.max" : "1"
}'
When looking at the connector settings, notice the quickstart entry in the JSON; this is where you specify one of the bundled schemas that the Datagen connector provides. By generating two streams of data with related keys, you’ll be able to join records or perform aggregations across those keys.
Now let’s confirm the sensor connector is running:
curl -s http://localhost:8083/connectors/fleet_sensor/status
Again, the state should be RUNNING:
{"name":"fleet_sensor","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}
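At this point both topics should exist on the broker. You can optionally confirm this by listing the topics from inside the broker container, for example:

docker exec -i broker kafka-topics --bootstrap-server broker:9092 --list

You should see fleet_mgmt_location and fleet_mgmt_sensors in the list, alongside pizza_orders and various internal topics.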
We now have two Kafka topics being written to. The first (fleet_mgmt_location) is keyed on the vehicle ID, as can be seen from the data:
docker exec -i kcat kcat -b broker:9092 -t fleet_mgmt_location \
-s key=s -s value=avro -r http://schema-registry:8081 \
-f 'Key (%K bytes):\t%k\nPayload (%S bytes):\t%s\n--\n' \
-C -c3 -o beginning 2>/dev/null
Key (4 bytes):
Payload (29 bytes): {"vehicle_id": 6242, "location": {"latitude": 0.6188530445835535, "longitude": 0.53424204571321932}, "ts": 1609459200000}
--
Key (4 bytes):
Payload (29 bytes): {"vehicle_id": 2956, "location": {"latitude": 0.38269701406096923, "longitude": 0.92006741132971748}, "ts": 1609459300000}
--
Key (4 bytes):
Payload (29 bytes): {"vehicle_id": 7982, "location": {"latitude": 0.35200846940237418, "longitude": 0.90592937818464314}, "ts": 1609459400000}
--
The second (fleet_mgmt_sensors) is a stream of sensor readings from the same fleet of vehicles, also keyed on the vehicle ID:
docker exec -i kcat kcat -b broker:9092 -t fleet_mgmt_sensors \
-s key=s -s value=avro -r http://schema-registry:8081 \
-f 'Key (%K bytes):\t%k\nPayload (%S bytes):\t%s\n--\n' \
-C -c5 -o end -u 2>/dev/null
Key (4 bytes):
Payload (11 bytes): {"vehicle_id": 5712, "engine_temperature": 172, "average_rpm": 2359}
--
Key (4 bytes):
Payload (11 bytes): {"vehicle_id": 2255, "engine_temperature": 195, "average_rpm": 2444}
--
Key (4 bytes):
Payload (11 bytes): {"vehicle_id": 5973, "engine_temperature": 233, "average_rpm": 2194}
--
Key (4 bytes):
Payload (12 bytes): {"vehicle_id": 8429, "engine_temperature": 194, "average_rpm": 4729}
--
Key (4 bytes):
Payload (11 bytes): {"vehicle_id": 2816, "engine_temperature": 180, "average_rpm": 3598}
--
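If you want to stop the data generators but leave the rest of the stack running, you can delete the connectors via the Kafka Connect REST API, for example:

curl -X DELETE http://localhost:8083/connectors/pizza_orders
curl -X DELETE http://localhost:8083/connectors/fleet_location
curl -X DELETE http://localhost:8083/connectors/fleet_sensor

This is optional here, since tearing down the stack removes everything anyway.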
Shut down the stack by running:
docker compose down