Generate complex streams of test data

Question:

How can you generate realistic test data in Kafka?

Edit this page

Example use case:

Perhaps you are building an application, or constructing a pipeline, and you would like some mock data to use in testing. Using this connector, you can generate realistic test data that can also be made referentially consistent across topics.

Hands-on code example:

Run it

Prerequisites

1

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

  • • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line

Initialize the project

2

To get started, make a new directory anywhere you’d like for this project:

mkdir generate-test-data-streams && cd generate-test-data-streams

Then make the following directories to set up its structure:

mkdir src 

Get Confluent Platform

3

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud). Make sure that you create this file in the same place as the cities.sql file that you created above.

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: ksqldb
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
      # Setting KSQL_KSQL_CONNECT_WORKER_CONFIG enables embedded Kafka Connect
      # KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
      KSQL_KSQL_CONNECT_WORKER_CONFIG: "/connect/connect.properties"
      # Kafka Connect config below
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: 'ksqldb'
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      KSQL_CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      KSQL_CONNECT_PLUGIN_PATH: '/usr/share/java,/data/connect-jars'
    command:
      # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
      - bash
      - -lc
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt --component-dir /usr/share/java/ mdrogalis/voluble:0.3.0
        #
        echo "Launching ksqlDB"
        /usr/bin/docker/run &
        #
        sleep infinity

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.28.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
    volumes:
      - ./src:/opt/app/src

  # kcat:
  #   image: edenhill/kcat:1.7.1
  #   container_name: kcat
  #   links:
  #     - broker
  #   entrypoint:
  #     - /bin/sh
  #     - -c
  #     - |
  #       apk add jq;
  #       while [ 1 -eq 1 ];do sleep 60;done

Now launch Confluent Platform by running:

docker compose up -d

Before continuing, you need to wait for all the containers to fully start up. Run this script which will wait for the final dependencies in the chain to be ready:

#!/bin/bash

# Wait for Schema Registry to become available
while :
  do curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8081)
  echo -e $(date) " Component: Schema Registry\t\tHTTP state: " $curl_status "\t(waiting for 200)"
  if [ $curl_status -eq 200 ]
    then
      echo "✅✅ Schema Registry is ready"
      break
  fi
  sleep 5
done

# Wait for ksqlDB to become available
while :
  do curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8088/info)
  echo -e $(date) " Component: ksqlDB \t\t\tHTTP state: " $curl_status "\t(waiting for 200)"
  if [ $curl_status -eq 200 ]
    then
      echo "✅✅ ksqlDB is ready"
      break
  fi
  sleep 5
done

Once everything is ready you should see this, and get control back at your terminal:

Wed 1 Apr 2020 11:58:49 BST  Component: Schema Registry         HTTP state:  200        (waiting for 200)
✅ Schema Registry is ready
Wed 1 Apr 2020 11:58:49 BST  Component: ksqlDB                  HTTP state:  200        (waiting for 200)
✅ ksqlDB is ready

Once the stack has started, run the following command to ensure that the connector plugin that we’re going to be using has been loaded.

docker exec -i ksqldb curl -s localhost:8083/connector-plugins|jq '.[].class'

You should see this output

"io.mdrogalis.voluble.VolubleSourceConnector"

Create a standalone stream of test data

4

Launch the ksqlDB CLI:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

From the ksqlDB prompt you can create the data generator, which is available as a source connector.

To start with, let’s create a connector populating a single topic with some random purchase transactions

CREATE SOURCE CONNECTOR IF NOT EXISTS CLICKS WITH (
    'connector.class'             = 'io.mdrogalis.voluble.VolubleSourceConnector',
    'key.converter'               = 'org.apache.kafka.connect.storage.StringConverter',
    'genkp.clicks.with'           = '#{Number.randomDigit}',
    'attrkp.clicks.null.rate'     = 1,
    'genv.clicks.source_ip.with'  = '#{Internet.ipV4Address}',
    'genv.clicks.host.with'       = '#{Internet.url}',
    'genv.clicks.path.with'       = '#{File.fileName}',
    'genv.clicks.user_agent.with' = '#{Internet.userAgentAny}',
    'topic.clicks.throttle.ms'    = 1000
);

Check that the connector is running:

SHOW CONNECTORS;

You should see that the state is RUNNING:

 Connector Name | Type   | Class                                       | Status
