course: Kafka Connect 101

Hands On: Confluent Cloud Managed Connector CLI

7 min
Danica Fine

Senior Developer Advocate (Presenter)

The confluent command line interface (CLI) is a convenient tool that enables developers to manage both Confluent Cloud and Confluent Platform. Built-in autocompletion helps you write commands quickly, and with authentication and machine-readable output, the CLI supports automated workflows as well.

The primary goal of this exercise is to demonstrate various confluent CLI commands that can be used to create, configure, and monitor Confluent managed connectors running in Confluent Cloud.

Exercise Environment Preparation Steps

In order to do this exercise using the Confluent Cloud Connector CLI, you first need to do a bit of preparation to make sure you have all of the tools you need. Complete the following steps to set up your environment. Prior to doing so, you will need to sign up for Confluent Cloud.

Note: Steps 1-15 are the same as those included in 6. Hands On: Confluent Cloud Managed Connector API. If you already completed them for that exercise, you can skip to step 16 of this exercise setup.

We will use various CLI tools during the course exercises, including confluent, so these need to be available on the machine you plan to run the exercises on. Downloading Confluent Platform will accomplish this.

  1. Run command:
cd ~ && \
curl -O https://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz && \
tar xzf confluent-7.1.1.tar.gz && \
echo "export PATH=$HOME/confluent-7.1.1/bin/:$PATH" >> ~/.bashrc && \
export PATH=$HOME/confluent-7.1.1/bin/:$PATH

We’ll use a useful bash library of functions for interacting with Confluent Cloud. Note that this library is community supported, not officially supported by Confluent. Download the library by running the following:

  1. Run command:
curl -sS -o ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
  1. Make ccloud_library.sh executable:
chmod +x ccloud_library.sh

Once that’s downloaded, we’ll clone the GitHub repository that contains the files needed to run the exercises for the Kafka Connect 101 course.

  1. Run command:
git clone \
https://github.com/confluentinc/learn-kafka-courses.git \
~/learn-kafka-courses

Let’s now log in to the confluent CLI.

  1. In a terminal window, run command:
confluent login --save

When using the confluent CLI, you usually need to specify your environment and cluster for every command. To make our lives easier, we can set default parameters so that we won’t have to specify them for each individual command. Let’s start with the Confluent Cloud environment ID. Let’s list the available environments for our org.

  1. Run command:
confluent environment list

We will use the default environment so let’s set it as the default.

  1. Run command:
confluent environment use <default env ID>
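Because the CLI also supports machine-readable output, this step can be scripted. Below is a minimal sketch: ENV_JSON stands in for the output of confluent environment list -o json (the exact JSON shape may vary by CLI version), so the parsing can be illustrated without a live session.

```shell
# Sketch: select the default environment non-interactively.
# ENV_JSON is a stand-in for: confluent environment list -o json
ENV_JSON='[{"id":"env-abc123","name":"default"}]'

# Pull out the id field with sed (no jq dependency).
ENV_ID=$(printf '%s\n' "$ENV_JSON" | sed -n 's/.*"id": *"\([^"]*\)".*/\1/p')
echo "$ENV_ID"

# With a real logged-in session you would then run:
# confluent environment use "$ENV_ID"
```

This is handy in automated setups where prompting for an environment ID is not an option.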

Next, let’s create a Confluent Cloud cluster to use with the exercise.

  1. Run command:
confluent kafka cluster create kc-101 \
    --cloud gcp \
    --region us-west4 \
    --type basic

Note: The kc-101 cluster may already exist. If so, run the following command to identify its resource ID which is needed for the step that follows: confluent kafka cluster list

Now let’s set it as the default cluster for the confluent CLI.

  1. Run command:
confluent kafka cluster use <cluster ID>

The Confluent CLI needs to authenticate with Confluent Cloud using an API key and secret that has the required privileges for the cluster. Let’s create these now. The Confluent CLI will automatically save them in ~/.confluent/config.json making them available for use by the CLI.

  1. Run command:
confluent api-key create  --resource <cluster ID>

Let’s now set this API key and secret as the default to be used whenever one isn’t otherwise specified in confluent CLI commands.

  1. Run command:
confluent api-key use <cluster API key> --resource <cluster ID>

We also need to enable Schema Registry for our Confluent Cloud environment. Let’s do this now.

Note: If Schema Registry has already been enabled for the environment, the command will return the existing cluster details.

  1. Run command:
confluent schema-registry cluster enable --cloud gcp --geo us

The confluent CLI needs to also authenticate with the Schema Registry using an API key and secret. Let’s create these now. The confluent CLI will also automatically save them in ~/.confluent/config.json making them available for use by the CLI.

  1. Run command:
confluent api-key create  --resource <SR cluster ID>

In addition to the confluent CLI, we will also be using Kafka clients that will need to authenticate with the cluster and Schema Registry using the API keys and secrets. We can create a client configuration file using the confluent CLI. The command will automatically obtain the cluster API key and secret from ~/.confluent/config.json but we need to specify the SR API key and secret using the --sr-apikey and --sr-apisecret parameters.

  1. Run command:
confluent kafka client-config create java --sr-apikey <sr API key> --sr-apisecret <sr API secret> | tee $HOME/.confluent/java.config
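The generated java.config is a standard Java properties file. Its exact contents depend on your cluster, but it typically looks roughly like the following; every endpoint and credential shown here is a placeholder, and your generated file is authoritative:

```properties
# Kafka cluster connection (placeholder values)
bootstrap.servers=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<cluster API key>' password='<cluster API secret>';

# Schema Registry connection (placeholder values)
schema.registry.url=https://psrc-xxxxx.us-central1.gcp.confluent.cloud
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SR API key>:<SR API secret>
```

The basic.auth.user.info line is the one you will consult later in this exercise when a command needs the Schema Registry key and secret.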

Required parameters for cluster API REST calls include the cluster API key and secret. The java.config file created in the previous step contains these values. The ccloud_library.sh script auto-generates configuration files for downstream clients, using the java.config file as input. One of the output files is delta_configs/env.delta, which contains commands that set environment variables matching the cluster client configuration.

Let’s generate these files now.

  1. Run command:
source ccloud_library.sh
ccloud::generate_configs $HOME/.confluent/java.config

And now we will establish the environment variables for our current command shell.

  1. Run command:
source delta_configs/env.delta

And finally, let’s verify the previous step was successful.

  1. Run command:
printenv

You should see in the command output the environment variables contained in env.delta have been established.
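Since printenv lists every variable in the shell, you can filter for just the ones this exercise relies on (CLOUD_KEY, CLOUD_SECRET, and SCHEMA_REGISTRY_URL, which appear in later commands). The exports below use placeholder values so the snippet stands alone; after sourcing env.delta the real values are already present and the exports are unnecessary.

```shell
# Placeholder values so this snippet can run standalone; after sourcing
# env.delta the real values are already in the environment.
export CLOUD_KEY=EXAMPLEKEY CLOUD_SECRET=EXAMPLESECRET SCHEMA_REGISTRY_URL=https://example.invalid

# Narrow printenv to the variables this exercise uses later.
printenv | grep -E '^(CLOUD_KEY|CLOUD_SECRET|SCHEMA_REGISTRY_URL)='
```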

During this exercise we will be streaming data to a MySQL database running on the local machine in a Docker container. The associated docker-compose.yml file is located in the learn-kafka-courses/kafka-connect-101 directory. We will now start this Docker container.

  1. Run commands:
cd ~/learn-kafka-courses/kafka-connect-101 && \
cp mysql-only.yml docker-compose.yml && \
docker-compose up -d

Alright, with that, our environment preparation is complete. We know that was quite a bit of work, but now we’re ready to get started with this and all of the following exercises. Let’s dive in!

Confluent Cloud Managed Connector CLI exercise steps

We will start by logging into Confluent Cloud—and note that we’re using the --save flag so that our credentials will be used for subsequent commands in this exercise. They’ll remain active until you run the logout command.

  1. Run command:
confluent login --save 

Enter your email and password when prompted.

Let’s start by setting the active environment and cluster that CLI commands apply to.

First, let’s obtain the environment ID.

  1. Run command:
confluent environment list

In the Confluent org being used for this demo, there is a single environment and its name is default.

Let’s set it as the active environment for the confluent CLI command.

  1. Run command:
confluent environment use <default env ID> 

Next, we’ll set the default cluster to use; let’s obtain the cluster ID.

  1. Run command:
confluent kafka cluster list

In the Confluent org being used for this demo, there is a single cluster and its name is kc-101.

Let’s set it as the active cluster for the confluent CLI command.

  1. Run command:
confluent kafka cluster use <kc-101 cluster ID>

Let’s now create a new Kafka topic named transactions. We will set this as the target for the Datagen source connector.

  1. Run command:
confluent kafka topic create transactions

Notice the topic was created with default values of 6 partitions and a replication factor of 3.

Before proceeding, let’s verify the transactions topic was created successfully.

  1. Run command:
confluent kafka topic list

Stream sample data to a Kafka topic using the DatagenSource connector

Before we create the DatagenSource connector instance, let’s list the fully managed connector plugins that are available for streaming with our Confluent Cloud environment.

Note: This list may be a little different depending upon what cloud provider Confluent Cloud is running in.

  1. Run command:
confluent connect plugin list

As you can see, the list is quite long and includes the DatagenSource connector. Next, let’s create a DatagenSource connector instance.

First, we need to update the file containing the connector instance configuration. In this demo we will use VSCode.

Note: This file was created on your behalf and included in the GitHub repo that was cloned earlier during the environment setup steps.

  1. Run command:
code ~/learn-kafka-courses/kafka-connect-101
  1. In VSCode, navigate to the delta_configs directory and open file env.delta.

Note: This file was created as part of the environment set up steps.

  1. On line 8, copy the value assigned to CLOUD_KEY.

  2. In VSCode, locate and open file datagen-source-config.json.

  3. On line 7, replace <key> with the copied value assigned to CLOUD_KEY.

  4. Return to file env.delta and on line 9, copy the value assigned to CLOUD_SECRET.

  5. Return to file datagen-source-config.json and on line 8, replace <secret> with the copied value assigned to CLOUD_SECRET.

  6. Save file datagen-source-config.json and close VSCode.

We can now create the DatagenSource managed connector instance.

  1. Run command:
confluent connect create --config datagen-source-config.json

To verify the connector instance’s status, let’s first list all connector instances in the cluster.

  1. Run command:
confluent connect list

The DatagenSource connector instance appears in the list with a status of Provisioning. This is expected as it takes a moment for the connector instance to be fully provisioned and running. We need to repeat this command periodically until we see the status has changed to Running before we continue.
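Rather than re-running the list command by hand, the wait can be scripted as a loop. This is only a sketch: connector_status is a hypothetical stub standing in for a real status lookup, such as parsing the output of confluent connect list -o json.

```shell
# Sketch of scripting the provisioning wait.
# connector_status is a stub; a real version would query the CLI,
# e.g. by parsing "confluent connect list -o json".
connector_status() { echo "Running"; }

until [ "$(connector_status)" = "Running" ]; do
  echo "Connector still provisioning; checking again in 10 seconds..."
  sleep 10
done
echo "Connector is Running"
```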

Using the connector instance ID that was included in the list command output, let’s use the describe option to obtain additional details about the connector instance.

  1. Run command:
confluent connect describe <connector ID>

Next, let’s consume records from the transactions topic to verify sample data is being produced.

  1. Run command:
confluent kafka topic consume -b transactions \
  --value-format avro \
  --api-key $CLOUD_KEY \
  --api-secret $CLOUD_SECRET \
  --sr-endpoint $SCHEMA_REGISTRY_URL \
  --sr-api-key <SR key> \
  --sr-api-secret <SR secret>

Note: You will need to replace <SR key> and <SR secret> with their respective values which you can find on line 18 in ~/.confluent/java.config in the form of basic.auth.user.info=<SR key>:<SR secret>
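The extraction itself can be done with shell parameter expansion. Below is a small sketch using SAMPLE as a stand-in for the basic.auth.user.info line from your java.config; the key and secret values are invented.

```shell
# SAMPLE stands in for the matching line of ~/.confluent/java.config.
SAMPLE='basic.auth.user.info=SRKEY123:SRSECRET456'

SR_CREDS=${SAMPLE#basic.auth.user.info=}   # drop the property name
SR_KEY=${SR_CREDS%%:*}                     # text before the colon
SR_SECRET=${SR_CREDS#*:}                   # text after the colon
echo "$SR_KEY $SR_SECRET"

# Against the real file you could instead run:
# SR_CREDS=$(grep '^basic.auth.user.info=' ~/.confluent/java.config | cut -d= -f2-)
```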

  1. Once records begin to appear, press Ctrl+C to end the consumption.

We should now have sufficient sample data in the transactions topic. So that we don’t unnecessarily exhaust any Confluent Cloud promotional credits, let’s delete the DatagenSource connector instance.

  1. Run command:
confluent connect delete <connector ID>

Let’s now establish the downstream side of our data pipeline. We will use the MySQLSink connector for this. It will consume records from the transactions topic and write them out to a corresponding table in our MySQL database that is running in the local Docker container that we started during the exercise environment setup steps.

First we need to update the file containing the connector instance configuration.

Note: This file was created on your behalf and included in the GitHub repo that was cloned earlier during the environment setup steps.

  1. Run command:
code ~/learn-kafka-courses/kafka-connect-101
  1. In VSCode, navigate to the delta_configs directory and open file env.delta.

Note: This file was created as part of the environment set up steps.

  1. On line 8, copy the value assigned to CLOUD_KEY.

  2. In VSCode, locate and open file mysql-sink-config.json.

  3. On line 8, replace <key> with the copied value assigned to CLOUD_KEY.

  4. Return to file env.delta and on line 9, copy the value assigned to CLOUD_SECRET.

  5. Return to file mysql-sink-config.json and on line 9, replace <secret> with the copied value assigned to CLOUD_SECRET.

  6. On line 10, replace <mysql-host-endpoint> with the public endpoint of the host where the MySql database is running.

Notice that ssl.mode is set to prefer. This tells Confluent Cloud to connect using TLS if the destination host is set up to do so; otherwise a PLAINTEXT connection will be established. For this demonstration, the local host is an AWS EC2 instance that does not have TLS set up, so the connection will not be secure and the sample data will cross the wire unencrypted. In a production environment, we would want to be sure the destination host supports TLS.

Notice also the connection host. This should be set to the address of the host on which the MySQL database Docker container was established during the exercise setup steps. Since this demonstration uses an EC2 instance, the sample configuration specifies the public endpoint address assigned to the AWS EC2 instance. This value can be obtained from the EC2 instance details in the AWS console.
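For reference, the configuration being edited has roughly the following shape. Only ssl.mode, connection.host, the connector name, and the <key>/<secret> placeholders come from this exercise; the remaining field names are typical of Confluent Cloud managed sink connector configs, and the file in the cloned repo is authoritative:

```json
{
  "name": "MySqlSinkConnector_2",
  "connector.class": "MySqlSink",
  "topics": "transactions",
  "input.data.format": "AVRO",
  "kafka.api.key": "<key>",
  "kafka.api.secret": "<secret>",
  "connection.host": "<mysql-host-endpoint>",
  "ssl.mode": "prefer"
}
```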

We can now save and close the configuration file.

  1. Save file mysql-sink-config.json and close VSCode.

Let’s now create the MySQL Sink connector instance.

  1. Run command:
confluent connect create --config mysql-sink-config.json

To verify the connector instance’s status, let’s first list all connector instances in the cluster.

  1. Run command:
confluent connect list

The MySQL Sink connector instance appears in the list with a status of Provisioning. This is expected as it takes a moment for the connector instance to be fully provisioned and running. We need to repeat this command periodically until we see the status has changed to Running before we continue.

Using the connector instance ID that was included in the list command output, let’s use the describe option to obtain additional details about the connector instance.

  1. Run command:
confluent connect describe <connector ID>

Next, let’s run a query on the MySQL database to verify the connector has written records to the transactions table.

  1. Run command:
docker exec -t mysql bash -c 'echo "SELECT * FROM transactions LIMIT 10 \G" | mysql -u root -p$MYSQL_ROOT_PASSWORD demo'

Success!

Let’s continue now with our tour of using the Confluent CLI with Confluent Cloud managed connectors.

Perhaps we want to pause a connector instance temporarily. Here is the command to do this.

  1. Run command:
confluent connect pause <connector ID>

Let’s verify both the connector and task are paused using the status command.

  1. Run command:
confluent connect describe <connector ID>

Confirmed. Let’s now resume the connector and its task.

  1. Run command:
confluent connect resume <connector ID>

And let’s verify both the connector and task are once again running.

  1. Run command:
confluent connect describe <connector ID>

The connector and its task are once again in a Running state.

If you run this demonstration yourself, you should tear down the environment afterward to avoid unnecessarily accruing cost and exhausting your promotional credits.

Let’s walk through that tear down process now for this environment.

First, we delete the MySQL Sink connector instance (named MySqlSinkConnector_2 in this demonstration).

  1. Run command:
confluent connect delete <connector ID>

Next we will delete the transactions topic.

  1. Run command:
confluent kafka topic delete transactions

And finally, we will shut down the mysql Docker container and free its resources.

  1. Run command:
docker-compose down -v

This concludes this demonstration.

Additional information about the confluent CLI can be found in the Confluent CLI documentation.
