How can you generate realistic test data in Kafka?
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it.
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl.
• Verify that Docker is set up properly by ensuring that no errors are output when you run docker info and docker compose version on the command line.
To get started, make a new directory anywhere you’d like for this project:
mkdir generate-test-data-streams && cd generate-test-data-streams
Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud). Make sure that you create this file in the generate-test-data-streams directory created above.
version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: ksqldb
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      - 8088:8088
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: ^_.*
      KSQL_KSQL_CONNECT_WORKER_CONFIG: /connect/connect.properties
      KSQL_CONNECT_BOOTSTRAP_SERVERS: broker:9092
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: ksqldb
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.IntegerConverter
      KSQL_CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: '[%d] %p %X{connector.context}%m (%c:%L)%n'
      KSQL_CONNECT_PLUGIN_PATH: /usr/share/java,/data/connect-jars
    command:
      - bash
      - -lc
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt --component-dir /usr/share/java/ confluentinc/kafka-connect-datagen:0.6.0
        #
        echo "Launching ksqlDB"
        /usr/bin/docker/run &
        #
        sleep infinity

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.28.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    environment:
      KSQL_CONFIG_DIR: /etc/ksql
    volumes:
      - ./src:/opt/app/src
Now launch Confluent Platform by running:
docker compose up -d
Before continuing, you need to wait for all the containers to fully start up. Run this script, which waits for the final dependencies in the chain to be ready:
#!/bin/bash
# Wait for Schema Registry to become available
while :
do curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8081)
echo -e $(date) " Component: Schema Registry\t\tHTTP state: " $curl_status "\t(waiting for 200)"
if [ $curl_status -eq 200 ]
then
echo "✅✅ Schema Registry is ready"
break
fi
sleep 5
done
# Wait for ksqlDB to become available
while :
do curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8088/info)
echo -e $(date) " Component: ksqlDB \t\t\tHTTP state: " $curl_status "\t(waiting for 200)"
if [ $curl_status -eq 200 ]
then
echo "✅✅ ksqlDB is ready"
break
fi
sleep 5
done
Once everything is ready you should see this, and get control back at your terminal:
Wed 1 Apr 2020 11:58:49 BST Component: Schema Registry HTTP state: 200 (waiting for 200)
✅ Schema Registry is ready
Wed 1 Apr 2020 11:58:49 BST Component: ksqlDB HTTP state: 200 (waiting for 200)
✅ ksqlDB is ready
Once the stack has started, run the following command to ensure that the connector plugin that we’re going to be using has been loaded.
docker exec -i ksqldb curl -s localhost:8083/connector-plugins|jq '.[].class'
You should see this output:
"io.confluent.kafka.connect.datagen.DatagenConnector"
Launch the ksqlDB CLI:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
From the ksqlDB prompt you can create the data generator, which is available as a source connector.
To start with, let’s create a connector populating a single topic with some random pizza order transactions:
CREATE SOURCE CONNECTOR IF NOT EXISTS PIZZA_ORDERS WITH (
'connector.class' = 'io.confluent.kafka.connect.datagen.DatagenConnector',
'quickstart' = 'pizza_orders',
'schema.keyfield' = 'store_id',
'kafka.topic' = 'pizza_orders',
'key.converter' = 'org.apache.kafka.connect.converters.IntegerConverter',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter.schemas.enable' = 'false',
'max.interval' = 1000,
'tasks.max' = 1
);
Check that the connector is running:
SHOW CONNECTORS;
You should see that the state is RUNNING:
Connector Name | Type | Class | Status
-------------------------------------------------------------------------------------------------------------
PIZZA_ORDERS | SOURCE | io.confluent.kafka.connect.datagen.DatagenConnector | RUNNING (1/1 tasks RUNNING)
-------------------------------------------------------------------------------------------------------------
You can also inspect further details about the connector:
DESCRIBE CONNECTOR PIZZA_ORDERS;
Name : PIZZA_ORDERS
Class : io.confluent.kafka.connect.datagen.DatagenConnector
Type : source
State : RUNNING
WorkerId : ksqldb:8083
Task ID | State | Error Trace
---------------------------------
0 | RUNNING |
---------------------------------
With the connector running, let’s now inspect the data that’s being generated to the Kafka topic. ksqlDB’s PRINT command will show the contents of a topic:
PRINT pizza_orders LIMIT 5;
You should see 5 messages and then the command will exit. Observe that new messages are generated at most one second apart, as specified by the 'max.interval' = 1000 setting (the maximum gap, in milliseconds, between generated messages).
Key format: KAFKA_INT or KAFKA_STRING
Value format: AVRO
rowtime: 2023/07/19 17:12:36.396 Z, key: 8, value: {"store_id": 8, "store_order_id": 1908, "coupon_code": 1534, "date": 18585, "status": "accepted", "order_lines": [{"product_id": 52, "category": "wings", "quantity": 3, "unit_price": 9.47, "net_price": 28.41}, {"product_id": 97, "category": "dessert", "quantity": 3, "unit_price": 16.03, "net_price": 48.09}]}, partition: 0
rowtime: 2023/07/19 17:12:36.669 Z, key: 2, value: {"store_id": 2, "store_order_id": 1909, "coupon_code": 1604, "date": 18568, "status": "accepted", "order_lines": [{"product_id": 25, "category": "salad", "quantity": 4, "unit_price": 3.17, "net_price": 12.68}, {"product_id": 75, "category": "pizza", "quantity": 5, "unit_price": 16.09, "net_price": 80.45}, {"product_id": 27, "category": "calzone", "quantity": 1, "unit_price": 8.64, "net_price": 8.64}, {"product_id": 48, "category": "dessert", "quantity": 1, "unit_price": 4.18, "net_price": 4.18}]}, partition: 0
rowtime: 2023/07/19 17:12:36.878 Z, key: 4, value: {"store_id": 4, "store_order_id": 1910, "coupon_code": 1159, "date": 18414, "status": "accepted", "order_lines": [{"product_id": 67, "category": "dessert", "quantity": 1, "unit_price": 3.02, "net_price": 3.02}, {"product_id": 72, "category": "salad", "quantity": 5, "unit_price": 24.97, "net_price": 124.85}]}, partition: 0
rowtime: 2023/07/19 17:12:36.974 Z, key: 8, value: {"store_id": 8, "store_order_id": 1911, "coupon_code": 1911, "date": 18674, "status": "accepted", "order_lines": [{"product_id": 98, "category": "wings", "quantity": 1, "unit_price": 6.66, "net_price": 6.66}]}, partition: 0
rowtime: 2023/07/19 17:12:37.047 Z, key: 4, value: {"store_id": 4, "store_order_id": 1912, "coupon_code": 1890, "date": 18667, "status": "accepted", "order_lines": [{"product_id": 65, "category": "dessert", "quantity": 5, "unit_price": 2.63, "net_price": 13.15}, {"product_id": 78, "category": "calzone", "quantity": 1, "unit_price": 2.15, "net_price": 2.15}, {"product_id": 78, "category": "pizza", "quantity": 2, "unit_price": 11.45, "net_price": 22.9}]}, partition: 0
Topic printing ceased
You can also run this without the LIMIT 5 to see a continuous stream of messages. If you do this, press Ctrl-C to return to the ksqlDB prompt.
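If you’d rather replay the messages already in the topic than wait for new ones to arrive, PRINT also accepts a FROM BEGINNING clause:
PRINT pizza_orders FROM BEGINNING LIMIT 5;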
The connector that we’ve created populates a Kafka topic. As such, we can consume from it using any Kafka consumer. Since we’re in ksqlDB already let’s declare a stream over it so that we can project and filter on the data being generated.
Note that because we’re using Avro the schema is picked up automagically from the Schema Registry.
CREATE STREAM PIZZA_ORDERS WITH (KAFKA_TOPIC='pizza_orders', VALUE_FORMAT='AVRO');
Use the DESCRIBE command to inspect the schema of the stream that’s been created:
DESCRIBE PIZZA_ORDERS;
Name : PIZZA_ORDERS
Field | Type
-------------------------------------------------------------------------------------------------------------------------------------
STORE_ID | INTEGER
STORE_ORDER_ID | INTEGER
COUPON_CODE | INTEGER
DATE | DATE
STATUS | VARCHAR(STRING)
ORDER_LINES | ARRAY<STRUCT<PRODUCT_ID INTEGER, CATEGORY VARCHAR(STRING), QUANTITY INTEGER, UNIT_PRICE DOUBLE, NET_PRICE DOUBLE>>
-------------------------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
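As the last line of the output suggests, you can also ask for runtime details, such as message throughput and the queries reading from or writing to the stream, with the EXTENDED form:
DESCRIBE PIZZA_ORDERS EXTENDED;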
With this stream object created, we can query it using standard SQL constructs such as selecting fields and applying predicates:
SET 'auto.offset.reset' = 'earliest';
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS PIZZA_ORDER_TS,
STORE_ID,
STORE_ORDER_ID,
STATUS
FROM PIZZA_ORDERS
WHERE COUPON_CODE BETWEEN 1200 and 1600
EMIT CHANGES
LIMIT 5;
|PIZZA_ORDER_TS |STORE_ID |STORE_ORDER_ID |STATUS |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|2023-07-19 18:05:00 |8 |1001 |accepted |
|2023-07-19 18:05:01 |8 |1003 |accepted |
|2023-07-19 18:05:02 |2 |1005 |accepted |
|2023-07-19 18:05:05 |4 |1012 |accepted |
|2023-07-19 18:05:07 |6 |1015 |accepted |
Limit Reached
Query terminated
Note the use of the TIMESTAMPTOSTRING function to return the timestamp of the message (ROWTIME) as a human-readable string.
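Because ORDER_LINES is an array of structs, you may also want to flatten the test data out to one row per line item when exercising downstream logic. Here’s one way to sketch that with ksqlDB’s EXPLODE table function (the PIZZA_ORDER_LINES stream name is just an illustrative choice):
-- Sketch: flatten each order into one event per line item
CREATE STREAM PIZZA_ORDER_LINES AS
  SELECT STORE_ID,
         STORE_ORDER_ID,
         EXPLODE(ORDER_LINES) AS ORDER_LINE
  FROM PIZZA_ORDERS;

-- Then dereference the struct fields of each line item
SELECT STORE_ORDER_ID,
       ORDER_LINE->CATEGORY  AS CATEGORY,
       ORDER_LINE->QUANTITY  AS QUANTITY,
       ORDER_LINE->NET_PRICE AS NET_PRICE
FROM PIZZA_ORDER_LINES
EMIT CHANGES
LIMIT 5;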
Now let’s see how you can generate related and consistent streams of data. You are going to use the bundled schemas available in the Kafka Connect DatagenConnector.
Consider an example where we want to generate some dummy data for tracking sensors on a fleet of delivery trucks. There will be two streams of data: (1) the current location of each truck, and (2) truck sensor information from each truck. The first stream representing the location of the trucks would contain the following fields:
→ Vehicle id (primary key / unique identifier)
→ Current location of the truck expressed in longitude and latitude
The second stream will contain sensor readings attached to the truck. Now instead of generating lots of random sensor data, we want a limited set so that it’s more realistic, tying it back to a fixed set of imaginary trucks. Each sensor reading will provide the following information:
→ Vehicle id (primary key / unique identifier)
→ Engine temperature
→ Average RPM
For building the two related streams you’ll need to create two datagen connectors.
Run the following to create the fleet management location datagen connector:
CREATE SOURCE CONNECTOR IF NOT EXISTS FLEET_LOCATION WITH (
'connector.class' = 'io.confluent.kafka.connect.datagen.DatagenConnector',
'quickstart' = 'fleet_mgmt_location',
'schema.keyfield' = 'vehicle_id',
'kafka.topic' = 'fleet_mgmt_location',
'key.converter' = 'org.apache.kafka.connect.converters.IntegerConverter',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter.schemas.enable' = 'false',
'max.interval' = 500,
'tasks.max' = '1'
);
Then create the fleet management sensor reading connector by executing the following:
CREATE SOURCE CONNECTOR IF NOT EXISTS FLEET_SENSORS WITH (
'connector.class' = 'io.confluent.kafka.connect.datagen.DatagenConnector',
'quickstart' = 'fleet_mgmt_sensors',
'schema.keyfield' = 'vehicle_id',
'kafka.topic' = 'fleet_mgmt_sensors',
'key.converter' = 'org.apache.kafka.connect.converters.IntegerConverter',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter.schemas.enable' = 'false',
'max.interval' = 500,
'tasks.max' = '1'
);
Check that the connectors are running:
SHOW CONNECTORS;
You should see that the state is RUNNING:
Connector Name | Type | Class | Status
-------------------------------------------------------------------------------------------------------------
FLEET_LOCATION | SOURCE | io.confluent.kafka.connect.datagen.DatagenConnector | RUNNING (1/1 tasks RUNNING)
FLEET_SENSORS | SOURCE | io.confluent.kafka.connect.datagen.DatagenConnector | RUNNING (1/1 tasks RUNNING)
PIZZA_ORDERS | SOURCE | io.confluent.kafka.connect.datagen.DatagenConnector | RUNNING (1/1 tasks RUNNING)
-------------------------------------------------------------------------------------------------------------
With the connectors running, let’s see an example of how we can use this test data. We’ll declare ksqlDB streams over the two topics and then join them.
First, declare a stream over the truck locations (LOCATIONS) and another over the sensor data (SENSOR_READINGS):
CREATE STREAM LOCATIONS WITH (KAFKA_TOPIC='fleet_mgmt_location', VALUE_FORMAT='AVRO');
CREATE STREAM SENSOR_READINGS WITH (KAFKA_TOPIC='fleet_mgmt_sensors', VALUE_FORMAT='AVRO');
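As with PIZZA_ORDERS, you can run a quick DESCRIBE on each stream to confirm that the Avro schemas have been picked up and to see the field names used in the join below:
DESCRIBE LOCATIONS;
DESCRIBE SENSOR_READINGS;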
Now we can join the two streams on vehicle id (within a one-hour window) to see each truck’s sensor readings alongside its reported location:
SELECT TIMESTAMPTOSTRING(L.ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS,
LOCATION,
ENGINE_TEMPERATURE as TEMP,
AVERAGE_RPM
FROM LOCATIONS L
INNER JOIN SENSOR_READINGS SR WITHIN 1 HOUR ON L.VEHICLE_ID=SR.VEHICLE_ID
EMIT CHANGES LIMIT 10;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|EVENT_TS |LOCATION |TEMP |AVERAGE_RPM |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|2023-07-19 22:23:36 |{LATITUDE=0.5914727556445234, LONGITUDE=0.|182 |2081 |
| |9794271551735891} | | |
|2023-07-19 22:24:03 |{LATITUDE=0.49069154234746304, LONGITUDE=0|165 |2104 |
| |.7837095230088684} | | |
|2023-07-19 22:23:44 |{LATITUDE=0.9937373817480233, LONGITUDE=0.|161 |2010 |
| |7971831112471285} | | |
|2023-07-19 22:24:40 |{LATITUDE=0.2285635326946317, LONGITUDE=0.|164 |4676 |
| |9023151261916681} | | |
|2023-07-19 22:23:45 |{LATITUDE=0.5572288107918144, LONGITUDE=0.|157 |3356 |
| |1988305201405317} | | |
|2023-07-19 22:23:43 |{LATITUDE=0.9923908331905874, LONGITUDE=0.|194 |2832 |
| |15492182582850067} | | |
|2023-07-19 22:23:47 |{LATITUDE=0.06568889751126361, LONGITUDE=0|167 |2672 |
| |.6247355244049261} | | |
|2023-07-19 22:24:28 |{LATITUDE=0.5549595420013878, LONGITUDE=0.|199 |4860 |
| |09547171496428886} | | |
|2023-07-19 22:24:22 |{LATITUDE=0.06674334607324772, LONGITUDE=0|173 |1801 |
| |.8736596195706143} | | |
|2023-07-19 22:24:01 |{LATITUDE=0.9470024414548411, LONGITUDE=0.|234 |4563 |
| |3929296524788367} | | |
Limit Reached
Query terminated
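If you want to stop generating data before tearing the stack down, you can first drop the connectors from the ksqlDB prompt:
DROP CONNECTOR PIZZA_ORDERS;
DROP CONNECTOR FLEET_LOCATION;
DROP CONNECTOR FLEET_SENSORS;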
Shut down the stack by running:
docker compose down