-----------------------------------------------------------------------------------------------------
 CLICKS         | SOURCE | io.mdrogalis.voluble.VolubleSourceConnector | RUNNING (1/1 tasks RUNNING)
-----------------------------------------------------------------------------------------------------

You can also inspect further details about the connector:

DESCRIBE CONNECTOR CLICKS;
Name                 : CLICKS
Class                : io.mdrogalis.voluble.VolubleSourceConnector
Type                 : source
State                : RUNNING
WorkerId             : ksqldb:8083

 Task ID | State   | Error Trace
---------------------------------
 0       | RUNNING |
---------------------------------

Consume events from the test topic

5

With the connector running let’s now inspect the data that’s being generated to the Kafka topic. ksqlDB’s PRINT command will show the contents of a topic:

PRINT clicks LIMIT 5;

You should see 5 messages and then the command will exit. Observe that the one message is being generated per second, which is what was specified in the 'topic.clicks.throttle.ms' = 1000 setting.

Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 4/2/20 1:07:00 PM UTC, key: <null>, value: {"user_agent": "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.11) Gecko GranParadiso/3.0.11", "source_ip": "226.38.125.119", "path": "est_laboriosam/voluptate.webm", "host": "www.katlyn-balistreri.info"}
rowtime: 4/2/20 1:07:01 PM UTC, key: <null>, value: {"user_agent": "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_6; en-en) AppleWebKit/533.19.4 (KHTML, like Gecko) Version/5.0.3 Safari/533.19.4", "source_ip": "93.37.84.76", "path": "et_rerum/et.js", "host": "www.rebecka-bogisich.org"}
rowtime: 4/2/20 1:07:02 PM UTC, key: <null>, value: {"user_agent": "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0)", "source_ip": "142.81.110.151", "path": "non_quisquam/qui.html", "host": "www.marilou-vandervort.biz"}
rowtime: 4/2/20 1:07:03 PM UTC, key: <null>, value: {"user_agent": "Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts)", "source_ip": "7.195.71.80", "path": "error_voluptas/voluptate.avi", "host": "www.lorenzo-swift.com"}
rowtime: 4/2/20 1:07:04 PM UTC, key: <null>, value: {"user_agent": "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36", "source_ip": "105.197.34.118", "path": "ipsam_est/quibusdam.ods", "host": "www.penni-pacocha.com"}
Topic printing ceased

You can also run this without the LIMIT 5 to see a continuous stream of messages. If you do this, press Ctrl+C to return to the ksqlDB prompt.

Declare the topic as a ksqlDB stream

6

The connector that we’ve created populates a Kafka topic. As such, we can consume from it using any Kafka consumer. Since we’re in ksqlDB already let’s declare a stream over it so that we can project and filter on the data being generated.

Note that because we’re using Avro the schema is picked up automagically from the Schema Registry.

CREATE STREAM CLICKS WITH (KAFKA_TOPIC='clicks', VALUE_FORMAT='AVRO');

Use the DESCRIBE command to inspect the schema of the stream that’s been created:

DESCRIBE CLICKS;
Name                 : CLICKS
 Field      | Type
----------------------------------------
 USER_AGENT | VARCHAR(STRING)
 SOURCE_IP  | VARCHAR(STRING)
 PATH       | VARCHAR(STRING)
 HOST       | VARCHAR(STRING)
----------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

With this stream object created, we can query it using standard SQL constructs such as selecting fields and applying predicates:

SET 'auto.offset.reset' = 'earliest';

SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS CLICK_TS,
       SOURCE_IP,
       HOST,
       PATH
  FROM CLICKS
 WHERE USER_AGENT LIKE 'Moz%'
 EMIT CHANGES
 LIMIT 5;
+---------------------+----------------+--------------------------+---------------------------+
|CLICK_TS             |SOURCE_IP       |HOST                      |PATH                       |
+---------------------+----------------+--------------------------+---------------------------+
|2020-04-01 12:10:35  |126.112.211.195 |www.aleisha-swaniawski.co |voluptas_illo/aliquid.xls  |
|2020-04-01 12:10:38  |115.176.153.82  |www.sanjuana-mitchell.io  |consectetur_ea/velit.html  |
|2020-04-01 12:11:13  |90.126.37.217   |www.lonnie-medhurst.name  |corporis_aut/sed.xls       |
|2020-04-01 12:11:38  |126.213.246.181 |www.nell-quitzon.co       |est_in/accusamus.pages     |
|2020-04-01 12:11:54  |89.174.178.212  |www.matt-tromp.org        |commodi_ut/libero.css      |
Limit Reached
Query terminated

