Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port 8083. When executed in distributed mode, the REST API will be the primary interface to the cluster. You can make requests to any cluster member; the REST API automatically forwards requests if required.
In this tutorial, we will demonstrate features of the Connect REST API using basic command line examples.
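For example, with the two Connect workers from this tutorial's Docker setup, the same call works against either worker. Assuming the second worker's REST port is mapped to 8084 on the host (an assumption here; adjust to your own port mapping), you could just as well run:

curl http://localhost:8084/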
Get basic Connect cluster information including the worker version, the commit that it’s on, and its Kafka cluster ID with the following command:
curl http://localhost:8083/
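The response is a small JSON document; the field names below are what the API returns, while the values are illustrative:

{
  "version": "3.5.1",
  "commit": "2c6fb6c54472e90a",
  "kafka_cluster_id": "wxyzABCDEFghijkLMNopQ"
}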
Note that the cluster ID sets this cluster apart from other Connect clusters that may be running a separate set of connectors.
The command below lists the plugins that are installed on the worker. Note that a plugin must be installed on a worker before any connector that uses it can run there.
curl -s localhost:8083/connector-plugins
Kafka does ship with a few plugins, but generally you will need to install plugins yourself. The best place to get them is Confluent Hub, where you will find a large number of plugins and a command line tool to install them. Recall that the Docker containers for our two Connect workers included commands to download and install four connector plugins from Confluent Hub.
It is a bit difficult to spot the individual plugins in the command output, so let's run the command again and pipe the output through the jq command to get it in a more readable form.
curl -s localhost:8083/connector-plugins | jq '.'
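Each installed plugin appears as an entry like the following (the version value is illustrative):

[
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.9.6.Final"
  },
  ...
]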
Now it is much easier to see the details for each of the available plugins. These plugins need to be installed on all workers in the Connect cluster so that if a connector instance or task is moved to a worker due to a rebalance, the plugin is available to run it.
To create a connector instance, you PUT or POST a JSON file with the connector’s configuration to a REST endpoint on your Connect worker. PUT is somewhat easier because it will create the connector if it doesn’t exist, or update it if it already exists. If it already exists and there’s no update to make, it won’t error—so PUT is the idempotent way of updating a connector.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-debezium-orders-00/config \
-d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
    "value.converter.basic.auth.credentials.source": "'$BASIC_AUTH_CREDENTIALS_SOURCE'",
    "value.converter.basic.auth.user.info": "'$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO'",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "42",
    "database.server.name": "asgard",
    "table.whitelist": "demo.orders",
    "database.history.kafka.bootstrap.servers": "'$BOOTSTRAP_SERVERS'",
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.sasl.mechanism": "PLAIN",
    "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
    "database.history.kafka.topic": "dbhistory.demo",
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "3",
    "decimal.handling.mode": "double",
    "include.schema.changes": "true",
    "transforms": "unwrap,addTopicPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.addTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addTopicPrefix.regex": "(.*)",
    "transforms.addTopicPrefix.replacement": "mysql-debezium-$1"
}'
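If the request succeeds, the -i flag lets you see the HTTP status line in the response, which distinguishes creation from update:

HTTP/1.1 201 Created   (a new connector was created)
HTTP/1.1 200 OK        (an existing configuration was updated)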
Use the following command to list all existing connectors:
curl -s -X GET "http://localhost:8083/connectors/"
Inspect the config for a given connector as follows:
curl -i -X GET -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-orders-00/config
You can also look at a connector’s status. While the config command shows a connector’s static configuration, the status shows the connector as a runtime entity:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state, .value.info.config."connector.class"] |join(":|:")' | \
column -s : -t | sed 's/\"//g' | sort
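The result is a neat table with one line per connector, in this general form (connectors and states shown are illustrative):

sink    |  sink-elastic-orders-00     |  RUNNING  |  RUNNING  |  io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
source  |  source-debezium-orders-00  |  RUNNING  |  RUNNING  |  io.debezium.connector.mysql.MySqlConnector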
If something is wrong in your setup and you don’t think a config change would help, or if you simply don’t need a connector to run anymore, you can delete it by name:
curl -s -X DELETE "http://localhost:8083/connectors/sink-elastic-orders-00"
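You can verify that the delete succeeded by listing the connectors again and confirming that the name is gone:

curl -s -X GET "http://localhost:8083/connectors/"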
Or you can make a nifty interactive delete list with the tool peco by piping the connector list's stdout through it, and finally xargs-ing the selection into a cURL call to the delete API:
curl -s "http://localhost:8083/connectors" | \
jq '.[]' | \
peco | \
xargs -I{connector_name} curl -s -X DELETE "http://localhost:8083/connectors/{connector_name}"
This returns an interactive list of connector instances. To delete one, arrow down to highlight it and press Enter. When you are done, press Ctrl-C to exit the interactive list.
As mentioned above, if there’s a connector to update, you can use PUT to amend the configuration (see Create a Connector Instance above). Because PUT is used to both create and update connectors, it’s the standard command that you should use most of the time (which also means that you don’t have to completely rewrite your configs).
The following command returns the connector status:
curl -s -X GET "http://localhost:8083/connectors/source-debezium-orders-00/status" | jq '.'
If your connector fails, the details of the failure belong to the task. So to inspect the problem, you'll need the stack trace from the task's status: the task is the entity that actually runs the connector and converter code, so the stack trace lives in its state.
curl -s -X GET "http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/status" | jq '.'
If after inspecting a task, you have determined that it has failed and you have fixed the reason for the failure (perhaps restarted a database), you can restart the connector with the following:
curl -s -X POST "http://localhost:8083/connectors/source-debezium-orders-00/restart"
Keep in mind though that restarting the connector doesn’t restart all of its tasks. You will also need to restart the failed task and then get its status again as follows:
curl -s -X POST "http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/restart"
curl -s -X GET "http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/status" | jq '.'
Unlike restarting, pausing a connector does pause its tasks. This happens asynchronously, though, so when you pause a connector, you can’t rely on it pausing all of its tasks at exactly the same time. The tasks are running in a thread pool, so there’s no fancy mechanism to make this happen simultaneously.
A connector and its tasks can be paused as follows:
curl -s -X PUT "http://localhost:8083/connectors/source-debezium-orders-00/pause"
Just as easily, a connector and its tasks can be resumed:
curl -s -X PUT "http://localhost:8083/connectors/source-debezium-orders-00/resume"
A convenient way to display all of a connector’s tasks at once is as follows:
curl -s -X GET "http://localhost:8083/connectors/sink-neo4j-orders-00/tasks" | jq '.'
This information is similar to what you can get from other APIs, but it is broken down by task, and configs for each are shown.
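The response takes this general shape, one entry per task with its id and its config (config keys abridged and values elided here):

[
  {
    "id": {
      "connector": "sink-neo4j-orders-00",
      "task": 0
    },
    "config": {
      "task.class": "...",
      "topics": "..."
    }
  }
]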
As of Apache Kafka 2.5, it is possible to get a list of topics used by a connector:
curl -s -X GET "http://localhost:8083/connectors/source-debezium-orders-00/topics" | jq '.'
This shows the topics that a connector is consuming from or producing to. It may not be particularly useful for connectors that consume from or produce to a single fixed topic. However, some connectors are configured with regular expressions for topic names, so this endpoint is a major benefit in situations where topic names are derived computationally.
This could also be useful with a source connector that is using SMTs to dynamically change the topic names to which it is producing.