Danica Fine, Senior Developer Advocate (Presenter)
The Confluent command line interface (CLI) is a convenient tool that enables developers to manage both Confluent Cloud and Confluent Platform. Built-in autocompletion helps you quickly write commands, and with authentication and machine-readable output, the CLI supports automated workflows as well.
The primary goal of this exercise is to demonstrate various confluent CLI commands that can be used to create, configure, and monitor Confluent managed connectors running in Confluent Cloud.
To do this exercise using the confluent CLI, you first need to do a bit of preparation to make sure you have all of the tools you need. Complete the following steps to set up your environment. Before doing so, you will need to sign up for Confluent Cloud.
Note: Steps 1-15 are the same as those included in 6. Hands On: Confluent Cloud Managed Connector API. If you already completed them for that exercise, you can skip to step 16 of this exercise setup.
We will use various CLI tools during the course exercises, including confluent, so these need to be available on the machine you plan to run the exercises on. Downloading Confluent Platform will accomplish this.
cd ~ && \
curl -O https://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz && \
tar xzf confluent-7.1.1.tar.gz && \
echo "export PATH=$HOME/confluent-7.1.1/bin/:$PATH" >> ~/.bashrc && \
export PATH=$HOME/confluent-7.1.1/bin/:$PATH
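To confirm the CLI tools are now available on your PATH, you can run a quick check like the following (any version output means you're good to go):
confluent version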
We’ll use a handy bash library of functions for interacting with Confluent Cloud. Note that this library is community supported, not officially supported by Confluent. Download the library by running the following:
curl -sS -o ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
chmod +x ccloud_library.sh
Once that’s installed, we’ll clone the GitHub repository that contains the files needed to run the exercises for the Kafka Connect 101 course.
git clone \
https://github.com/confluentinc/learn-kafka-courses.git \
~/learn-kafka-courses
Let’s now log in to the confluent CLI.
confluent login --save
When using the confluent CLI, you usually need to specify your environment and cluster for every command. To make our lives easier, we can set default parameters so that we won’t have to specify them for each individual command. Let’s start with the Confluent Cloud environment ID. Let’s list the available environments for our org.
confluent environment list
We will use the default environment so let’s set it as the default.
confluent environment use <default env ID>
Next, let’s create a Confluent Cloud cluster to use with the exercise.
confluent kafka cluster create kc-101 \
--cloud gcp \
--region us-west4 \
--type basic
Note: The kc-101 cluster may already exist. If so, run the following command to identify its resource ID which is needed for the step that follows: confluent kafka cluster list
Now let’s set it as the default cluster for the confluent CLI.
confluent kafka cluster use <cluster ID>
The Confluent CLI needs to authenticate with Confluent Cloud using an API key and secret that has the required privileges for the cluster. Let’s create these now. The Confluent CLI will automatically save them in ~/.confluent/config.json making them available for use by the CLI.
confluent api-key create --resource <cluster ID>
Let’s now set this API key and secret as the default to be used whenever one isn’t otherwise specified in confluent CLI commands.
confluent api-key use <cluster API key> --resource <cluster ID>
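Optionally, you can confirm the key was created and stored for the cluster by listing the API keys for that resource:
confluent api-key list --resource <cluster ID>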
We also need to enable Schema Registry for our Confluent Cloud environment. Let’s do this now.
Note: If Schema Registry has already been enabled for the environment, the command will return the existing cluster details.
confluent schema-registry cluster enable --cloud gcp --geo us
The confluent CLI needs to also authenticate with the Schema Registry using an API key and secret. Let’s create these now. The confluent CLI will also automatically save them in ~/.confluent/config.json making them available for use by the CLI.
confluent api-key create --resource <SR cluster ID>
In addition to the confluent CLI, we will also be using Kafka clients that will need to authenticate with the cluster and Schema Registry using the API keys and secrets. We can create a client configuration file using the confluent CLI. The command will automatically obtain the cluster API key and secret from ~/.confluent/config.json but we need to specify the SR API key and secret using the --sr-apikey and --sr-apisecret parameters.
confluent kafka client-config create java --sr-apikey <sr API key> --sr-apisecret <sr API secret> | tee $HOME/.confluent/java.config
Required parameters for cluster API REST calls include the cluster API key and secret. The java.config file that was created in the previous step contains the cluster API key and secret. The ccloud_library.sh script auto-generates configuration files for downstream clients, using the java.config file as input. One of the output files is delta_configs/env.delta, which contains commands that set environment variables to the values of the cluster configuration, including the API key and secret.
Let’s generate these files now.
source ccloud_library.sh
ccloud::generate_configs $HOME/.confluent/java.config
And now we will establish the environment variables for our current command shell.
source delta_configs/env.delta
And finally, let’s verify the previous step was successful.
printenv
You should see in the command output the environment variables contained in env.delta have been established.
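If the full printenv output is hard to scan, one option is to filter for just the variables this exercise uses later (CLOUD_KEY, CLOUD_SECRET, and SCHEMA_REGISTRY_URL appear in later commands; env.delta establishes additional variables as well):
printenv | grep -E 'CLOUD_KEY|CLOUD_SECRET|SCHEMA_REGISTRY'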
During this exercise we will be streaming data to a MySQL database running on the local machine in a Docker container. The associated docker-compose.yml file is located in the learn-kafka-courses/kafka-connect-101 directory. We will now start this Docker container.
cd ~/learn-kafka-courses/kafka-connect-101 && \
cp mysql-only.yml docker-compose.yml && \
docker-compose up -d
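You can optionally confirm the MySQL container came up before continuing; the mysql service should report a state of Up:
docker-compose ps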
Alright, with that, our environment preparation is complete. We know that was quite a bit of work, but now we’re ready to get started with this and all of the following exercises. Let’s dive in!
We will start by logging into Confluent Cloud—and note that we’re using the --save flag so that our credentials will be used for subsequent commands in this exercise. They’ll remain active until you run the logout command.
confluent login --save
Enter your email and password when prompted.
Let’s start by setting the active environment and cluster that CLI commands apply to.
First, let’s obtain the environment ID.
confluent environment list
In the Confluent org being used for this demo, there is a single environment and its name is default.
Let’s set it as the active environment for the confluent CLI command.
confluent environment use <default env ID>
Next, we’ll set the default cluster to use; let’s obtain the cluster ID.
confluent kafka cluster list
In the Confluent org being used for this demo, there is a single cluster and its name is kc-101.
Let’s set it as the active cluster for the confluent CLI command.
confluent kafka cluster use <kc-101 cluster ID>
Let’s now create a new Kafka topic named transactions. We will set this as the target for the Datagen source connector.
confluent kafka topic create transactions
Notice the topic was created with default values of 6 partitions and a replication factor of 3.
Before proceeding, let’s verify the transactions topic was created successfully.
confluent kafka topic list
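If you'd like to inspect the topic further, you can also describe it; depending on your CLI version, the output includes the topic's configuration values:
confluent kafka topic describe transactions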
Before we create the DatagenSource connector instance, let’s list the fully managed connector plugins that are available for streaming with our Confluent Cloud environment.
Note: This list may be a little different depending upon what cloud provider Confluent Cloud is running in.
confluent connect plugin list
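Tip: if you only want to confirm that a particular plugin is available rather than scan the full list, you can filter the output, for example:
confluent connect plugin list | grep -i datagen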
As you can see, the list is quite long and includes the DatagenSource connector. Next, let’s create a DatagenSource connector instance.
First, we need to update the file containing the connector instance configuration. In this demo we will use VSCode.
Note: This file was created on your behalf and included in the GitHub repo that was cloned earlier during the environment setup steps.
code ~/learn-kafka-courses/kafka-connect-101
In VSCode, locate and open the env.delta file that was generated during the environment setup steps.
On line 8, copy the value assigned to CLOUD_KEY.
In VSCode, locate and open file datagen-source-config.json.
On line 7, replace <key> with the copied value assigned to CLOUD_KEY.
Return to file env.delta and on line 9, copy the value assigned to CLOUD_SECRET.
Return to file datagen-source-config.json and on line 8, replace <secret> with the copied value assigned to CLOUD_SECRET.
Save file datagen-source-config.json and close VSCode.
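As an optional sanity check before creating the connector, you can confirm that no placeholder values remain in the configuration file; the following grep should return no matches:
grep -E '<key>|<secret>' ~/learn-kafka-courses/kafka-connect-101/datagen-source-config.json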
We can now create the DatagenSource managed connector instance.
confluent connect create --config datagen-source-config.json
To verify the connector instance’s status, let’s first list all connector instances in the cluster.
confluent connect list
The DatagenSource connector instance appears in the list with a status of Provisioning. This is expected as it takes a moment for the connector instance to be fully provisioned and running. We need to repeat this command periodically until we see the status has changed to Running before we continue.
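If you'd rather not re-run the list command by hand, one option (assuming the watch utility is available on your machine) is to poll it every few seconds and press Ctrl+C once the status reads Running:
watch -n 10 confluent connect list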
Using the connector instance ID that was included in the list command output, let’s use the describe option to obtain additional details about the connector instance.
confluent connect describe <connector ID>
Next, let’s consume records from the transactions topic to verify sample data is being produced.
confluent kafka topic consume -b transactions \
--value-format avro \
--api-key $CLOUD_KEY \
--api-secret $CLOUD_SECRET \
--sr-endpoint $SCHEMA_REGISTRY_URL \
--sr-api-key <SR key> \
--sr-api-secret <SR secret>
Note: You will need to replace <SR key> and <SR secret> with their respective values which you can find on line 18 in ~/.confluent/java.config in the form of basic.auth.user.info=<SR key>:<SR secret>
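As a convenience, you can pull that line out of the file directly, assuming the file is in the location used earlier in this setup:
grep basic.auth.user.info $HOME/.confluent/java.config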
We should now have sufficient sample data in the transactions topic. So that we don’t unnecessarily exhaust any Confluent Cloud promotional credits, let’s delete the DatagenSource connector instance.
confluent connect delete <connector ID>
Let’s now establish the downstream side of our data pipeline. We will use the MySQLSink connector for this. It will consume records from the transactions topic and write them out to a corresponding table in our MySQL database that is running in the local Docker container that we started during the exercise environment setup steps.
First we need to update the file containing the connector instance configuration.
Note: This file was created on your behalf and included in the GitHub repo that was cloned earlier during the environment setup steps.
code ~/learn-kafka-courses/kafka-connect-101
In VSCode, locate and open the env.delta file that was generated during the environment setup steps.
On line 8, copy the value assigned to CLOUD_KEY.
Next, locate and open file mysql-sink-config.json.
On line 8, replace <key> with the copied value assigned to CLOUD_KEY.
Return to file env.delta and on line 9, copy the value assigned to CLOUD_SECRET.
Return to file mysql-sink-config.json and on line 9, replace <secret> with the copied value assigned to CLOUD_SECRET.
On line 10, replace <mysql-host-endpoint> with the public endpoint of the host where the MySQL database is running.
Notice that ssl.mode is set to prefer. This tells Confluent Cloud to connect using TLS if the destination host is set up to do so; otherwise a PLAINTEXT connection will be established. For this demonstration, the local host is an AWS EC2 instance that does not have TLS set up, so the connection will be non-secure and the sample data will be unencrypted across the wire. In a production environment, we would want to be sure to set up the destination host to support TLS.
Notice also the connection host. This should be set to the address of the host on which the MySQL Docker container was started during the exercise setup steps. In this demonstration, since an EC2 instance is being used, the sample configuration specifies the public endpoint address assigned to the AWS EC2 instance. This value can be obtained from the EC2 instance details in the AWS console.
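If you happen to be running the exercise on the EC2 instance itself, one alternative to the AWS console is the EC2 instance metadata service. This assumes IMDSv1 is permitted on the instance; if IMDSv2 is enforced, you would need to request a session token first:
curl -s http://169.254.169.254/latest/meta-data/public-hostname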
We can now save and close the configuration file.
Let’s now create the MySQL Sink connector instance.
confluent connect create --config mysql-sink-config.json
To verify the connector instance’s status, let’s first list all connector instances in the cluster.
confluent connect list
The MySQL Sink connector instance appears in the list with a status of Provisioning. This is expected as it takes a moment for the connector instance to be fully provisioned and running. We need to repeat this command periodically until we see the status has changed to Running before we continue.
Using the connector instance ID that was included in the list command output, let’s use the describe option to obtain additional details about the connector instance.
confluent connect describe <connector ID>
Next, let’s run a query on the MySQL database to verify the connector has written records to the transactions table.
docker exec -t mysql bash -c 'echo "SELECT * FROM transactions LIMIT 10 \G" | mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
Success!
Let’s continue now with our tour of using the Confluent CLI with Confluent Cloud managed connectors.
Perhaps we want to pause a connector instance temporarily. Here is the command to do this.
confluent connect pause <connector ID>
Let’s verify both the connector and task are paused by describing the connector instance.
confluent connect describe <connector ID>
Confirmed. Let’s now resume the connector and its task.
confluent connect resume <connector ID>
And let’s verify both the connector and task are once again running.
confluent connect describe <connector ID>
The connector and its task are once again in a Running state.
If you run this demonstration yourself, be sure to tear down the environment afterwards to avoid unnecessarily accruing costs to the point that your promotional credits are exhausted.
Let’s walk through that tear down process now for this environment.
First we delete MySqlSinkConnector_2.
confluent connect delete <connector ID>
Next we will delete the transactions topic.
confluent kafka topic delete transactions
And finally, we will shut down the mysql Docker container and free its resources.
docker-compose down -v
This concludes this demonstration.
Additional information about the confluent CLI can be found in the Confluent CLI documentation.
Hi there, Danica Fine here with another hands-on Kafka Connect exercise. This time we'll be exploring the Confluent command line interface. Before we dive in, though, you might need to do a bit of preparation to make sure you have all of the tools that you need. If you don't already have a Confluent Cloud account, make sure to sign up now, and also take the time to complete the necessary environment setup steps.

The Confluent CLI is a tool that enables developers to manage both Confluent Cloud and Confluent Platform. Built-in autocompletion is a convenient feature that helps you quickly write commands, and with authentication and machine-readable output, the CLI supports automated workflows as well. The primary goal of this exercise is to demonstrate various Confluent CLI commands that can be used to create, configure, and monitor Confluent managed connectors that are running in Confluent Cloud.

We'll start by logging into Confluent Cloud, and note that we're using the save flag so that our credentials will be used for subsequent commands in this exercise. They'll remain active until you run the logout command. To make things a little easier moving forward, we'll start off by setting the active environment and cluster to use for subsequent CLI commands. First we'll obtain the environment ID. In the Confluent org being used for this demo, there is a single environment and its name is default. Let's set it as the active environment for the Confluent CLI command. Next we'll set the default cluster to use. Let's obtain the cluster ID. In this demo, there is a single cluster and its name is kc-101. Let's set it as the active cluster for the Confluent CLI command.

With that out of the way, we can create a new Kafka topic named transactions which will serve as the target for the Datagen Source connector. Notice the topic was created with default values of six partitions and a replication factor of three. Before proceeding, let's verify the transactions topic was created successfully. Before we create the Datagen Source connector instance, let's also list the fully managed connector plugins that are available for streaming with our Confluent Cloud environment, and note that this list may be a little different depending on which cloud provider Confluent Cloud is running in, so it's always a good idea to check. As you can see, the list is pretty sizable, but it does include the Datagen Source connector.

Let's go ahead and create a Datagen Source connector instance. The file containing the connector instance configuration was already pre-created for you and included in the GitHub repo that was cloned earlier during the environment setup steps. All right. Let's create it. To verify the connector instance's status, we list all connector instances in the cluster. The Datagen Source connector instance appears in the list with a status of PROVISIONING. This is expected as it does take a few moments for the connector to be fully provisioned and running. We need to repeat this command periodically until we see the status has changed to RUNNING before we continue. Using the connector instance ID that was included in the list command output, let's use the describe option to obtain additional details about the connector instance. Next, let's consume records from the transactions topic to verify sample data is being produced. After letting it run for a little bit, we should now have sufficient sample data in the transactions topic.
So that we don't unnecessarily exhaust any Confluent Cloud promotional credits, let's delete the Datagen Source connector instance. With the beginning of our data pipeline complete, we can now move on to creating the downstream side of our data pipeline. We'll use the MySQL Sink connector to consume records from the transactions topic and write them out to a corresponding table in our MySQL database that's running in the local Docker container that we started during the exercise environment setup steps. The file containing the connector instance configuration was created for you and included in the GitHub repo that was cloned earlier during the environment setup steps.

Looking at the configuration, notice that ssl.mode is set to prefer. This tells Confluent Cloud to connect using TLS if the destination host is set up to do so, otherwise a PLAINTEXT connection will be established. For this demonstration, the local host is an AWS EC2 instance that does not have TLS set up, so the connection will be non-secure and the sample data will be unencrypted across the wire. In a production environment, we would want to be sure to set up the destination host to support TLS. Notice also the connection.host. This is the public endpoint address assigned to the AWS EC2 instance. This value is shown in the AWS console display of the EC2 instance details.

All right, let's go ahead and create the MySQL Sink connector instance. To verify the connector instance's status, we should list all connector instances in the cluster. The MySQL Sink connector instance appears in the list with the status of PROVISIONING. This is expected as it does take a moment for the connector instance to be fully provisioned and running. We need to repeat this command periodically until we see the status has changed to RUNNING before we continue. Using the connector instance ID that was included in the list command output, let's use the describe option to obtain additional details about the connector instance. Next, let's run a query on the MySQL database to verify the connector has written records to the transactions table, and it was a success.

With the connector created, let's continue with a tour of the Confluent CLI with Confluent Cloud managed connectors. Suppose you wanted to pause a connector instance temporarily. You'd run a command like so. Once that's run, we should verify both the connector and task are paused using the status command, and with that confirmed, we can resume the connector and its task. Again, let's check that the connector and task are once again running. Everything looks good and they're in a running state, so that concludes the tour of the CLI commands.

If you run this demonstration yourself, you should tear down the environment to avoid unnecessarily accruing cost to the point your promotional credits are exhausted. Just to be sure, let's walk through that tear down process now for this environment. First, we delete the MySQL Sink connector. Next, we'll delete the transactions topic, and finally, we can shut down the MySQL Docker container and free its resources. And with that taken care of, you've completed the exercise and should have a decent idea of the Confluent CLI and its commands.