Note the use of the TIMESTAMPTOSTRING function to return the timestamp of the message (ROWTIME) as a human-readable string.

7

Now let’s see how you can generate streams of data that is related and consistent.

Consider an example where we want to generate some dummy data about network devices. The stream of events would have fields such as

→ The MAC address of the device

→ The number of bytes sent

Now let’s imagine that instead of generating lots of random MAC addresses, we want a limited set so that the data is more realistic, tying it back to a fixed set of imaginary devices. Each device will have some characteristics:

→ MAC address (primary key / unique identifier)

→ Device name

→ Location

Run the following to create the connector:

CREATE SOURCE CONNECTOR IF NOT EXISTS NETWORK_TRAFFIC WITH (
    'connector.class'                     = 'io.mdrogalis.voluble.VolubleSourceConnector',
    'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
    'genkp.devices.with'                  = '#{Internet.macAddress}',
    'genv.devices.name.with'              = '#{GameOfThrones.dragon}',
    'genv.devices.location->city.with'    = '#{Address.city}',
    'genv.devices.location->country.with' = '#{Address.country}',
    'topic.devices.records.exactly'       = 10,
    'genkp.traffic.with'                  = '#{Number.randomDigit}',
    'attrkp.traffic.null.rate'            = 1,
    'genv.traffic.mac.matching'           = 'devices.key',
    'genv.traffic.bytes_sent.with'        = '#{Number.numberBetween ''64'',''4096''}',
    'topic.traffic.throttle.ms'           = 500
);

The connector settings can be understood as follows:

  • Devices

genkp sets the macAddress as the primitive key

→ The city and country are set as nested values in the location entity

records.exactly limits the number of records produced to 10

  • Traffic

→ Not all messages in Kafka are keyed, and this shows an example of generating null keys by setting the null rate to 100% (null.rate has a value between 0 and 1).

'genkp.traffic.with'                = '#{Number.randomDigit}',
'attrkp.traffic.null.rate'           = 1,

→ The MAC address of the device is set as one of the existing keys on devices

