course: Kafka Connect 101

Hands On: Run a Self-Managed Connector in Docker

8 min
Danica Fine

Senior Developer Advocate (Presenter)

Exercise Environment Preparation Steps

Complete the following steps to set up the environment used for the course exercises. Prior to doing so, you will need to sign up for Confluent Cloud at https://confluent.cloud.

Note: Steps 1-15 are the same as those included in 6. Hands On: Confluent Cloud Managed Connector API and 7. Hands On: Confluent Cloud Managed Connector CLI. If you already completed them for either exercise, you can skip to step 16 of this exercise setup.

We will use several CLIs during the course exercises, including confluent, so these need to be available on the machine where you plan to run the exercises. Downloading Confluent Platform will accomplish this.

  1. Run command:
cd ~ && \
curl -O http://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz && \
tar xzf confluent-7.1.1.tar.gz && \
echo "export PATH=$HOME/confluent-7.1.1/bin/:$PATH" >> ~/.bashrc && \
export PATH=$HOME/confluent-7.1.1/bin/:$PATH
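
Before moving on, it can help to confirm that the tools this exercise relies on are actually on the PATH. The following is a minimal sketch and not part of the course materials; require_cmds is a hypothetical helper name.

```shell
# Hypothetical helper: report every named command that is not found on the PATH.
# Returns 0 when all commands are available, 1 otherwise.
require_cmds() {
  missing=0
  for cmd in "$@"; do
    if ! command -v "$cmd" >/dev/null 2>&1; then
      echo "missing: $cmd" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For this exercise, a call such as require_cmds confluent curl git docker-compose jq covers the commands used in the steps that follow.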

Next, we will download a bash library of useful functions for interacting with Confluent Cloud.

Note: This library is community-supported and not supported by Confluent.

  1. Run command:
curl -sS -o ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
  1. Make ccloud_library.sh executable:
chmod +x ccloud_library.sh

Next, we will clone the GitHub repository that contains the files needed to run the exercises for the Kafka Connect 101 course.

  1. Run command:
git clone \
https://github.com/confluentinc/learn-kafka-courses.git \
~/learn-kafka-courses

Let’s now log in to the confluent CLI.

  1. In a terminal window, run command:
confluent login --save

We can set several defaults for the confluent CLI so that we won’t have to specify them for each individual command. Let’s start with the Confluent Cloud environment ID. Let’s list the available environments for our org.

  1. Run command:
confluent environment list

We will use the default environment so let’s set it as the default.

  1. Run command:
confluent environment use <default env ID>

Next, let’s create a Confluent Cloud cluster to use with the exercise.

  1. Run command:
confluent kafka cluster create kc-101 \
    --cloud gcp \
    --region us-west4 \
    --type basic

Note: The kc-101 cluster may already exist. If so, run the following command to identify its resource ID which is needed for the step that follows: confluent kafka cluster list

Now let’s set it as the default cluster for the confluent CLI.

  1. Run command:
confluent kafka cluster use <cluster ID>

The confluent CLI needs to authenticate with Confluent Cloud using an API key and secret that has the required privileges for the cluster. Let’s create these now. The confluent CLI will automatically save them in ~/.confluent/config.json making them available for use by the CLI.

  1. Run command:
confluent api-key create --resource <cluster ID>

Let’s now set this API key and secret as the default if it isn’t otherwise specified in confluent CLI commands.

  1. Run command:
confluent api-key use <cluster API key> --resource <cluster ID>

We also need to enable Schema Registry for our Confluent Cloud environment. Let’s do this now.

Note: If Schema Registry has already been enabled for the environment, the command will return the existing cluster details.

  1. Run command:
confluent schema-registry cluster enable --cloud gcp --geo us

The confluent CLI needs to also authenticate with the Schema Registry using an API key and secret. Let’s create these now. The confluent CLI will also automatically save them in ~/.confluent/config.json making them available for use by the CLI.

  1. Run command:
confluent api-key create --resource <SR cluster ID>

In addition to the confluent CLI, we will also be using Kafka clients that will need to authenticate with the cluster and Schema Registry using the API keys and secrets. We can create a client configuration file using the confluent CLI. The command will automatically obtain the cluster API key and secret from ~/.confluent/config.json but we need to specify the SR API key and secret using the --sr-apikey and --sr-apisecret parameters.

  1. Run command:
confluent kafka client-config create java --sr-apikey <sr API key> --sr-apisecret <sr API secret> | tee $HOME/.confluent/java.config

Note: If tee isn’t present on your machine, you will need to create .confluent/java.config using some other method from the client config statements that are included in the command output.

Cluster REST API calls require the cluster API key and secret, and the java.config file created in the previous step contains both. The ccloud_library.sh script takes java.config as input and auto-generates configuration files for downstream clients. One of the output files is delta_configs/env.delta, which contains commands that set environment variables to the corresponding cluster settings.

Let’s generate these files now.

  1. Run command:
source ccloud_library.sh
ccloud::generate_configs $HOME/.confluent/java.config

And now we will establish the environment variables for our current command shell.

  1. Run command:
source delta_configs/env.delta

And finally, let’s verify the previous step was successful.

  1. Run command:
printenv

The command output should include the environment variables defined in env.delta.
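
Scanning the full printenv output by eye is easy to get wrong. As a rough alternative, the sketch below checks a given list of variables and reports any that are unset or empty; check_env is a hypothetical helper name and not part of ccloud_library.sh.

```shell
# Hypothetical helper: fail if any named environment variable is unset or empty.
check_env() {
  status=0
  for name in "$@"; do
    # Indirect lookup of the variable whose name is stored in $name.
    eval "value=\${$name-}"
    if [ -z "$value" ]; then
      echo "not set: $name" >&2
      status=1
    fi
  done
  return "$status"
}
```

The connector commands later in this exercise reference BOOTSTRAP_SERVERS, SCHEMA_REGISTRY_URL, BASIC_AUTH_CREDENTIALS_SOURCE, SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO, CLOUD_KEY, and CLOUD_SECRET, so those are reasonable names to pass to check_env.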

During this exercise we will be streaming data from a MySQL database running on the local machine in a Docker container. We’ll also be running local instances of Elasticsearch and Neo4j for use with sink connectors. The associated docker-compose.yml file is located in the learn-kafka-courses/kafka-connect-101 directory. We will now start these Docker containers.

  1. Run commands:
cd ~/learn-kafka-courses/kafka-connect-101 && \
cp self-managed-connect.yml docker-compose.yml && \
docker-compose up -d

This concludes the exercise environment preparation steps.

You can now proceed with this exercise.

Running a Self-Managed Connector with Confluent Cloud exercise steps

As mentioned previously in this course, the rate at which managed connectors are being added to Confluent Cloud is impressive, but you may find that the connector you want to use with Confluent Cloud isn’t yet available. This is one scenario where you will need to run your own Kafka Connect worker which then connects to Confluent Cloud. In this exercise, we will do exactly that.

  • Establish a self-managed Kafka Connect cluster
    • This cluster will run in Docker containers and be configured to write to and read from Kafka topics in our kc-101 Confluent Cloud cluster
  • Create Kafka Connect data pipelines
    • Pipeline one will use a self-managed Debezium MySQL source connector to stream data from our local MySQL database to a Kafka topic in Confluent Cloud
    • Pipeline two will use a self-managed Elasticsearch sink connector to stream data from a Kafka topic in our kc-101 Confluent Cloud cluster to a local instance of Elasticsearch
    • Pipeline three will use a self-managed Neo4j sink connector to stream data from a Kafka topic in our kc-101 Confluent Cloud cluster to a local instance of Neo4j

Our self-managed Kafka Connect cluster will require the Confluent Cloud Java configuration settings we previously obtained. We will make them available via environment variables. We will do this using the delta_configs/env.delta that was created in the exercise setup steps.

Before we do this though, let’s review the contents of java.config.

  1. Run command:
cat ~/.confluent/java.config

Observe the settings, including the Confluent Cloud cluster endpoint that is used for the bootstrap servers property. It also includes values for the cluster API key and secret which clients need to authenticate with Confluent Cloud. The settings also include the Schema Registry endpoint and API key and secret. These are also needed by clients.
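
To look at a single setting rather than the whole file, a small helper can pull one property out of java.config. This is a minimal sketch under the assumption that the file uses plain key=value lines with # comments; get_prop is a hypothetical name.

```shell
# Hypothetical helper: print the value of one property from a key=value file.
# $1 = property name, $2 = path to the properties file
get_prop() {
  awk -v key="$1" '
    /^[ \t]*#/ { next }                 # skip comment lines
    {
      pos = index($0, "=")              # split on the first "=" only
      if (pos == 0) next                # not a key=value line
      k = substr($0, 1, pos - 1)
      gsub(/^[ \t]+|[ \t]+$/, "", k)    # trim whitespace around the key
      if (k == key) { print substr($0, pos + 1); exit }
    }' "$2"
}
```

For example, get_prop bootstrap.servers ~/.confluent/java.config prints only the Confluent Cloud cluster endpoint. Splitting on the first = keeps values that themselves contain = intact.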

Now, let’s review the contents of delta_configs/env.delta.

  1. Run commands:
cd ~/learn-kafka-courses/kafka-connect-101
cat delta_configs/env.delta

As you can see, this file contains numerous export commands that create environment variables set to the value of a corresponding Java client configuration setting.

We will establish these variables for our current command shell.

  1. Run command:
source delta_configs/env.delta

And now we will verify the previous step was successful.

  1. Run command:
printenv

The command output should show that the environment variables defined in env.delta are active.

Now let’s examine the Docker container definitions being used by the exercise environment.

  1. Run command:
code docker-compose.yml

The first Docker container named connect-1 is one of the two Kafka Connect worker nodes we will be working with. Let’s now examine several configuration settings that are specified in the container’s environment: settings.

  • In line 15, CONNECT_BOOTSTRAP_SERVERS is set to $BOOTSTRAP_SERVERS, one of the environment variables we made available in step 3. This is the endpoint address of the kc-101 Confluent Cloud cluster.

  • In line 16, we see that the group.id property is set to kc-101. This is what tells the connect-1 worker node which Connect cluster it should join upon startup. All other worker nodes with the same group.id value belong to the same Connect cluster.

  • In lines 17-19, we see three internal topics the worker nodes use to keep in sync with one another. The names for these topics need to be unique for each Connect cluster.

  • In lines 29-31, three more environment variables are used to set values used by the Connect workers to connect to Schema Registry which in our case is located in Confluent Cloud.

  • In lines 46, 51, and 56 we see another environment variable used to set the value that the worker as well as the underlying producer and consumer will use to authenticate with the Confluent Cloud cluster.

  • In lines 64-67 we see that the connector plugins we will be using during our exercises are downloaded from Confluent Hub and installed on the connect-1 worker node. The connect-2 worker node container configuration includes these same plugin installation steps. Connector plugins must be installed on every node in the Connect cluster.

Scrolling down, we will find that the environment variables are also used to define the connect-2 worker node as well as the local control-center node.

Scrolling down further, you will see additional Docker containers defined including:

  • MySQL
  • Elasticsearch
  • Neo4j

Let’s now start the Kafka Connect cluster as well as the other Docker containers.

  1. Run command:
docker-compose up -d

And verify they are running.

  1. Run command:
docker-compose ps

And finally verify the Kafka Connect workers are ready.

  1. Run command:
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 15
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
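
The loop above waits indefinitely if the worker never comes up. A bounded variant can be sketched as a generic retry helper plus a readiness probe. The names retry_until and connect_ready are hypothetical, and the attempt count and delay are arbitrary choices.

```shell
# Hypothetical helper: run a command until it succeeds or the attempts run out.
# $1 = maximum attempts, $2 = seconds to sleep between attempts, rest = command
retry_until() {
  attempts=$1
  delay=$2
  shift 2
  n=1
  while ! "$@"; do
    if [ "$n" -ge "$attempts" ]; then
      echo "gave up after $n attempts" >&2
      return 1
    fi
    n=$((n + 1))
    sleep "$delay"
  done
}

# Probe: succeeds only when the Connect REST API answers with HTTP 200.
connect_ready() {
  [ "$(curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors)" = "200" ]
}
```

Running retry_until 20 15 connect_ready would poll every 15 seconds and stop with a non-zero status after 20 failed attempts.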

Make sure that the Elasticsearch, Debezium, and Neo4j connector plugins are available.

  1. Run command:
curl -s localhost:8083/connector-plugins | jq '.[].class' | grep -E 'Neo4jSinkConnector|MySqlConnector|ElasticsearchSinkConnector'

Expected command output:

"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
"io.debezium.connector.mysql.MySqlConnector"
"streams.kafka.connect.sink.Neo4jSinkConnector"

Note: If jq isn’t present on your machine, the output from this command and others in this exercise that use jq will not appear as expected.
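
If jq is not available, a plain grep can still confirm that a given plugin class is present. This is only a sketch: has_plugin is a hypothetical name, and it assumes the REST response is compact JSON with no spaces around the colon.

```shell
# Hypothetical helper: succeed if the JSON on stdin lists the given connector class.
# Assumes compact JSON such as [{"class":"...","type":"...","version":"..."}].
has_plugin() {
  grep -Fq "\"class\":\"$1\""
}
```

For example, curl -s localhost:8083/connector-plugins | has_plugin io.debezium.connector.mysql.MySqlConnector returns a zero exit status when the Debezium MySQL plugin is installed.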

Let’s now verify that our self-managed Kafka Connect cluster is using our Confluent Cloud cluster.

  1. In the Confluent Cloud console, navigate to the kc-101 cluster, expand Data Integration, select Clients, and then select Consumers.

Observe there is a consumer for each of the internal topics.

Create Kafka Connect Pipelines

Stream Data from MySQL to a Kafka Topic

Next, we will stream data from our local MySQL database to a Confluent Cloud Kafka topic using the Debezium MySQL Source connector. We will start by viewing a sample of records in the database.

  1. In a new terminal window or tab, run command:
docker exec -t mysql bash -c 'echo "SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 10 \G" | mysql -u root -p$MYSQL_ROOT_PASSWORD demo'

These are the records that we will stream to a Kafka topic in Confluent Cloud.

Before we continue, let’s start a process that adds more rows to the MySQL database.

  1. In the same terminal window or tab, run command:
docker exec mysql /data/02_populate_more_orders.sh

Let’s now create the MySQL source connector instance.

  1. Return to the original terminal window and run command:
curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-debezium-orders-01/config \
    -d '{
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
            "value.converter.schemas.enable": "true",
            "value.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
            "value.converter.basic.auth.credentials.source": "'$BASIC_AUTH_CREDENTIALS_SOURCE'",
            "value.converter.basic.auth.user.info": "'$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO'",
            "database.hostname": "mysql",
            "database.port": "3306",
            "database.user": "kc101user",
            "database.password": "kc101pw",
            "database.server.id": "42",
            "database.server.name": "asgard",
            "table.whitelist": "demo.orders",
            "database.history.kafka.bootstrap.servers": "'$BOOTSTRAP_SERVERS'",
            "database.history.consumer.security.protocol": "SASL_SSL",
            "database.history.consumer.sasl.mechanism": "PLAIN",
            "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
            "database.history.producer.security.protocol": "SASL_SSL",
            "database.history.producer.sasl.mechanism": "PLAIN",
            "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
            "database.history.kafka.topic": "dbhistory.demo",
            "topic.creation.default.replication.factor": "3",
            "topic.creation.default.partitions": "3",
            "decimal.handling.mode": "double",
            "include.schema.changes": "true",
            "transforms": "unwrap,addTopicPrefix",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.addTopicPrefix.regex":"(.*)",
            "transforms.addTopicPrefix.replacement":"mysql-debezium-$1"
    }'
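
In the command above, each "'$VARIABLE'" sequence closes the single-quoted string, lets the shell expand the variable, and then reopens the quotes. Because the expansion is unquoted, an empty or whitespace-containing value would silently produce a broken request. The sketch below shows one alternative for the converter settings only, using a here-document; converter_config is a hypothetical name, and values are inserted as-is without JSON escaping.

```shell
# Sketch: render the converter portion of a connector config from the environment.
converter_config() {
  # Stop with a message if env.delta has not been sourced in this shell.
  : "${SCHEMA_REGISTRY_URL:?source delta_configs/env.delta first}"
  : "${BASIC_AUTH_CREDENTIALS_SOURCE:?source delta_configs/env.delta first}"
  : "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO:?source delta_configs/env.delta first}"
  cat <<EOF
{
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "$SCHEMA_REGISTRY_URL",
  "value.converter.basic.auth.credentials.source": "$BASIC_AUTH_CREDENTIALS_SOURCE",
  "value.converter.basic.auth.user.info": "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO"
}
EOF
}
```

A full connector body built the same way could be piped to curl with -d @- so that the JSON is read from standard input instead of the command line.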

Let’s check the status of the connector instance.

  1. Run command:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort

Expected output:

source  |  source-debezium-orders-01  |  RUNNING  |  RUNNING  |  io.debezium.connector.mysql.MySqlConnector

Let’s view the records in the Confluent Cloud Kafka topic.

  1. In the Confluent Cloud console, navigate to the kc-101 cluster, select Topics, select the mysql-debezium-asgard.demo.ORDERS topic, and select the Messages tab.

Observe the records being written to the topic by the MySQL source connector.
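
The topic name follows from two settings in the connector configuration. The Debezium MySQL connector names each change topic <database.server.name>.<database>.<table>, and the RegexRouter transform then adds the mysql-debezium- prefix. The sketch below emulates that renaming in the shell, with sed standing in for the transform; routed_topic is a hypothetical name.

```shell
# Sketch: emulate the topic name produced by Debezium plus the addTopicPrefix transform.
# $1 = database.server.name, $2 = database name, $3 = table name
routed_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3" |
    sed 's/\(.*\)/mysql-debezium-\1/'   # same regex and replacement as the transform
}
```

With the values from this exercise, routed_topic asgard demo ORDERS yields the topic name selected in the console.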

Before continuing, let’s stop the MySQL data generator we have running.

  1. In the second terminal window/tab where the data generator is running, press Ctrl+C to stop it.

  2. Close this terminal window/tab.

Stream Data from Kafka to Elasticsearch

Our next step in this exercise is to stream the data that was sourced from the MySQL database to an Elasticsearch sink.

To start, we will create an Elasticsearch sink connector instance. Notice that for this connector, we set tasks.max equal to 2. This is primarily for instructional purposes since the load doesn’t really need to be spread out across our two Connect worker nodes. We will examine how these multiple tasks are distributed in later exercises.

  1. In our terminal window, run command:
curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/sink-elastic-orders-01/config \
    -d '{
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
            "value.converter.schemas.enable": "true",
            "value.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
            "value.converter.basic.auth.credentials.source": "'$BASIC_AUTH_CREDENTIALS_SOURCE'",
            "value.converter.basic.auth.user.info": "'$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO'",
            "topics": "mysql-debezium-asgard.demo.ORDERS",
            "connection.url": "http://elasticsearch:9200",
            "key.ignore": "true",
            "schema.ignore": "true",
            "tasks.max": "2"
    }'

Let’s check the status of the connector instance.

  1. Run command:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort | grep ElasticsearchSinkConnector

Expected output:

sink  |  sink-elastic-orders-01  |  RUNNING  |  RUNNING  |  RUNNING  |  io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

Now let’s inspect the data that is written to Elasticsearch.

  1. Run command:
curl -s http://localhost:9200/mysql-debezium-asgard.demo.orders/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 5, "sort": [ { "CREATE_TS": { "order": "desc" } } ] }' |\
    jq '.hits.hits[]._source | .id, .CREATE_TS'
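
Note that the index queried here is the topic name in lower case. Elasticsearch index names must be lowercase, so the sink connector lowercases the topic name when it derives the index. The mapping can be sketched as follows; topic_to_index is a hypothetical name.

```shell
# Sketch: derive the Elasticsearch index name used for a given Kafka topic.
topic_to_index() {
  printf '%s\n' "$1" | tr '[:upper:]' '[:lower:]'
}
```

For example, topic_to_index mysql-debezium-asgard.demo.ORDERS gives the index name used in the search request above.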

Stream Data from Kafka to Neo4j

Our next step in this exercise is to stream the data that was sourced from the MySQL database to a Neo4j sink. Notice that for this connector, we set tasks.max equal to 2.

  1. In our terminal window, run command:
curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/sink-neo4j-orders-01/config \
    -d '{
            "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
           "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
            "value.converter.schemas.enable": "true",
            "value.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
            "value.converter.basic.auth.credentials.source": "'$BASIC_AUTH_CREDENTIALS_SOURCE'",
            "value.converter.basic.auth.user.info": "'$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO'",
            "topics": "mysql-debezium-asgard.demo.ORDERS",
            "tasks.max": "2",
            "neo4j.server.uri": "bolt://neo4j:7687",
            "neo4j.authentication.basic.username": "neo4j",
            "neo4j.authentication.basic.password": "connect",
            "neo4j.topic.cypher.mysql-debezium-asgard.demo.ORDERS": "MERGE (city:city{city: event.delivery_city}) MERGE (customer:customer{id: event.customer_id, delivery_address: event.delivery_address, delivery_city: event.delivery_city, delivery_company: event.delivery_company}) MERGE (vehicle:vehicle{make: event.make, model:event.model}) MERGE (city)<-[:LIVES_IN]-(customer)-[:BOUGHT{order_total_usd:event.order_total_usd,order_id:event.id}]->(vehicle)"
    }'

Let’s check the status of the connector instance.

  1. Run command:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort | grep Neo4jSinkConnector

Expected output:

sink  |  sink-neo4j-orders-01  |  RUNNING  |  RUNNING  |  RUNNING  |  streams.kafka.connect.sink.Neo4jSinkConnector

In this exercise we:

  • Implemented a self-managed Kafka Connect cluster and associated it with a Confluent Cloud cluster
  • Created a source connector instance that consumed records from a local MySQL database and wrote corresponding records to a Kafka topic in the Confluent Cloud cluster
  • Created two sink connector instances that consumed records from a Kafka topic in the Confluent Cloud cluster and wrote corresponding records out to a local Elasticsearch instance and a local Neo4j instance

If you run this demonstration yourself, tear down the environment when you are finished so that you don’t accrue unnecessary costs and exhaust your promotional credits.

Let’s walk through that tear down process now for this environment.

First, we will shut down the Docker containers and free the resources they are using.

  1. Run command:
docker-compose down -v

Next we will delete the Kafka topics that were created during the exercise. To start, let’s set the cluster context for the confluent CLI.

  1. Run commands:
confluent kafka cluster list
confluent kafka cluster use <kc-101 cluster ID>

We can now use the list command to identify the topic names we need to delete.

  1. Run command:
confluent kafka topic list

And now we can delete each of these topics.

  1. Run command:
confluent kafka topic delete <topic>

Repeat this command for each topic listed in the previous step.
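
Repeating the delete command by hand can be tedious when there are many topics. A loop along the following lines is one way to do it. This is a sketch: delete_topics and the DRY_RUN variable are hypothetical, and the topic names must be supplied on standard input, one per line.

```shell
# Sketch: delete every topic named on stdin, one per line.
# When DRY_RUN is set to a non-empty value, the commands are printed instead of run.
delete_topics() {
  while IFS= read -r topic || [ -n "$topic" ]; do
    [ -n "$topic" ] || continue          # skip blank lines
    ${DRY_RUN:+echo} confluent kafka topic delete "$topic"
  done
}
```

Setting DRY_RUN=1 first and then running printf '%s\n' mysql-debezium-asgard.demo.ORDERS dbhistory.demo | delete_topics prints the two delete commands so they can be reviewed before running the loop for real.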

This concludes this exercise.

Use the promo code 101CONNECT to get $25 of free Confluent Cloud usage
