Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port 8083. When executed in distributed mode, the REST API will be the primary interface to the cluster. You can make requests to any cluster member; the REST API automatically forwards requests if required.
In this tutorial, we will demonstrate features of the Connect REST API using basic command line examples.
Get basic Connect cluster information including the worker version, the commit that it’s on, and its Kafka cluster ID with the following command:
curl http://localhost:8083/
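The response is a small JSON document. The exact values depend on your environment, but it has this shape (the values below are placeholders):

{"version":"<kafka-version>","commit":"<commit-hash>","kafka_cluster_id":"<cluster-id>"}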
Note that the cluster ID sets this cluster apart from other Connect clusters that may be running a separate set of connectors.
The command below lists the plugins that are installed on the worker. Note that plugins need to be installed first in order to be called at runtime later.
curl -s localhost:8083/connector-plugins
Kafka does ship with a few plugins, but generally you will need to install plugins yourself. The best place to get them is Confluent Hub, where you will find a large number of plugins and a command line tool to install them. Recall that the Docker containers for our two Connect workers included commands to download and install four connector plugins from Confluent Hub.
It is a bit difficult to spot the different plugins in the command output, though. We can run the command again and pipe the output to the jq command to get it in a more readable form:
curl -s localhost:8083/connector-plugins | jq '.'
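Each plugin is listed with its class, type, and version. The exact entries depend on what you have installed, but each looks something like this (version shown as a placeholder):

[
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "<plugin-version>"
  }
]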
Now it is much easier to see the details for each of the available plugins. These plugins need to be installed on all workers in the Connect cluster so that if a connector instance or task is moved to a worker due to a rebalance, the plugin is available to run it.
To create a connector instance, you PUT or POST a JSON file with the connector’s configuration to a REST endpoint on your Connect worker. PUT is somewhat easier because it will create the connector if it doesn’t exist, or update it if it already exists. If it already exists and there’s no update to make, it won’t error—so PUT is the idempotent way of updating a connector.
curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-debezium-orders-00/config \
    -d '{
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
        "value.converter.basic.auth.credentials.source": "'$BASIC_AUTH_CREDENTIALS_SOURCE'",
        "value.converter.basic.auth.user.info": "'$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO'",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "42",
        "database.server.name": "asgard",
        "table.whitelist": "demo.orders",
        "database.history.kafka.bootstrap.servers": "'$BOOTSTRAP_SERVERS'",
        "database.history.consumer.security.protocol": "SASL_SSL",
        "database.history.consumer.sasl.mechanism": "PLAIN",
        "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
        "database.history.producer.security.protocol": "SASL_SSL",
        "database.history.producer.sasl.mechanism": "PLAIN",
        "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
        "database.history.kafka.topic": "dbhistory.demo",
        "topic.creation.default.replication.factor": "3",
        "topic.creation.default.partitions": "3",
        "decimal.handling.mode": "double",
        "include.schema.changes": "true",
        "transforms": "unwrap,addTopicPrefix",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.addTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.addTopicPrefix.regex": "(.*)",
        "transforms.addTopicPrefix.replacement": "mysql-debezium-$1"
    }'
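Because the -i flag is included, curl prints the HTTP response status and headers. On the first run the worker responds with HTTP/1.1 201 Created and echoes back the connector's name, config, and tasks as JSON; running the same command again returns 200 OK instead, since the connector already exists and is simply being updated.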
Use the following command to list all extant connectors:
curl -s -X GET "http://localhost:8083/connectors/"
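With the connectors from this tutorial in place, the response is a JSON array of connector names, along the lines of:

["source-debezium-orders-00","sink-elastic-orders-00"]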
Inspect the config for a given connector as follows:
curl -i -X GET -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-orders-00/config
You can also look at a connector’s status. While the config command shows a connector’s static configuration, the status shows the connector as a runtime entity:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state, .value.info.config."connector.class"] |join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
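The result is one row per connector, formatted into columns, similar to this (assuming the Debezium source created above is running):

source  |  source-debezium-orders-00  |  RUNNING  |  RUNNING  |  io.debezium.connector.mysql.MySqlConnector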
If something is wrong in your setup and you don’t think a config change would help, or if you simply don’t need a connector to run anymore, you can delete it by name:
curl -s -X DELETE "http://localhost:8083/connectors/sink-elastic-orders-00"
Or you can make a nifty interactive delete list with the peco tool by piping the connector list through it, finally stdout-ing to an xargs cURL call to the delete API:
curl -s "http://localhost:8083/connectors" | \
jq '.[]' | \
peco | \
xargs -I{connector_name} curl -s -X DELETE "http://localhost:8083/connectors/"\{connector_name\}
It returns an interactive list of connector instances. To delete one, arrow down to highlight it and press enter. When you are done, just press Ctrl-C to end the interactive list.
As mentioned above, if there's a connector to update, you can use PUT to amend the configuration (see Create a Connector Instance above). Because PUT is used to both create and update connectors, it's the standard command that you should use most of the time, and it means you don't have to delete and recreate a connector just to change its configuration.
The following command returns the connector status:
curl -s -X GET "http://localhost:8083/connectors/source-debezium-orders-00/status" | jq '.'
If your connector fails, the details of the failure belong to the task. The task is the entity actually running the connector and converter code, so its status is where the stack trace lives. To inspect the problem, fetch the status of the failed task:
curl -s -X GET "http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/status" | jq '.'
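For a failed task, the state will be FAILED and the response includes a trace field holding the stack trace (abbreviated here; the exception shown is illustrative):

{
  "id": 0,
  "state": "FAILED",
  "worker_id": "kafka-connect-01:8083",
  "trace": "org.apache.kafka.connect.errors.ConnectException: ..."
}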
If, after inspecting a task, you have determined that it has failed and you have fixed the reason for the failure (perhaps by restarting a database), you can restart the connector with the following:
curl -s -X POST "http://localhost:8083/connectors/source-debezium-orders-00/restart"
Keep in mind though that restarting the connector doesn’t restart all of its tasks. You will also need to restart the failed task and then get its status again as follows:
curl -s -X POST "http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/restart"
curl -s -X GET "http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/status" | jq '.'
Unlike restarting, pausing a connector does pause its tasks. This happens asynchronously, though, so when you pause a connector, you can’t rely on it pausing all of its tasks at exactly the same time. The tasks are running in a thread pool, so there’s no fancy mechanism to make this happen simultaneously.
A connector and its tasks can be paused as follows:
curl -s -X PUT "http://localhost:8083/connectors/source-debezium-orders-00/pause"
Just as easily, a connector and its tasks can be resumed:
curl -s -X PUT "http://localhost:8083/connectors/source-debezium-orders-00/resume"
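Both the pause and resume endpoints return an empty 202 Accepted response; because the state change is asynchronous, confirm it by checking the status endpoint again:

curl -s -X GET "http://localhost:8083/connectors/source-debezium-orders-00/status" | jq '.connector.state'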
A convenient way to display all of a connector’s tasks at once is as follows:
curl -s -X GET "http://localhost:8083/connectors/sink-neo4j-orders-00/tasks" | jq '.'
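The response is an array with one entry per task, each carrying the task's ID and the configuration it is running with (config keys abbreviated, values shown as placeholders):

[
  {
    "id": {
      "connector": "sink-neo4j-orders-00",
      "task": 0
    },
    "config": {
      "task.class": "<task-class>",
      "topics": "<topics>"
    }
  }
]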
This information is similar to what you can get from other APIs, but it is broken down by task, and configs for each are shown.
As of Apache Kafka 2.5, it is possible to get a list of topics used by a connector:
curl -s -X GET "http://localhost:8083/connectors/source-debezium-orders-00/topics" | jq '.'
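The response maps the connector name to the topics it has been observed using. Given the RegexRouter transform in the earlier config, and assuming the source table is named ORDERS, it would look something like:

{
  "source-debezium-orders-00": {
    "topics": [
      "mysql-debezium-asgard.demo.ORDERS"
    ]
  }
}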
This shows the topics that a connector is consuming from or producing to. This may not be particularly useful for connectors that work with a single, explicitly named topic. However, some connectors use regular expressions for topic names in Connect, so this is a major benefit in situations where topic names are derived computationally.
This could also be useful with a source connector that is using SMTs to dynamically change the topic names to which it is producing.