→ The rate at which messages are produced is limited to two per second (throttle.ms' = 500)

Join the test data streams in ksqlDB

8

Check that the connector is running:

SHOW CONNECTORS;

You should see that the state is RUNNING:

 Connector Name | Type   | Class                                       | Status
-----------------------------------------------------------------------------------------------------
 CLICKS          | SOURCE | io.mdrogalis.voluble.VolubleSourceConnector | RUNNING (1/1 tasks RUNNING)
 NETWORK_TRAFFIC | SOURCE | io.mdrogalis.voluble.VolubleSourceConnector | RUNNING (1/1 tasks RUNNING)
-----------------------------------------------------------------------------------------------------

With the connector running let’s see an example of how we can use this test data.

First, declare some ksqlDB streams and tables over the data and see how we can join them.

First, we declare a stream over the events (traffic), and a table over the entity reference data (devices)

CREATE STREAM TRAFFIC WITH (KAFKA_TOPIC='traffic', VALUE_FORMAT='AVRO');
CREATE TABLE DEVICES (ID VARCHAR PRIMARY KEY) WITH (KAFKA_TOPIC='devices', VALUE_FORMAT='AVRO');

Now we can join the traffic to find out which device it was associated with

SELECT TIMESTAMPTOSTRING(T.ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS,
       MAC,
       BYTES_SENT,
       NAME,
       LOCATION
  FROM TRAFFIC T
         LEFT JOIN
       DEVICES D
         ON T.MAC=D.ID
EMIT CHANGES LIMIT 10;
+--------------------+------------------+------------+----------+----------------------------------------------------+
|EVENT_TS            |MAC               |BYTES_SENT  |NAME      |LOCATION                                            |
+--------------------+------------------+------------+----------+----------------------------------------------------+
|2020-04-02 12:34:32 |07:71:11:20:14:d3 |143         |Drogon    |{CITY=Charmainehaven, COUNTRY=Anguilla}             |
|2020-04-02 12:34:33 |e6:ee:92:de:bb:08 |2352        |Ghiscar   |{CITY=Gibsonbury, COUNTRY=Uzbekistan}               |
|2020-04-02 12:34:33 |ce:d8:35:2c:ea:28 |398         |Valryon   |{CITY=Moenmouth, COUNTRY=Saint Pierre and Miquelon} |
|2020-04-02 12:34:34 |e6:ee:92:de:bb:08 |3785        |Ghiscar   |{CITY=Gibsonbury, COUNTRY=Uzbekistan}               |
|2020-04-02 12:34:34 |ba:0f:84:52:54:95 |1670        |Vhagar    |{CITY=Lake Takakobury, COUNTRY=Ecuador}             |
|2020-04-02 12:34:35 |53:c8:f3:5d:ff:29 |1162        |Rhaegal   |{CITY=Graigtown, COUNTRY=Netherlands Antilles}      |
|2020-04-02 12:34:35 |e6:ee:92:de:bb:08 |749         |Ghiscar   |{CITY=Gibsonbury, COUNTRY=Uzbekistan}               |
|2020-04-02 12:34:36 |e5:b2:77:11:74:81 |182         |Vhagar    |{CITY=Reillyshire, COUNTRY=Armenia}                 |
|2020-04-02 12:34:36 |ba:0f:84:52:54:95 |974         |Vhagar    |{CITY=Lake Takakobury, COUNTRY=Ecuador}             |
|2020-04-02 12:34:37 |07:71:11:20:14:d3 |1878        |Drogon    |{CITY=Charmainehaven, COUNTRY=Anguilla}             |
Limit Reached
Query terminated

Clean up

9

Shut down the stack by running:

docker compose down

Take it to production

Write your statements to a file

1

Now that you have a series of statements that’s doing the right thing, the last step is to put them into a file so that they can be used outside the CLI session. Create a file at src/statements.sql with the following content:

CREATE SOURCE CONNECTOR IF NOT EXISTS CLICKS WITH (
    'connector.class'             = 'io.mdrogalis.voluble.VolubleSourceConnector',
    'key.converter'               = 'org.apache.kafka.connect.storage.StringConverter',
    'genkp.clicks.with'           = '#{Number.randomDigit}',
    'attrkp.clicks.null.rate'     = 1,
    'genv.clicks.source_ip.with'  = '#{Internet.ipV4Address}',
    'genv.clicks.host.with'       = '#{Internet.url}',
    'genv.clicks.path.with'       = '#{File.fileName}',
    'genv.clicks.user_agent.with' = '#{Internet.userAgentAny}',
    'topic.clicks.throttle.ms'    = 1000
);

CREATE SOURCE CONNECTOR IF NOT EXISTS NETWORK_TRAFFIC WITH (
    'connector.class'                     = 'io.mdrogalis.voluble.VolubleSourceConnector',
    'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
    'genkp.devices.with'                  = '#{Internet.macAddress}',
    'genv.devices.name.with'              = '#{GameOfThrones.dragon}',
    'genv.devices.location->city.with'    = '#{Address.city}',
    'genv.devices.location->country.with' = '#{Address.country}',
    'topic.devices.records.exactly'       = 10,
    'genkp.traffic.with'                  = '#{Number.randomDigit}',
    'attrkp.traffic.null.rate'            = 1,
    'genv.traffic.mac.matching'           = 'devices.key',
    'genv.traffic.bytes_sent.with'        = '#{Number.numberBetween ''64'',''4096''}',
    'topic.traffic.throttle.ms'           = 500
);

Send the statements to the REST endpoint

2

Launch your statements into production by sending them to the REST API with the following command:

tr '\n' ' ' < src/statements.sql | \
sed 's/;/;\'$'\n''/g' | \
while read stmt; do
    echo '{"ksql":"'$stmt'", "streamsProperties": {}}' | \
        curl -s -X "POST" "http://localhost:8088/ksql" \
             -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
             -d @- | \
        jq '.'
done

Validate the deployment

3

Confirm that the deploy worked:

echo '{"ksql":"SHOW CONNECTORS;", "streamsProperties": {}}' | \
        curl -s -X "POST" "http://localhost:8088/ksql" \
             -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
             -d @-

You should see the two connectors listed:

[{"@type":"connector_list","statementText":"SHOW CONNECTORS;","warnings":[],"connectors":[{"name":"CLICKS","type":"source","className":"io.mdrogalis.voluble.VolubleSourceConnector","state":"RUNNING (1/1 tasks RUNNING)"},{"name":"NETWORK_TRAFFIC","type":"source","className":"io.mdrogalis.voluble.VolubleSourceConnector","state":"RUNNING (1/1 tasks RUNNING)"}]}]