Sr. Director, Developer Advocacy (Presenter)
Principal Developer Advocate (Author)
This tutorial will demonstrate the best features of the Connect REST API using basic command line examples. For more in-depth usage, refer to the official documentation.
Get basic Connect cluster information about the worker, the version, the commit that it’s on, and its Kafka cluster ID with the following command:
curl http://localhost:8083/
Note that the cluster ID sets this cluster apart from other Connect clusters that may be running a separate set of connectors.
The command below lists the plugins that are installed on the worker. Note that plugins need to be installed first in order to be called at runtime later.
curl -s localhost:8083/connector-plugins
Kafka does ship with a few plugins, but generally you will need to install plugins yourself. The best place to get them is Confluent Hub, where you will find a large number of plugins and a command line tool to install them.
To create a connector, you PUT or POST a JSON file with the connector’s configuration to a REST endpoint on your Connect worker. PUT is somewhat easier because it creates the connector if it doesn’t exist and updates it if it does. If the connector already exists and there’s no update to make, it won’t error, so PUT is the idempotent way of updating a connector.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-debezium-orders-01/config \
-d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "42",
"database.server.name": "asgard",
"table.whitelist": "demo.orders",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "dbhistory.demo",
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"mysql-debezium-$1"
}'
Use the following command to list all extant connectors:
curl -s -XGET "http://localhost:8083/connectors/"
Inspect the config for a given connector as follows:
curl -i -X GET -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-orders-00/config
You can also look at a connector’s status. While the config inspection shows a connector’s static configuration, the status shows us the connector as a runtime entity:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state, .value.info.config."connector.class"] |join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
If something is wrong in your setup and you don’t think a config change would help, or if you simply don’t need a connector to run anymore, you can delete it by name:
curl -s -XDELETE "http://localhost:8083/connectors/sink-elastic-orders-00"
Or you can make a nifty interactive delete list with the tool peco by piping the connector list through it and passing the selection via xargs to a cURL call to the delete API:
curl -s "http://localhost:8083/connectors" | \
jq '.[]' | \
peco | \
xargs -I{connector_name} curl -s -XDELETE "http://localhost:8083/connectors/"\{connector_name\}
As mentioned above, if there’s a connector to update, you can use PUT to amend the configuration (see Creating a Connector above). Because PUT is used to both create and update connectors, it’s the standard command that you should use most of the time (which also means that you can reuse the same config file for both, without rewriting it).
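Since one PUT covers both cases, the HTTP status code is the simplest way to tell which one happened: Connect answers 201 Created for a new connector and 200 OK for an existing one. The following is a minimal sketch, assuming the configuration is kept in a local JSON file; the function names `explain_put_status` and `put_connector_config` and the `CONNECT_URL` variable are illustrative helpers, not part of Connect.

```shell
# Base URL of the Connect worker (an assumption; override via the environment).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Translate the HTTP status of a PUT .../config call into plain words.
explain_put_status() {
  case "$1" in
    201) echo "created" ;;
    200) echo "updated or unchanged" ;;
    *)   echo "failed (HTTP $1)" ;;
  esac
}

# PUT a connector config stored in a JSON file and report what happened.
# Usage: put_connector_config <connector-name> <config-file.json>
put_connector_config() {
  local name="$1" file="$2" code
  # -o /dev/null drops the response body; -w prints only the status code.
  code=$(curl -s -o /dev/null -w '%{http_code}' -X PUT \
    -H "Content-Type:application/json" \
    --data @"$file" \
    "$CONNECT_URL/connectors/$name/config")
  echo "$name: $(explain_put_status "$code")"
}
```

Calling `put_connector_config source-debezium-orders-01 orders.json` (with a hypothetical `orders.json`) twice in a row should report a creation first and an update afterwards.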
Remember that if your connector fails, the details of the failure belong to the task:
curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-01/status" | jq '.'
{
"name": "source-debezium-orders-01",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "kafka-connect:8083",
So to inspect the problem, you’ll need to find the stack trace for the task:
curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-01/tasks/0/status" | jq '.'
The task is the entity that is actually running the connector and converter code, so the state for the stack trace lives in it.
If after inspecting a task, you have determined that it has failed and you have fixed the reason for the failure (perhaps restarted a database), you can restart the connector with the following:
curl -s -XPOST "http://localhost:8083/connectors/source-debezium-orders-01/restart"
But keep in mind that restarting the connector doesn’t restart all of its tasks. So you can restart the failed task and then get its status again as follows:
curl -s -XPOST "http://localhost:8083/connectors/source-debezium-orders-01/tasks/0/restart"
curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-01/tasks/0/status" | jq '.'
Unlike restarting, pausing a connector does pause its tasks. This happens asynchronously, though, so when you pause a connector, you can’t rely on it pausing all of its tasks at exactly the same time. The tasks are running in a thread pool, so there’s no fancy mechanism to make this happen simultaneously.
A connector and its tasks can be paused as follows:
curl -s -XPUT "http://localhost:8083/connectors/source-debezium-orders-01/pause"
Just as easily, a connector and its tasks can be resumed:
curl -s -XPUT "http://localhost:8083/connectors/source-debezium-orders-01/resume"
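Because pausing is asynchronous, a script that needs the connector to be fully stopped has to poll the status endpoint after sending the pause request. The sketch below is one way to do that; `wait_until_paused` and `CONNECT_URL` are illustrative names, and the check is a plain text match on the status JSON rather than a full parse.

```shell
# Base URL of the Connect worker (an assumption; override via the environment).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Poll the status endpoint until nothing reports RUNNING any more.
# Usage: wait_until_paused <connector-name> [max-attempts]
# Returns 0 once the connector and all tasks have left RUNNING, 1 on timeout.
wait_until_paused() {
  local name="$1" attempts="${2:-30}" status
  while [ "$attempts" -gt 0 ]; do
    # -f makes curl produce no output on HTTP errors such as 404.
    status=$(curl -sf "$CONNECT_URL/connectors/$name/status") || status=""
    # Only trust a non-empty response that contains no RUNNING state.
    if [ -n "$status" ] &&
       ! printf '%s' "$status" | grep -Eq '"state"[[:space:]]*:[[:space:]]*"RUNNING"'; then
      return 0
    fi
    attempts=$((attempts - 1))
    sleep 1
  done
  return 1
}
```

A typical use would be to send the pause request and then call `wait_until_paused source-debezium-orders-01` before touching the source database. Note that a FAILED task also counts as "not running" here.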
A convenient way to display all of a connector’s tasks at once is as follows:
curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-01/tasks" | jq '.'
This information is similar to what you can get from other APIs, but it is broken down by task, and configs for each are shown.
As of Apache Kafka 2.5, it is possible to get a list of topics used by a connector:
curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-01/topics" | jq '.'
This shows the topics that a connector is consuming from or producing to. This may not be particularly useful for connectors that are consuming from or producing to a single topic. However, some developers, for example, use regular expressions for topic names in Connect, so this is a major benefit in situations where topic names are derived computationally.
This could also be useful with a source connector that is using SMTs to dynamically change the topic names to which it is producing.
Since Apache Kafka 2.4, it has been possible to change the logging level of individual loggers at runtime. For example, you may want to set a single logger to TRACE, without setting everything to TRACE and without restarting the worker:
curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/io.debezium.connector.mysql.BinlogReader \
-d '{"level": "TRACE"}' \
| jq '.'
This allows you to iterate more quickly when troubleshooting, and for more subtle problems, you won’t lose context by restarting the worker.
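TRACE output is verbose, so a common pattern is to raise the level, reproduce the problem, and then lower it again. The sketch below wraps the same endpoint in a small helper that rejects misspelled level names before any request is sent; `set_log_level`, `list_loggers`, and `CONNECT_URL` are illustrative names, and the accepted levels are assumed to be the usual log4j ones.

```shell
# Base URL of the Connect worker (an assumption; override via the environment).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Show the loggers whose level is currently set on this worker.
list_loggers() {
  curl -s "$CONNECT_URL/admin/loggers"
}

# Set one logger to the given level.
# Usage: set_log_level <logger-name> <LEVEL>
# Returns 2 without calling the API if the level is not a known log4j level.
set_log_level() {
  local logger="$1" level="$2"
  case "$level" in
    TRACE|DEBUG|INFO|WARN|ERROR|FATAL|OFF) ;;
    *) echo "unknown level: $level" >&2; return 2 ;;
  esac
  curl -s -X PUT -H "Content-Type:application/json" \
    "$CONNECT_URL/admin/loggers/$logger" \
    -d "{\"level\": \"$level\"}"
}
```

For example, `set_log_level io.debezium.connector.mysql.BinlogReader TRACE` before reproducing the issue, and the same call with `INFO` afterwards.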