Complete the following steps to set up the environment used for the course exercises. Prior to doing so, you will need to sign up for Confluent Cloud at https://confluent.cloud.
Note: Steps 1-15 are the same as those included in 6. Hands On: Confluent Cloud Managed Connector API and 7. Hands On: Confluent Cloud Managed Connector CLI. If you already completed them for either exercise, you can skip to step 16 of this exercise setup.
We will use various CLIs during the course exercises, including confluent, so these need to be available on the machine you plan to run the exercises on. Downloading Confluent Platform will make them available.
cd ~ && \
curl -O https://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz && \
tar xzf confluent-7.1.1.tar.gz && \
echo "export PATH=$HOME/confluent-7.1.1/bin/:$PATH" >> ~/.bashrc && \
export PATH=$HOME/confluent-7.1.1/bin/:$PATH
Next, we will download a bash library of useful functions for interacting with Confluent Cloud.
Note: This library is community-supported and is not supported by Confluent.
curl -sS -o ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
chmod +x ccloud_library.sh
Next, we will clone the GitHub repository that contains the files needed to run the exercises for the Kafka Connect 101 course.
git clone \
https://github.com/confluentinc/learn-kafka-courses.git \
~/learn-kafka-courses
Let’s now log in to the confluent CLI.
confluent login --save
We can set several defaults for the confluent CLI so that we won’t have to specify them for each individual command. Let’s start with the Confluent Cloud environment ID by listing the available environments for our org.
confluent environment list
We will use the default environment so let’s set it as the default.
confluent environment use <default env ID>
Next, let’s create a Confluent Cloud cluster to use with the exercise.
confluent kafka cluster create kc-101 \
--cloud gcp \
--region us-west4 \
--type basic
Note: The kc-101 cluster may already exist. If so, run the following command to identify its resource ID which is needed for the step that follows: confluent kafka cluster list
Now let’s set it as the default cluster for the confluent CLI.
confluent kafka cluster use <cluster ID>
The confluent CLI needs to authenticate with Confluent Cloud using an API key and secret that has the required privileges for the cluster. Let’s create these now. The confluent CLI will automatically save them in ~/.confluent/config.json making them available for use by the CLI.
confluent api-key create --resource <cluster ID>
Let’s now set this API key and secret as the default to be used when one isn’t otherwise specified in confluent CLI commands.
confluent api-key use <cluster API key> --resource <cluster ID>
We also need to enable Schema Registry for our Confluent Cloud environment. Let’s do this now.
Note: If Schema Registry has already been enabled for the environment, the command will return the existing cluster details.
confluent schema-registry cluster enable --cloud gcp --geo us
The confluent CLI needs to also authenticate with the Schema Registry using an API key and secret. Let’s create these now. The confluent CLI will also automatically save them in ~/.confluent/config.json making them available for use by the CLI.
confluent api-key create --resource <SR cluster ID>
In addition to the confluent CLI, we will also be using Kafka clients that will need to authenticate with the cluster and Schema Registry using the API keys and secrets. We can create a client configuration file using the confluent CLI. The command will automatically obtain the cluster API key and secret from ~/.confluent/config.json but we need to specify the SR API key and secret using the --sr-apikey and --sr-apisecret parameters.
confluent kafka client-config create java --sr-apikey <sr API key> --sr-apisecret <sr API secret> | tee $HOME/.confluent/java.config
Note: If tee isn’t present on your machine, you will need to create .confluent/java.config using some other method from the client config statements that are included in the command output.
The java.config file that was created in the previous step contains the cluster API key and secret, which are required parameters for cluster REST API calls and client connections. The ccloud_library.sh script auto-generates configuration files for downstream clients, using the java.config file as input. One of the output files is delta_configs/env.delta, which contains export commands that set environment variables equal to the corresponding java.config values.
Let’s generate these files now.
source ccloud_library.sh
ccloud::generate_configs $HOME/.confluent/java.config
And now we will establish the environment variables for our current command shell.
source delta_configs/env.delta
And finally, let’s verify the previous step was successful.
printenv
You should see in the command output that the environment variables contained in env.delta have been established.
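If the printenv output is long, you can narrow it down to a couple of the variables this exercise relies on (an optional quick check; the variable names shown are the ones used later in this exercise):
printenv | grep -E 'BOOTSTRAP_SERVERS|SCHEMA_REGISTRY_URL'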
During this exercise we will be streaming data from a MySQL database running on the local machine in a Docker container. We’ll also be running local instances of Elasticsearch and Neo4j for use with sink connectors. The associated docker-compose.yml file is located in the learn-kafka-courses/kafka-connect-101 directory. We will now start this Docker container.
cd ~/learn-kafka-courses/kafka-connect-101 && \
cp self-managed-connect.yml docker-compose.yml && \
docker-compose up -d
This concludes the exercise environment preparation steps.
You can now proceed with this exercise.
As mentioned previously in this course, the rate at which managed connectors are being added to Confluent Cloud is impressive, but you may find that the connector you want to use with Confluent Cloud isn’t yet available. This is one scenario where you will need to run your own Kafka Connect worker which then connects to Confluent Cloud. In this exercise, we will do exactly that.
Our self-managed Kafka Connect cluster will require the Confluent Cloud Java configuration settings we previously obtained. We will make them available via environment variables. We will do this using the delta_configs/env.delta that was created in the exercise setup steps.
Before we do this though, let’s review the contents of java.config.
cat ~/.confluent/java.config
Observe the settings, including the Confluent Cloud cluster endpoint that is used for the bootstrap.servers property. It also includes values for the cluster API key and secret which clients need to authenticate with Confluent Cloud. The settings also include the Schema Registry endpoint and API key and secret. These are also needed by clients.
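If you want to pull out just these key settings, a simple filter such as the following works (an optional sketch; the property names assume the standard output of confluent kafka client-config create java):
grep -E 'bootstrap.servers|sasl.jaas.config|schema.registry.url|basic.auth' ~/.confluent/java.config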
Now, let’s review the contents of delta_configs/env.delta.
cd ~/learn-kafka-courses/kafka-connect-101
cat delta_configs/env.delta
As you can see, this file contains numerous export commands that create environment variables set to the value of a corresponding Java client configuration setting.
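For example, the file contains export statements along these lines (a sketch with placeholder values; the exact set of variables is whatever ccloud_library.sh generates):
export BOOTSTRAP_SERVERS='<cluster endpoint>'
export CLOUD_KEY='<cluster API key>'
export CLOUD_SECRET='<cluster API secret>'
export SCHEMA_REGISTRY_URL='<Schema Registry endpoint>'
export BASIC_AUTH_CREDENTIALS_SOURCE='USER_INFO'
export SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO='<SR API key>:<SR API secret>'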
We will establish these variables for our current command shell.
source delta_configs/env.delta
And now we will verify the previous step was successful.
printenv
You should see in the command output the environment variables contained in env.delta are active.
Now let’s examine the Docker container definitions being used by the exercise environment.
code docker-compose.yml
The first Docker container, named connect-1, is one of the two Kafka Connect worker nodes we will be working with. Let’s now examine several configuration settings that are specified in the container’s environment: section.
In line 15, CONNECT_BOOTSTRAP_SERVERS is set equal to $BOOTSTRAP_SERVERS, one of the environment variables we established earlier. This is the endpoint address of the kc-101 Confluent Cloud cluster.
In line 16, we see that the group.id property is set to kc-101. This is what tells the connect-1 worker node which Connect cluster it should join upon startup. All other worker nodes with the same group.id value belong to the same Connect cluster.
In lines 17-19, we see three internal topics the worker nodes use to keep in sync with one another. The names for these topics need to be unique for each Connect cluster.
Scrolling down, we will find that the environment variables are also used to define the connect-2 worker node as well as the local control-center node.
Scrolling down further, you will see additional Docker containers defined, including MySQL, Elasticsearch, Kibana, Neo4j, and kafkacat.
Let’s now start the Kafka Connect cluster as well as the other Docker containers.
docker-compose up -d
And verify they are running.
docker-compose ps
And finally verify the Kafka Connect workers are ready.
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
sleep 15
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
Make sure that the Elasticsearch, Debezium, and Neo4j connector plugins are available.
curl -s localhost:8083/connector-plugins | jq '.[].class'|egrep 'Neo4jSinkConnector|MySqlConnector|ElasticsearchSinkConnector'
Expected command output:
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
"io.debezium.connector.mysql.MySqlConnector"
"streams.kafka.connect.sink.Neo4jSinkConnector"
Note: If jq isn’t present on your machine, the output from this command and others in this exercise that use jq will not appear as expected.
Let’s now verify that our self-managed Kafka Connect cluster is using our Confluent Cloud cluster.
Observe there is a consumer for each of the internal topics.
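These consumers can be seen, for example, in the kc-101 cluster view of the Confluent Cloud Console. From the CLI, a quick related sanity check (not the course's exact step) is to confirm that the Connect cluster's internal topics, whose names are configured on lines 17-19 of docker-compose.yml, now exist in the cloud cluster:
confluent kafka topic list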
Next, we will stream data from our local MySQL database to a Confluent Cloud Kafka topic using the Debezium MySQL Source connector. We will start by viewing a sample of records in the database.
docker exec -t mysql bash -c 'echo "SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 10 \G" | mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
These are the records that we will stream to a Kafka topic in Confluent Cloud.
Before we continue, let’s start a process that generates additional rows in the MySQL database. Run the following command in a second terminal window/tab, since the generator runs in the foreground until it is stopped:
docker exec mysql /data/02_populate_more_orders.sh
Let’s now create the MySQL source connector instance.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-debezium-orders-01/config \
-d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
"value.converter.basic.auth.credentials.source": "'$BASIC_AUTH_CREDENTIALS_SOURCE'",
"value.converter.basic.auth.user.info": "'$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO'",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "kc101user",
"database.password": "kc101pw",
"database.server.id": "42",
"database.server.name": "asgard",
"table.whitelist": "demo.orders",
"database.history.kafka.bootstrap.servers": "'$BOOTSTRAP_SERVERS'",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.sasl.mechanism": "PLAIN",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
"database.history.kafka.topic": "dbhistory.demo",
"topic.creation.default.replication.factor": "3",
"topic.creation.default.partitions": "3",
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"mysql-debezium-$1"
}'
Let’s check the status of the connector instance.
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
Expected output:
source | source-debezium-orders-01 | RUNNING | RUNNING | io.debezium.connector.mysql.MySqlConnector
Let’s view the records in the Confluent Cloud Kafka topic.
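One way to do this from the CLI is sketched below (optional; because the topic values are serialized with JSON Schema, the raw consumer output will include schema framing, so you may prefer to browse the topic in the Confluent Cloud Console instead):
confluent kafka topic consume mysql-debezium-asgard.demo.ORDERS --from-beginning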
Observe the records being written to the topic by the MySQL source connector.
Before continuing, let’s stop the MySQL data generator we have running.
In the second terminal window/tab where the data generator is running, press Ctrl+C to stop it.
Close this terminal window/tab.
Our next step in this exercise is to stream the data that was sourced from the MySQL database to an Elasticsearch sink.
To start, we will create an Elasticsearch sink connector instance. Notice that for this connector, we set tasks.max equal to 2. This is primarily for instructional purposes since the load doesn’t really need to be spread out across our two connect worker nodes. We will examine how these multiple tasks are distributed in later exercises.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-orders-01/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
"value.converter.basic.auth.credentials.source": "'$BASIC_AUTH_CREDENTIALS_SOURCE'",
"value.converter.basic.auth.user.info": "'$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO'",
"topics": "mysql-debezium-asgard.demo.ORDERS",
"connection.url": "http://elasticsearch:9200",
"key.ignore": "true",
"schema.ignore": "true",
"tasks.max": "2"
}'
Let’s check the status of the connector instance.
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort | grep ElasticsearchSinkConnector
Expected output:
sink | sink-elastic-orders-01 | RUNNING | RUNNING | RUNNING | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
Now let’s inspect the data that is written to Elasticsearch.
curl -s http://localhost:9200/mysql-debezium-asgard.demo.orders/_search \
-H 'content-type: application/json' \
-d '{ "size": 5, "sort": [ { "CREATE_TS": { "order": "desc" } } ] }' |\
jq '.hits.hits[]._source | .id, .CREATE_TS'
Our next step in this exercise is to stream the data that was sourced from the MySQL database to a Neo4j sink. Notice that for this connector, we set tasks.max equal to 2.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-neo4j-orders-01/config \
-d '{
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
"value.converter.basic.auth.credentials.source": "'$BASIC_AUTH_CREDENTIALS_SOURCE'",
"value.converter.basic.auth.user.info": "'$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO'",
"topics": "mysql-debezium-asgard.demo.ORDERS",
"tasks.max": "2",
"neo4j.server.uri": "bolt://neo4j:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "connect",
"neo4j.topic.cypher.mysql-debezium-asgard.demo.ORDERS": "MERGE (city:city{city: event.delivery_city}) MERGE (customer:customer{id: event.customer_id, delivery_address: event.delivery_address, delivery_city: event.delivery_city, delivery_company: event.delivery_company}) MERGE (vehicle:vehicle{make: event.make, model:event.model}) MERGE (city)<-[:LIVES_IN]-(customer)-[:BOUGHT{order_total_usd:event.order_total_usd,order_id:event.id}]->(vehicle)"
} '
Let’s check the status of the connector instance.
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort | grep Neo4jSinkConnector
Expected output:
sink | sink-neo4j-orders-01 | RUNNING | RUNNING | RUNNING | streams.kafka.connect.sink.Neo4jSinkConnector
In this exercise we implemented a self-managed Kafka Connect cluster and associated it with a Confluent Cloud cluster, created a source connector instance that consumed records from a local MySQL database and wrote corresponding records to a Kafka topic in the Confluent Cloud cluster, and created two sink connector instances that consumed records from that Kafka topic and wrote corresponding records out to a local Elasticsearch instance and a local Neo4j instance.
If you run this demonstration yourself, be sure to tear down the environment afterward to avoid unnecessarily accruing cost to the point that your promotional credits are exhausted.
Let’s walk through that tear down process now for this environment.
First, we will shut down the Docker containers and free the resources they are using.
docker-compose down -v
Next we will delete the Kafka topics that were created during the exercise. To start, let’s set the cluster context for the confluent CLI.
confluent kafka cluster list
confluent kafka cluster use <kc-101 cluster ID>
We can now use the list command to identify the topic names we need to delete.
confluent kafka topic list
And now we can delete each of these topics.
confluent kafka topic delete <topic>
Repeat the command for each topic listed in the previous step.
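If there are several topics, a convenience loop such as the following can save some typing (a sketch only, assuming jq is installed and that the CLI's JSON output includes a name field for each topic, which may vary by CLI version; newer CLI versions may also prompt for confirmation on each delete):
# List topic names as JSON, extract them with jq, and delete each one
for t in $(confluent kafka topic list --output json | jq -r '.[].name'); do
  confluent kafka topic delete "$t"
done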
This concludes this exercise.
Welcome back to another hands-on Kafka Connect exercise. This time, we'll learn how to run and maintain a self-managed connector with Confluent Cloud. Before we get started, there's a bit of preparation that you should take care of to make sure you have all of the tools you'll need to be successful with this exercise. If you don't already have a Confluent Cloud account, make sure you sign up for one now, and also take the time to complete the necessary environment setup steps.

As mentioned previously in this course, the rate at which fully managed connectors are being added to Confluent Cloud is pretty impressive, but it is possible that the connector you want to use with your Confluent Cloud cluster isn't yet available. In this scenario, you'll need to run your own Kafka Connect worker that will connect to Confluent Cloud. In this exercise, we'll be doing exactly that. We'll start off by establishing a self-managed Kafka Connect cluster that will run in Docker containers, and will be configured to write to and read from Kafka topics in our kc-101 Confluent Cloud cluster. Next, we'll create a number of Kafka Connect data pipelines. The first pipeline will use a self-managed Debezium MySQL Source connector to stream data from our local MySQL database into a Kafka topic in Confluent Cloud. The second pipeline will use a self-managed Elasticsearch Sink connector to stream data from a Kafka topic in our kc-101 Confluent Cloud cluster to a local instance of Elasticsearch. A third and final pipeline will use a self-managed Neo4j Sink connector to stream data from a Kafka topic in our kc-101 Confluent Cloud cluster to a local instance of Neo4j.

Our self-managed Kafka Connect cluster will require the Confluent Cloud Java configuration settings we previously obtained. We'll make these settings available via environment variables using the env.delta file that was created in the exercise setup steps. Before we do this, though, let's review the contents of the Java config file. Observe that the settings include the Confluent Cloud cluster endpoint that is used for the bootstrap.servers property. It also includes values for the cluster API key and secret, which clients need to authenticate with Confluent Cloud. The settings also include the Schema Registry endpoint and API key and secret. These are also needed by the clients. Next, let's review the contents of the env.delta file. As you can see, this file contains a number of export commands that create environment variables set to the value of a corresponding Java client configuration setting. We'll be establishing these variables for our current command shell. As always, we should verify that the previous step was successful. You should see in the command output that the environment variables contained in env.delta are active.

Now, let's examine the Docker container definitions being used by the exercise environment. The first Docker container named connect-1 is one of the two Kafka Connect worker nodes we'll be working with. Let's take time to examine some of the configuration settings that are specified in the container's environment settings. On line 15, we see that CONNECT_BOOTSTRAP_SERVERS is set equal to the environment variable $BOOTSTRAP_SERVERS. This is the endpoint address of the kc-101 Confluent Cloud cluster. Line 16 contains the group.id property, which is set to kc-101. This is what tells the connect-1 worker node which Connect cluster it should join upon startup. All other worker nodes with the same group.id value belong to the same Connect cluster. Lines 17 through 19 contain three internal topics the worker nodes use to keep in sync with one another. The names for these topics need to be unique for each Connect cluster. Next up on lines 29 through 31, you'll see three more environment variables which set values used by the Connect workers to connect to Schema Registry, which in our case is located in Confluent Cloud. The workers and underlying producers and consumers will use lines 46, 51, and 56 to authenticate with the Confluent Cloud cluster. On lines 64 through 67, we see that the connector plugins that we'll be using during our exercises are being downloaded from Confluent Hub and installed on the connect-1 worker node. The connect-2 worker node container configuration also includes these same plugin installation steps. Connector plugins must be installed on every node in the Connect cluster. Scrolling down, we'll find the environment variables are also used to define the connect-2 worker node. Scrolling down further, you'll see additional Docker containers defined, including MySQL, Elasticsearch, Kibana, Neo4j, and Kafkacat.

Let's now start the Kafka Connect cluster as well as the other Docker containers, and, of course, we'll verify the containers are running. And, finally, we should check that the Connect workers are ready, and that the Elasticsearch, Debezium, and Neo4j connector plugins are available. Let's now verify that our self-managed Kafka Connect cluster is using our Confluent Cloud cluster. Observe there is a consumer for each of the internal topics.

Next, we'll stream data from our local MySQL database to a Confluent Cloud Kafka topic using the Debezium MySQL source connector. We'll start by viewing a sample of records in the database. These are the records that we'll stream to a Kafka topic in Confluent Cloud, but before we continue, let's start a process to generate additional rows in the MySQL database. When that's done, we can create the MySQL source connector instance, and, as always, it's a good idea to check the status of the connector instance. Once it's up and running, we can observe the records that are being written to the Kafka topic by the MySQL source connector. Before continuing, let's stop the MySQL data generator that we have running.

Our next step in this exercise is to stream the data that was sourced from the MySQL database to an Elasticsearch Sink. To start, we'll create an Elasticsearch Sink connector instance. Notice that, for this connector, we set tasks.max equal to two. This is primarily for instructional purposes since the load doesn't really need to be spread out across our two Connect worker nodes. We'll examine how these multiple tasks are distributed in later exercises for the Kafka Connect REST API and troubleshooting modules. Again, we should check the status of the connector instance. With it running, let's inspect the data that is written to Elasticsearch.

Our next step in this exercise is to stream the data that was sourced from the MySQL database to a Neo4j Sink. Notice again that, for this connector, we set tasks.max equal to two. Let's check the status of the connector instance again. Everything looks good.

All right, let's take a moment to review what we've achieved up until this point. So far in this exercise, we implemented a self-managed Kafka Connect cluster and associated it with a Confluent Cloud cluster. We created a source connector instance that consumed records from a local MySQL database and wrote corresponding records to a Kafka topic in the Confluent Cloud cluster, and we also created two Sink connector instances that consumed records from a Kafka topic in the Confluent Cloud cluster and wrote corresponding records out to a local Elasticsearch instance and a local Neo4j instance. If you're running this demonstration yourself, you'll need to tear down the environment afterward to avoid unnecessarily accruing cost to the point your promotional credits are exhausted. And with that, now you have everything you need to run a self-managed Kafka Connect cluster with your own Confluent Cloud instance.