course: Kafka Connect 101

Hands On: Confluent Cloud Managed Connector API

Danica Fine

Senior Developer Advocate (Presenter)

Confluent Cloud APIs are a core building block of Confluent Cloud. You can use them to manage your own account or to integrate Confluent into your product. The goal of this exercise is to demonstrate various Confluent Connect API REST calls that can be used to create, configure, and monitor Confluent managed connectors running in Confluent Cloud. We will also use the org API to identify the cluster ID for the kc-101 cluster as well as the cluster API to create a Kafka topic that we will be streaming data in and out of.

Exercise Environment Preparation Steps

In order to do this exercise using the Confluent Cloud Connector API, you first need to do a bit of preparation to make sure you have all of the tools you need. Complete the following steps to set up your environment. Prior to doing so, you will need to sign up for Confluent Cloud at https://confluent.cloud.

We will use various CLIs during the course exercises, including confluent, so these need to be available on the machine where you plan to run the exercise. Downloading Confluent Platform will accomplish this.

  1. Run command:

    cd ~ && \
    curl -O http://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz && \
    tar xzf confluent-7.1.1.tar.gz && \
    echo 'export PATH=$HOME/confluent-7.1.1/bin/:$PATH' >> ~/.bashrc && \
    export PATH=$HOME/confluent-7.1.1/bin/:$PATH

We’ll download a useful bash library of functions for interacting with Confluent Cloud. Note that this library is community supported and not supported by Confluent.

  1. Run command:

    curl -sS -o ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
  2. Make ccloud_library.sh executable:

    chmod +x ccloud_library.sh

Once that’s installed, we’ll clone the GitHub repository that contains the files needed to run the exercises for the Kafka Connect 101 course.

  1. Run command:

    git clone \
    https://github.com/confluentinc/learn-kafka-courses.git \
    ~/learn-kafka-courses

Let’s now log in to the confluent CLI.

  1. In a terminal window, run command:

    confluent login --save

When using the confluent CLI, you usually need to specify your environment and cluster for every command. To make our lives easier, we can set default parameters so that we won’t have to specify them for each individual command. Let’s start with the Confluent Cloud environment ID. Let’s list the available environments for our org.

  1. Run command:

    confluent environment list

We will use the default environment so let’s set it as the default.

  1. Run command:

    confluent environment use <default env ID> 

Next, let’s create a Confluent Cloud cluster to use with the exercise.

  1. Run command:

    confluent kafka cluster create kc-101 \
    --cloud gcp \
    --region us-west4 \
    --type basic

Note: The kc-101 cluster may already exist. If so, run the following command to identify its resource ID, which is needed for the step that follows:

    confluent kafka cluster list

Now let’s set it as the default cluster for the confluent CLI.

  1. Run command:
    confluent kafka cluster use <cluster ID>

The confluent CLI needs to authenticate with Confluent Cloud using an API key and secret that has the required privileges for the cluster. Let’s create these now. The confluent CLI will automatically save them in ~/.confluent/config.json making them available for use by the CLI.

  1. Run command:

    confluent api-key create --resource <cluster ID>

Let’s now set this API key and secret as the default if it isn’t otherwise specified in confluent CLI commands.

  1. Run command:

    confluent api-key use <cluster API key> --resource <cluster ID>

We also need to enable Schema Registry for our Confluent Cloud environment. Let’s do this now.

Note: If Schema Registry has already been enabled for the environment, the command will return the existing cluster details.

  1. Run command:

    confluent schema-registry cluster enable --cloud gcp --geo us

The confluent CLI needs to also authenticate with the Schema Registry using an API key and secret. Let’s create these now. The confluent CLI will also automatically save them in ~/.confluent/config.json making them available for use by the CLI.

  1. Run command:

    confluent api-key create --resource <SR cluster ID>

In addition to the confluent CLI, we will also be using Kafka clients that will need to authenticate with the cluster and Schema Registry using the API keys and secrets. We can create a client configuration file using the confluent CLI. The command will automatically obtain the cluster API key and secret from ~/.confluent/config.json but we need to specify the SR API key and secret using the --sr-apikey and --sr-apisecret parameters.

  1. Run command:
    confluent kafka client-config create java --sr-apikey <sr API key> --sr-apisecret <sr API secret> | tee $HOME/.confluent/java.config

Required parameters for cluster API REST calls include the cluster API key and secret. The java.config file that was created in the previous step contains both. The ccloud_library.sh script auto-generates configuration files for downstream clients using the java.config file as input. One of the output files is delta_configs/env.delta, which contains commands that establish environment variables set to the cluster configuration values, including the API key and secret.

Let’s generate these files now.

  1. Run command:

    (cd ~ && \
    rm -rf delta_configs && \
    source ~/ccloud_library.sh && \
    ccloud::generate_configs $HOME/.confluent/java.config)

And now we will establish the environment variables for our current command shell.

  1. Run command:
    source ~/delta_configs/env.delta

And finally, let’s verify the previous step was successful.

  1. Run command:

    printenv

You should see in the command output the environment variables contained in env.delta have been established.
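
Scanning the full printenv output by eye is easy to get wrong. As a minimal sketch, a small helper can report which of the variables used later in this exercise are missing. The function name missing_vars is hypothetical and not part of ccloud_library.sh; the variable names are the ones referenced by later commands in this exercise.

```sh
# Hypothetical helper: print the name of every listed variable that is not
# present in the environment. Prints nothing when all of them are set.
missing_vars() {
  for name in "$@"; do
    # printenv NAME exits non-zero when NAME is not an exported variable
    if ! printenv "$name" >/dev/null; then
      echo "$name"
    fi
  done
}

# Usage, with the variables that later steps of this exercise rely on:
# missing_vars CLOUD_KEY CLOUD_SECRET BOOTSTRAP_SERVERS SCHEMA_REGISTRY_URL \
#   BASIC_AUTH_CREDENTIALS_SOURCE SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
```

If the usage line prints any names, source ~/delta_configs/env.delta again before continuing.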

During this exercise we will be streaming data to a MySql database running on the local machine in a Docker container. The associated docker-compose.yml file is located in the learn-kafka-courses/kafka-connect-101 directory. We will now start this Docker container.

  1. Run commands:

    cd ~/learn-kafka-courses/kafka-connect-101 && \
    cp mysql-only.yml docker-compose.yml && \
    docker-compose up -d

Alright, with that, our environment preparation is complete. We know that was quite a bit of work, but now we’re ready to get started with this and all of the following exercises. Let’s dive in!

Confluent Cloud Managed Connector API exercise steps

We will start by using the Confluent Cloud org API. This requires a Confluent Cloud API Key. We can generate one using the confluent CLI. We’ll first log into Confluent Cloud – and note that we’re using the --save flag so that our credentials will be used for subsequent commands in this exercise. They’ll remain active until you run the logout command.

  1. Run command:

    confluent login --save

Enter email and password

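As usual, we need an API key. This time it is a Cloud API key, which authenticates against the Confluent Cloud control plane rather than a single Kafka cluster. As we saw in the setup instructions, we can generate a new API key with the confluent CLI: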

  1. Run command:

    confluent api-key create --resource cloud

Next, we will convert the API key and secret to a base64 encoded string. This string is used in the authorization header that will be included in the REST calls to the Confluent Cloud API. At the same time, we will assign this string to an environment variable which will make it easier to submit the REST calls during the rest of this exercise.

  1. Run command:

    CLOUD_AUTH64=$(echo -n <API Key>:<API Secret> | base64 -w0)

We also need to convert the kc-101 cluster API key and secret to a base64 encoded string to use in the authorization header for REST calls to the cluster API. This is slightly easier since we can reference the environment variables created earlier from the java.config file.

  1. Run command:

    CLUSTER_AUTH64=$(echo -n $CLOUD_KEY:$CLOUD_SECRET | base64 -w0)
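
The -w0 option used above is specific to GNU base64 and may not be accepted by other builds, such as the one shipped with macOS. As a hedged alternative, the sketch below produces the same single-line string without it. The function name basic_auth_token is hypothetical and not part of the course files.

```sh
# Hypothetical helper: build the value for an HTTP Basic Authorization
# header from an API key and secret.
basic_auth_token() {
  # printf avoids the trailing newline that a plain echo would add;
  # tr removes the line wraps that some base64 builds insert.
  printf '%s:%s' "$1" "$2" | base64 | tr -d '\n'
}

# Usage, assuming CLOUD_KEY and CLOUD_SECRET were set by env.delta:
# CLUSTER_AUTH64=$(basic_auth_token "$CLOUD_KEY" "$CLOUD_SECRET")
```

Quoting the key and secret as separate arguments also keeps the shell from splitting or globbing them if they contain unusual characters.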

Now we are ready to issue REST calls to both the Confluent Cloud control plane APIs and the cluster APIs.

Let’s first list our environments by issuing a REST call to the org API.

  1. Run command:

    curl --request GET \
    --url 'https://api.confluent.cloud/org/v2/environments' \
    --header 'Authorization: Basic '$CLOUD_AUTH64''

Locate the default environment in the response and note its id. Using this id, we can now obtain the id for the kc-101 cluster.

Note: We will be using this environment id in later curl commands so keep it handy.

  1. Run command:

    curl --request GET \
    --url 'https://api.confluent.cloud/cmk/v2/clusters?environment=<env ID>' \
    --header 'Authorization: Basic '$CLOUD_AUTH64'' | jq '.'

Locate the kc-101 cluster in the response and note its id.

Using this id, let’s now create a new Kafka topic named transactions. We will set this as the target for the Datagen source connector. Notice we are using the cluster API for this REST call thus the cluster authorization string instead of the cloud authorization string used on the previous commands.

Note: We will also be using this cluster id in later curl commands so keep it handy as well.

  1. Run command:

    curl --request POST \
    --url 'https://pkc-6ojv2.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/<cluster ID>/topics' \
    --header 'Authorization: Basic '$CLUSTER_AUTH64'' \
    --header 'content-type: application/json' \
    --data '{
      "topic_name": "transactions",
      "partitions_count": 6,
      "replication_factor": 3
    }'

Before proceeding, we should take a moment to verify that the transactions topic was created successfully.

  1. Run command:

    curl --request GET \
    --url 'https://pkc-6ojv2.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/<cluster ID>/topics' \
    --header 'Authorization: Basic '$CLUSTER_AUTH64'' | jq '.'
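
The pkc-6ojv2 host in the two cluster API calls above belongs to the cluster used in the demonstration, so your own cluster will have a different one. The sketch below derives the REST base URL from the BOOTSTRAP_SERVERS variable set earlier. It assumes the REST endpoint is served on port 443 of the same host as the bootstrap server, which is an assumption to verify against your cluster's details rather than a documented guarantee. The function name is hypothetical.

```sh
# Hypothetical helper: turn a bootstrap server address such as
# host:9092 into an https://host:443 base URL.
# Assumption: the cluster REST endpoint shares the bootstrap host.
rest_endpoint_from_bootstrap() {
  hostport=${1#*://}      # drop an optional scheme such as SASL_SSL://
  host=${hostport%%:*}    # drop the port
  printf 'https://%s:443\n' "$host"
}

# Usage:
# REST_ENDPOINT=$(rest_endpoint_from_bootstrap "$BOOTSTRAP_SERVERS")
# curl --request GET \
#   --url "$REST_ENDPOINT/kafka/v3/clusters/<cluster ID>/topics" \
#   --header "Authorization: Basic $CLUSTER_AUTH64"
```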

Stream sample data to a Kafka topic using the DatagenSource connector

Before we create the DatagenSource connector instance, it’s always a good idea to list the fully-managed connector plugins that are available to us for streaming using our Confluent Cloud environment. Note that this list may be a little different depending upon what cloud provider Confluent Cloud is running in (and that’s why it’s good to check!).

  1. Run command:

    curl --request GET \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connector-plugins' \
    --header 'Authorization: Basic '$CLOUD_AUTH64''

It is a bit difficult to spot the different plugins in the command output. For our sanity, let’s run the command again and pipe the output to the jq command to get it in a friendlier format.

  1. Run command:

    curl --request GET \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connector-plugins' \
    --header 'Authorization: Basic '$CLOUD_AUTH64'' | jq '.'

Alright, now we can actually tell which individual plugins are available. As you can see, the list is quite long and includes the DatagenSource connector.

Let’s create a DatagenSource connector instance using the API. To do so, we use a PUT or POST REST call. PUT is somewhat easier because it will create the connector if it doesn’t exist, or update it if it already exists. If it already exists and there’s no update to make, it won’t error—so PUT is the idempotent way of updating a connector.

  1. Run command:

    curl --request PUT \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/DatagenSourceConnector_2/config' \
    --header 'Authorization: Basic '$CLOUD_AUTH64'' \
    --header 'content-type: application/json' \
    --data '{
      "connector.class": "DatagenSource",
      "name": "DatagenSourceConnector_2",
      "kafka.api.key": "'$CLOUD_KEY'",
      "kafka.api.secret": "'$CLOUD_SECRET'",
      "kafka.topic": "transactions",
      "output.data.format": "AVRO",
      "quickstart": "TRANSACTIONS",
      "tasks.max": "1"
    }'
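
The request body above switches between single and double quotes to splice the API key and secret into the JSON, which is easy to mistype. As a sketch of one alternative, the same configuration can be generated with a here-document and piped to curl. The function name datagen_config is hypothetical. The sketch assumes the key and secret contain no double quotes or backslashes, since those would need JSON escaping.

```sh
# Hypothetical helper: print the DatagenSource connector configuration,
# filling in the cluster API key and secret from the environment.
datagen_config() {
  cat <<EOF
{
  "connector.class": "DatagenSource",
  "name": "DatagenSourceConnector_2",
  "kafka.api.key": "${CLOUD_KEY}",
  "kafka.api.secret": "${CLOUD_SECRET}",
  "kafka.topic": "transactions",
  "output.data.format": "AVRO",
  "quickstart": "TRANSACTIONS",
  "tasks.max": "1"
}
EOF
}

# Usage: --data @- tells curl to read the request body from standard input.
# datagen_config | curl --request PUT \
#   --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/DatagenSourceConnector_2/config' \
#   --header "Authorization: Basic $CLOUD_AUTH64" \
#   --header 'content-type: application/json' \
#   --data @-
```

Running datagen_config on its own prints the body, which makes it possible to review the JSON before it is sent.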

To verify the connector instance’s status, let’s first list all connector instances in the cluster.

  1. Run command:

    curl --request GET \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors' \
    --header 'Authorization: Basic '$CLOUD_AUTH64''

Next, let’s check the status of the DatagenSourceConnector_2 connector instance we just created.

  1. Run command:

    curl --request GET \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/DatagenSourceConnector_2/status' \
    --header 'Authorization: Basic '$CLOUD_AUTH64'' | jq '.'

The current status is provisioning. This is expected as it takes a moment for the connector instance to be fully provisioned and up and running. Before we continue, it’s a good idea to repeat this command periodically until we see the status has changed to Running.
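
Rather than re-running the status command by hand, the check can be wrapped in a polling loop. The following is a minimal sketch, not part of the course files: wait_for_state is a hypothetical helper that repeatedly runs whatever command you give it and compares the printed value with the expected state. The jq path .connector.state and the upper-case state name in the usage comment are assumptions, so compare them with the status output you actually see.

```sh
# Hypothetical helper: wait_for_state EXPECTED MAX_ATTEMPTS DELAY_SECONDS CMD...
# Runs CMD until it prints EXPECTED or the attempts run out.
wait_for_state() {
  _expected=$1
  _attempts=$2
  _delay=$3
  shift 3
  _i=1
  while [ "$_i" -le "$_attempts" ]; do
    # A failing command counts as "no state yet" and is retried
    _state=$("$@") || _state=""
    if [ "$_state" = "$_expected" ]; then
      return 0
    fi
    _i=$((_i + 1))
    sleep "$_delay"
  done
  return 1
}

# Usage (assumed response shape; adjust the jq filter to your output):
# connector_state() {
#   curl --silent --request GET \
#     --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/DatagenSourceConnector_2/status' \
#     --header "Authorization: Basic $CLOUD_AUTH64" | jq -r '.connector.state'
# }
# wait_for_state RUNNING 30 10 connector_state
```

A bounded number of attempts keeps the loop from hanging forever if the connector ends up in a failed state instead.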

Once the status shows the connector instance is Running, we can consume records from the destination topic to verify that sample data is being produced as expected.

  1. Run command:

    kafka-avro-console-consumer \
    --bootstrap-server ${BOOTSTRAP_SERVERS} \
    --property schema.registry.url=${SCHEMA_REGISTRY_URL} \
    --property basic.auth.credentials.source=${BASIC_AUTH_CREDENTIALS_SOURCE} \
    --property basic.auth.user.info=${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO} \
    --consumer.config ~/.confluent/java.config \
    --topic transactions \
    --max-messages 10 \
    --from-beginning

We should now have a decent amount of sample data in the transactions topic. So that we don’t unnecessarily exhaust any Confluent Cloud promotional credits, we can go ahead and delete the DatagenSource connector instance.

  1. Run command:

    curl --request DELETE \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/DatagenSourceConnector_2' \
    --header 'Authorization: Basic '$CLOUD_AUTH64''
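
A DELETE call may print very little, so it can be hard to tell whether it worked. One way to check, sketched below, is to ask curl for the HTTP status code and treat any 2xx value as success. The helper name is_success is hypothetical, and the sketch makes no assumption about which specific 2xx code the API returns.

```sh
# Hypothetical helper: succeed when the given HTTP status code is 2xx.
is_success() {
  case "$1" in
    2[0-9][0-9]) return 0 ;;
    *) return 1 ;;
  esac
}

# Usage: --write-out '%{http_code}' prints only the status code, and
# --output /dev/null discards the response body.
# code=$(curl --silent --output /dev/null --write-out '%{http_code}' \
#   --request DELETE \
#   --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/DatagenSourceConnector_2' \
#   --header "Authorization: Basic $CLOUD_AUTH64")
# if is_success "$code"; then echo "deleted"; else echo "failed with HTTP $code"; fi
```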

By this point, we’ve set up the start of our data pipeline – we made a connector to generate data to a Kafka topic. Next, we can establish the downstream side of the pipeline by setting up a MySql Sink Connector. This connector will consume records from the transactions topic and write them out to a corresponding table in our MySql database that is running in the local docker container that we started during the exercise environment setup steps.

Let’s take a look at the request we’ll be using to set up this connector.

  1. In our terminal window, enter command but do not run the command quite yet:

    curl --request PUT \
        --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/MySqlSinkConnector_2/config' \
        --header 'Authorization: Basic '$CLOUD_AUTH64'' \
        --header 'content-type: application/json' \
        --data '{
            "connector.class": "MySqlSink",
            "name": "MySqlSinkConnector_2",
            "topics": "transactions",
            "input.data.format": "AVRO",
            "input.key.format": "STRING",
            "kafka.api.key": "'$CLOUD_KEY'",
            "kafka.api.secret": "'$CLOUD_SECRET'",
            "connection.host": "ec2-54-175-153-98.compute-1.amazonaws.com",
            "connection.port": "3306",
            "connection.user": "kc101user",
            "connection.password": "kc101pw",
            "db.name": "demo",
            "ssl.mode": "prefer",
            "pk.mode": "none",
            "auto.create": "true",
            "auto.evolve": "true",
            "tasks.max": "1"
    }'

Notice that ssl.mode is set to prefer. This configuration parameter tells Confluent Cloud to connect using TLS if the destination host is set up to do so. Otherwise, a PLAINTEXT connection will be established. For this demonstration, the local host is an AWS EC2 instance that does not have TLS set up so the connection will be nonsecure and the sample data will be unencrypted across the wire. In a production environment, we would want to be sure to set up the destination host to support TLS.

Notice also the connection host. This should be set to the address of the host on which the mysql database Docker container was established during the exercise setup steps. In the case of the demonstration since an EC2 instance was being used, the sample command specifies the public endpoint address assigned to the AWS EC2 instance. This value can be obtained in the AWS console display of the EC2 instance details.

Now we can continue by running the curl command.

  1. Run the curl command

After that command is executed, we should check the status of the MySqlSinkConnector_2 connector instance we just created.

  1. Run command:

    curl --request GET \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/MySqlSinkConnector_2/status' \
    --header 'Authorization: Basic '$CLOUD_AUTH64'' | jq '.'

We see the status of provisioning and need to wait for the status to be Running before continuing. We’ll run it a few more times to check…

Once the status shows the connector instance is Running, we can run a query on the MySql database to verify the connector has written records to the transactions table.

  1. Run command:

    docker exec -t mysql bash -c 'echo "SELECT * FROM transactions LIMIT 10 \G" | mysql -u root -p$MYSQL_ROOT_PASSWORD demo'

Success!

Let’s continue with our tour of the Confluent Cloud API.

Another useful command allows us to inspect the config for a given connector as follows.

  1. Run command:

    curl --request GET \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/MySqlSinkConnector_2/config' \
    --header 'Authorization: Basic '$CLOUD_AUTH64'' | jq '.'
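
The connector calls in this exercise all share one URL pattern that differs only in the connector name and a trailing sub-resource such as config, status, pause, or resume. As a small sketch, a helper can assemble these URLs so the environment and cluster IDs are typed only once. The function name connector_url is hypothetical, and the path layout is taken from the commands shown in this exercise.

```sh
# Hypothetical helper: connector_url ENV_ID CLUSTER_ID [CONNECTOR] [SUBRESOURCE]
# Builds a Connect API URL following the pattern used in this exercise.
connector_url() {
  _url="https://api.confluent.cloud/connect/v1/environments/$1/clusters/$2/connectors"
  if [ -n "${3:-}" ]; then
    _url="$_url/$3"
  fi
  if [ -n "${4:-}" ]; then
    _url="$_url/$4"
  fi
  printf '%s\n' "$_url"
}

# Usage:
# curl --request GET \
#   --url "$(connector_url '<env ID>' '<cluster ID>' MySqlSinkConnector_2 config)" \
#   --header "Authorization: Basic $CLOUD_AUTH64" | jq '.'
```

Calling it with only the environment and cluster IDs yields the URL that lists all connectors.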

There are also use cases where you might need to pause a connector instance temporarily either for debugging or maintenance purposes. Here is the command to do this.

  1. Run command:

    curl --request PUT \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/MySqlSinkConnector_2/pause' \
    --header 'Authorization: Basic '$CLOUD_AUTH64''

As always, it’s good to verify both the connector and task are paused after running this. We can use the status command like before.

  1. Run command:

    curl --request GET \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/MySqlSinkConnector_2/status' \
    --header 'Authorization: Basic '$CLOUD_AUTH64'' | jq '.'

Confirmed! Let’s now resume the connector and its task.

  1. Run command:

    curl --request PUT \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/MySqlSinkConnector_2/resume' \
    --header 'Authorization: Basic '$CLOUD_AUTH64''

And finally, let’s verify both the connector and task are once again running.

  1. Run command:

    curl --request GET \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/MySqlSinkConnector_2/status' \
    --header 'Authorization: Basic '$CLOUD_AUTH64'' | jq '.'

Everything is up and in the Running state. So that wraps up our tour of the Confluent Connect API.

Note that if you run this demonstration yourself, you’ll need to tear down the environment after doing so to avoid unnecessarily accruing cost to the point that your promotional credits are exhausted.

Let’s walk through that tear down process now for this environment.

First things first, we should delete the sink connector.

  1. Run command:

    curl --request DELETE \
    --url 'https://api.confluent.cloud/connect/v1/environments/<env ID>/clusters/<cluster ID>/connectors/MySqlSinkConnector_2' \
    --header 'Authorization: Basic '$CLOUD_AUTH64''

Since there’s no running source connector, we’re safe to go ahead and delete the transactions topic.

  1. Run command:

    curl --request DELETE \
    --url 'https://pkc-6ojv2.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/<cluster ID>/topics/transactions' \
    --header 'Authorization: Basic '$CLUSTER_AUTH64''

And finally, we will shut down the MySQL Docker container and free its resources.

  1. Run command:

    docker-compose down -v

And with that, you should have a decent idea of the REST API and the various commands available to you through it.

Additional information about Confluent Cloud APIs can be found in the Confluent Cloud API documentation.

Use the promo code 101CONNECT to get $25 of free Confluent Cloud usage
