Course: Kafka Connect 101

REST API

11 min
Tim Berglund, Sr. Director, Developer Advocacy (Course Presenter)
Robin Moffatt, Staff Developer Advocate (Course Author)

Kafka Connect REST API

This tutorial will demonstrate the best features of the Connect REST API using basic command line examples. For more in-depth usage, refer to the official documentation.

Getting Basic Connect Cluster Information

Get basic Connect cluster information about the worker, the version, the commit that it’s on, and its Kafka cluster ID with the following command:

  curl http://localhost:8083/

Note that the cluster ID sets this cluster apart from other Connect clusters that may be running a separate set of connectors.
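The response is a small JSON document, so you can pull individual fields out of it with jq. A minimal sketch, using an illustrative response rather than one from a real cluster:

```shell
# Illustrative response from `curl http://localhost:8083/` -- values are made up
RESPONSE='{"version":"3.5.0","commit":"abc123","kafka_cluster_id":"LRelbvnnSzGOc_yZ1Cvrpw"}'

# Extract just the Kafka cluster ID
echo "$RESPONSE" | jq -r '.kafka_cluster_id'
```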

Listing Installed Plugins

The command below lists the plugins that are installed on the worker. Note that a plugin must be installed on the worker before the worker starts; it cannot be added at runtime.

  curl -s localhost:8083/connector-plugins

Kafka does ship with a few plugins, but generally you will need to install plugins yourself. The best place to get them is Confluent Hub, where you will find a large number of plugins and a command line tool to install them.

Creating a Connector

To create a connector, you PUT or POST a JSON file with the connector’s configuration to a REST endpoint on your Connect worker. PUT is somewhat easier because it will create the connector if it doesn’t exist, or update it if it already exists. If it already exists and there’s no update to make, it won’t error—so PUT is the idempotent way of updating a connector.

 curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-debezium-orders-01/config \
    -d '{
	"connector.class": "io.debezium.connector.mysql.MySqlConnector",
	"database.hostname": "mysql",
	"database.port": "3306",
	"database.user": "debezium",
	"database.password": "dbz",
	"database.server.id": "42",
	"database.server.name": "asgard",
	"table.whitelist": "demo.orders",
	"database.history.kafka.bootstrap.servers": "kafka:29092",
	"database.history.kafka.topic": "dbhistory.demo",
	"decimal.handling.mode": "double",
	"include.schema.changes": "true",
	"transforms": "unwrap,addTopicPrefix",
	"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
	"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
	"transforms.addTopicPrefix.regex":"(.*)",
	"transforms.addTopicPrefix.replacement":"mysql-debezium-$1"
	}'

Listing the Connectors

Use the following command to list all extant connectors:

 curl -s -XGET "http://localhost:8083/connectors/"
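The endpoint returns a JSON array of connector names. A small sketch of working with that array in a script (the names below are illustrative):

```shell
# Illustrative response from the connector list endpoint -- names are made up
CONNECTORS='["source-debezium-orders-01","sink-elastic-orders-00"]'

# Print one connector name per line, which is handy for piping into other tools
echo "$CONNECTORS" | jq -r '.[]'
```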

Inspecting Config and Status for a Connector

Inspect the config for a given connector as follows:

 curl -s -XGET "http://localhost:8083/connectors/sink-elastic-orders-00/config" | jq '.'
 {
   "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
   "topics": "mysql-debezium-asgard.demo.ORDERS",
   "connection.url": "http://elasticsearch:9200",
   "type.name": "kafkaconnect",
   "key.ignore": "true",
   "schema.ignore": "true"
 }

You can also look at a connector’s status. While the config inspection shows a connector’s static configuration, the status shows us the connector as a runtime entity:

 curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
	jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state, .value.info.config."connector.class"] |join(":|:")' | \
	column -s : -t| sed 's/\"//g'| sort

Deleting a Connector

If something is wrong in your setup and you don’t think a config change would help, or if you simply don’t need a connector to run anymore, you can delete it by name:

 curl -s -XDELETE "http://localhost:8083/connectors/sink-elastic-orders-00"

Or you can make a nifty interactive delete list with the tool peco by piping the connector list through it, then xargs-ing the selected name into a cURL call to the delete endpoint:

 curl -s "http://localhost:8083/connectors" | \
   jq -r '.[]' | \
   peco | \
   xargs -I{connector_name} curl -s -XDELETE "http://localhost:8083/connectors/{connector_name}"

Updating a Connector

As mentioned above, if there’s a connector to update, you can use PUT to amend the configuration (see Creating a Connector above). Because PUT is used to both create and update connectors, it’s the standard command that you should use most of the time (which also means that you don’t have to completely rewrite your configs).

Inspecting Task Details

Remember that if your connector fails, the details of the failure belong to the task:

 curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-01/status" | jq '.'
{
  "name": "source-debezium-orders-01",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "kafka-connect:8083",
      "trace": "…"
    }
  ]
}

So to inspect the problem, you’ll need to find the stack trace for the task:

 curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-01/tasks/0/status" | jq '.'

The task is the entity that is actually running the connector and converter code, so the state for the stack trace lives in it.
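The task status document carries the stack trace in a trace field. A sketch of extracting just that field with jq, against an abbreviated, illustrative response (the trace here is truncated, not a real one):

```shell
# Illustrative (truncated) response from the /tasks/0/status endpoint
TASK_STATUS='{"id":0,"state":"FAILED","worker_id":"kafka-connect:8083","trace":"org.apache.kafka.connect.errors.ConnectException: ..."}'

# The first line of the trace names the root exception
echo "$TASK_STATUS" | jq -r '.trace' | head -n 1
```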

Restarting the Connector and Tasks

If after inspecting a task, you have determined that it has failed and you have fixed the reason for the failure (perhaps restarted a database), you can restart the connector with the following:

 curl -s -XPOST "http://localhost:8083/connectors/source-debezium-orders-01/restart"

But keep in mind that restarting the connector doesn’t restart all of its tasks. So you can restart the failed task and then get its status again as follows:

 curl -s -XPOST "http://localhost:8083/connectors/source-debezium-orders-01/tasks/0/restart"

 curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-01/tasks/0/status" | jq '.'

Pausing and Resuming a Connector

Unlike restarting, pausing a connector does pause its tasks. This happens asynchronously, though, so when you pause a connector, you can’t rely on it pausing all of its tasks at exactly the same time. The tasks are running in a thread pool, so there’s no fancy mechanism to make this happen simultaneously.

A connector and its tasks can be paused as follows:

 curl -s -XPUT "http://localhost:8083/connectors/source-debezium-orders-01/pause"

Just as easily, a connector and its tasks can be resumed:

 curl -s -XPUT "http://localhost:8083/connectors/source-debezium-orders-01/resume"

Display All of a Connector’s Tasks

A convenient way to display all of a connector’s tasks at once is as follows:

 curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-01/tasks" | jq '.'

This information is similar to what you can get from other APIs, but it is broken down by task, and configs for each are shown.
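Each element of the response pairs a task id with that task's config. A sketch of summarizing it, using an abbreviated, illustrative response:

```shell
# Illustrative (abbreviated) response from the /tasks endpoint
TASKS='[{"id":{"connector":"source-debezium-orders-01","task":0},"config":{"task.class":"io.debezium.connector.mysql.MySqlConnectorTask"}}]'

# List each task number with the class it is running
echo "$TASKS" | jq -r '.[] | "\(.id.task): \(.config["task.class"])"'
```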

Getting a List of Topics Used by a Connector

As of Apache Kafka 2.5, it is possible to get a list of topics used by a connector:

 curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-01/topics" | jq '.'

This shows the topics that a connector is consuming from or producing to. This may not be particularly useful for connectors that are consuming from or producing to a single topic. However, some developers, for example, use regular expressions for topic names in Connect, so this is a major benefit in situations where topic names are derived computationally.

This could also be useful with a source connector that is using SMTs to dynamically change the topic names to which it is producing.
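The response maps the connector name to its list of topics. A sketch of flattening that into one topic per line, with an illustrative response (topic names here follow the earlier Debezium example):

```shell
# Illustrative response from the /topics endpoint
TOPICS='{"source-debezium-orders-01":{"topics":["mysql-debezium-asgard.demo.ORDERS","dbhistory.demo"]}}'

# Print one topic per line
echo "$TOPICS" | jq -r '.[].topics[]'
```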

Dynamically Changing Log Level

Since Apache Kafka 2.3, it has been possible to change the level of individual loggers at runtime. For example, you may want to set one class to TRACE logging, without setting everything to TRACE and without restarting the worker:

 curl -s -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/admin/loggers/io.debezium.connector.mysql.BinlogReader \
    -d '{"level": "TRACE"}' \
    | jq '.'

This allows you to iterate more quickly when troubleshooting, and for more subtle problems, you won’t lose context by restarting the worker.
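You can also read the current levels back with a GET against the same admin/loggers endpoint. A sketch of formatting that response into name=level pairs, using an abbreviated, illustrative document:

```shell
# Illustrative (abbreviated) response from GET /admin/loggers
LOGGERS='{"io.debezium.connector.mysql.BinlogReader":{"level":"TRACE"},"org.reflections":{"level":"ERROR"}}'

# Print each logger with its current level
echo "$LOGGERS" | jq -r 'to_entries[] | "\(.key)=\(.value.level)"'
```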
