How can you produce mock data to Kafka topics to test your Kafka applications?
To get started, make a new directory anywhere you’d like for this project:
mkdir kafka-connect-datagen && cd kafka-connect-datagen
Next, create a directory for configuration data:
mkdir configuration
This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started free is on Confluent Cloud, which provides Kafka as a fully managed service.
After you log in to Confluent Cloud, click Environments in the lefthand navigation, click Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.
From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details). To avoid having to enter a credit card, also apply the promo code CONFLUENTDEV1; with it, you will not have to enter a credit card for 30 days or until your credits run out.
Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.
From the Confluent Cloud Console, navigate to your Kafka cluster and then select CLI and Tools in the lefthand navigation. Click the CLI Tools header to get connection information customized to your cluster.
Create new credentials for your Kafka cluster and Schema Registry, writing in appropriate descriptions so that the keys are easy to find and delete later. The Confluent Cloud Console will show a configuration similar to the one below with your new credentials automatically populated (make sure Show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for Kafka producer to prevent data loss
acks=all
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Do not directly copy and paste the above configuration. You must copy it from the Confluent Cloud Console so that it includes your Confluent Cloud information and credentials.
This tutorial has some steps for Kafka topic management and for producing and consuming events, for which you can use the Confluent Cloud Console or the Confluent CLI. Follow the instructions here to install the Confluent CLI, and then follow these steps to connect the CLI to your Confluent Cloud cluster.
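As a quick sketch, connecting the CLI typically looks like the following (the IDs below are placeholders; substitute the values from your own account):
confluent login
# Select the environment and cluster you created above (placeholder IDs)
confluent environment use env-abc123
confluent kafka cluster use lkc-xyz789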
Create a Kafka topic called mytopic in Confluent Cloud.
confluent kafka topic create mytopic
This should yield the following output:
Created topic "mytopic".
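If you'd like to verify, the topic should now appear when you list the cluster's topics:
confluent kafka topic list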
You can provision the Kafka Connect Datagen connector through the Confluent Cloud Console, but in this tutorial you will use either the Confluent CLI or the Confluent Cloud REST API.
First, create a file called datagen-source-config.json with the below connector configuration for the Kafka Connect Datagen source connector for Confluent Cloud. Substitute <CLUSTER_API_KEY> and <CLUSTER_API_SECRET> with the credentials from the configuration/ccloud.properties file.
In this sample configuration, the connector uses the PAGEVIEWS quickstart to produce JSON records simulating website pageviews, formatted according to the PAGEVIEWS schema specification, to a Kafka topic called mytopic.
For a full explanation of all connector configuration parameters, see the documentation.
{
  "name": "datagen_ccloud_01",
  "connector.class": "DatagenSource",
  "kafka.api.key": "<CLUSTER_API_KEY>",
  "kafka.api.secret": "<CLUSTER_API_SECRET>",
  "kafka.topic": "mytopic",
  "output.data.format": "JSON",
  "quickstart": "PAGEVIEWS",
  "tasks.max": "1"
}
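The quickstart parameter selects which mock dataset the connector generates. As an illustrative variant (the connector name here is hypothetical; USERS is another quickstart the Datagen connector supports), you could generate mock user records instead:
{
  "name": "datagen_ccloud_users",
  "connector.class": "DatagenSource",
  "kafka.api.key": "<CLUSTER_API_KEY>",
  "kafka.api.secret": "<CLUSTER_API_SECRET>",
  "kafka.topic": "mytopic",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1"
}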
You can choose one of two options for provisioning a fully managed Kafka Connect Datagen source connector in Confluent Cloud. Either option uses the connector configuration file datagen-source-config.json that you created in the previous step.
Option 1. You can use the Confluent CLI, which provides the confluent connector create command for passing in the configuration file from the previous step.
confluent connector create --config datagen-source-config.json
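The command output includes the new connector's ID (an lcc-prefixed value, like the one in the status listing later in this tutorial); make a note of it, since you will need it to check on and eventually delete the connector.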
Option 2. The Confluent Cloud REST API can provision the connector using the configuration file you created in the previous step. This API requires that we provide a Confluent Cloud resource ID for both the environment and the Kafka cluster to which we wish to deploy the connector. These values can be obtained from the Confluent Cloud Console or with the confluent kafka cluster describe command.
Additionally, we must provide an API key and secret that authorize us to control our cloud account. This API key is independent of the one you use to connect to Kafka or the Schema Registry, so we need to generate it before the HTTP command will succeed.
confluent api-key create --resource cloud -o json > cloud-api-key.json
The cloud-api-key.json file now contains an API key and secret authorized to control your cloud account. Protect this file as you would any secret value.
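One simple precaution (a suggestion, not a required step) is to make the file readable only by your user:
# Restrict the key file's permissions to the current user
chmod 600 cloud-api-key.json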
You will also need to set two shell variables, as shown in the sketch after this list:
- the Confluent Cloud environment ID in the variable ENVIRONMENT (use the command confluent environment list to view the active environment)
- the Kafka cluster ID in the variable CLUSTER (use the command confluent kafka cluster list to view the active cluster)
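A minimal sketch of setting these (the IDs are placeholders; copy the real values reported by the list commands above):
# Placeholder IDs -- substitute the values reported by the CLI
export ENVIRONMENT=env-abc123
export CLUSTER=lkc-xyz789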
Run the following curl command to provision the connector. The command reads the API key and secret from the cloud-api-key.json file (using the jq tool) and PUTs the new connector configuration to the REST API in the appropriate environment and Kafka cluster.
curl -XPUT -H 'Content-Type: application/json' \
  --data "@datagen-source-config.json" \
  --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') \
  https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/config
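Assuming the standard Kafka Connect REST semantics that this endpoint follows, a PUT to .../connectors/<name>/config creates the connector if it does not exist and updates it if it does, and a successful call returns the connector's configuration as JSON.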
To check the status of the connector from the command line, you have the same two options as provisioning.
Option 1. Using the Confluent CLI.
confluent connector list
Rerun this command to get the updated status; it will change from PROVISIONING to RUNNING when the connector is ready.
ID | Name | Status | Type | Trace
+-----------+-------------------+---------+--------+-------+
lcc-6g1p6 | datagen_ccloud_01 | RUNNING | source |
Option 2. The Confluent Cloud REST API provides a connector_name/status endpoint you can use to verify the status of a provisioned connector. Note the connector.state field in the returned JSON.
As described in the provisioning step above, an API key is required for all REST API calls to succeed.
curl -s -XGET -H 'Content-Type: application/json' \
  --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') \
  https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/status | jq
{
  "name": "datagen_ccloud_01",
  "connector": {
    "state": "RUNNING",
    "worker_id": "datagen_ccloud_01",
    "trace": ""
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "datagen_ccloud_01",
      "msg": ""
    }
  ],
  "type": "source"
}
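A small variation on the same call, if you just want the state on its own line (this simply narrows the jq filter over the JSON shown above):
curl -s -XGET -H 'Content-Type: application/json' \
  --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') \
  https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/status | jq -r '.connector.state'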
Now that the Kafka Connect Datagen connector is running in Confluent Cloud, it is producing messages to your Kafka topic. View the messages being produced to the Kafka topic in Confluent Cloud.
There are many ways to do this, including the Confluent Cloud Console, but for this tutorial we will show you how to do it with the Confluent CLI.
confluent kafka topic consume mytopic --print-key
After the consumer starts, you should see the following output in a few seconds:
2871 {"viewtime":2871,"userid":"User_6","pageid":"Page_34"}
2881 {"viewtime":2881,"userid":"User_3","pageid":"Page_16"}
2901 {"viewtime":2901,"userid":"User_2","pageid":"Page_44"}
2961 {"viewtime":2961,"userid":"User_7","pageid":"Page_97"}
2971 {"viewtime":2971,"userid":"User_1","pageid":"Page_54"}
3151 {"viewtime":3151,"userid":"User_3","pageid":"Page_21"}
3171 {"viewtime":3171,"userid":"User_5","pageid":"Page_65"}
3271 {"viewtime":3271,"userid":"User_3","pageid":"Page_85"}
3361 {"viewtime":3361,"userid":"User_9","pageid":"Page_41"}
3421 {"viewtime":3421,"userid":"User_3","pageid":"Page_60"}
3431 {"viewtime":3431,"userid":"User_7","pageid":"Page_57"}
3501 {"viewtime":3501,"userid":"User_3","pageid":"Page_52"}
When you are done, press Ctrl-C.
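By default the consumer reads only new messages; if you want to replay the records the connector has already produced, the CLI supports consuming from the start of the topic:
confluent kafka topic consume mytopic --print-key --from-beginning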
Because fully managed connectors in Confluent Cloud may be billed hourly, it’s a good idea to delete the connector when you are done with this tutorial.
Run the following Confluent CLI command to list the provisioned connectors and find the datagen connector's ID.
confluent connector list
Delete the connector, then verify the deletion in the Confluent Cloud Console.
confluent connector delete <connector id>
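As a final bit of cleanup (optional, but it follows the advice above about protecting secrets), you can also revoke the cloud API key you created for the REST API calls and remove the local file that stored it:
# Revoke the cloud API key and delete the local copy
confluent api-key delete $(jq -r '.key' cloud-api-key.json)
rm cloud-api-key.json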