course: Kafka Connect 101

Kafka Connect’s REST API

Danica Fine

Senior Developer Advocate (Presenter)

Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port 8083. When executed in distributed mode, the REST API will be the primary interface to the cluster. You can make requests to any cluster member; the REST API automatically forwards requests if required.
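
As a minimal sketch of working with the default port, the shell helpers below build request URLs from a base address and wait until a worker answers on its root endpoint. The variable name CONNECT_URL and both function names are assumptions made for illustration; they are not part of Kafka Connect.

```shell
#!/usr/bin/env bash
# Base address of any Connect worker; 8083 is the default REST port.
# CONNECT_URL is a hypothetical variable name chosen for this sketch.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the full URL for a REST path such as /connectors.
connect_url() {
  printf '%s%s\n' "${CONNECT_URL%/}" "$1"
}

# Poll the root endpoint until it returns HTTP 200 or the attempts run out.
# Usage: wait_for_connect [attempts] [seconds_between_attempts]
wait_for_connect() {
  local attempts="${1:-30}" delay="${2:-2}" i=0 code
  while [ "$i" -lt "$attempts" ]; do
    # -w '%{http_code}' prints only the status code; 000 means no response.
    code="$(curl -s -o /dev/null -w '%{http_code}' "$(connect_url /)")"
    if [ "$code" = "200" ]; then
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  return 1
}
```

Calling `wait_for_connect && curl -s "$(connect_url /connectors)"` would then list connectors only once the worker is reachable.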

In this tutorial, we will demonstrate features of the Connect REST API using basic command line examples.

Getting Basic Connect Cluster Information

Get basic Connect cluster information including the worker version, the commit that it’s on, and its Kafka cluster ID with the following command:

curl http://localhost:8083/

Note that the cluster ID sets this cluster apart from other Connect clusters that may be running a separate set of connectors.

Listing Installed Plugins

The command below lists the plugins that are installed on the worker. Note that plugins need to be installed first in order to be called at runtime later.

curl -s localhost:8083/connector-plugins

Kafka does ship with a few plugins, but generally you will need to install plugins yourself. The best place to get them is Confluent Hub, where you will find a large number of plugins and a command line tool to install them. Recall that the Docker containers for our two Connect workers included commands to download and install four connector plugins from Confluent Hub.

Formatting the Result of the Installed Plugin List

It is a bit difficult to spot the different plugins in the command output. We can run the command again though and pipe the output to the jq command to get it in a more easily readable form.

curl -s localhost:8083/connector-plugins | jq '.'

Now it is much easier to see the details for each of the available plugins. These plugins need to be installed on all workers in the Connect cluster so that if a connector instance or task is moved to a worker due to a rebalance, the plugin is available to run it.

Create a Connector Instance

To create a connector instance, you PUT or POST the connector’s configuration as JSON to a REST endpoint on your Connect worker. POST to /connectors creates a new connector and fails if one with that name already exists. PUT to /connectors/<name>/config is somewhat easier because it creates the connector if it doesn’t exist and updates it if it does. If the connector already exists and there’s no update to make, PUT won’t return an error, so it is the idempotent way of creating or updating a connector.

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-debezium-orders-00/config \
    -d '{
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
            "value.converter.schemas.enable": "true",
            "value.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
            "value.converter.basic.auth.credentials.source": "'$BASIC_AUTH_CREDENTIALS_SOURCE'",
            "value.converter.basic.auth.user.info": "'$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO'",
            "database.hostname": "mysql",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "dbz",
            "database.server.id": "42",
            "database.server.name": "asgard",
            "table.whitelist": "demo.orders",
            "database.history.kafka.bootstrap.servers": "'$BOOTSTRAP_SERVERS'",
            "database.history.consumer.security.protocol": "SASL_SSL",
            "database.history.consumer.sasl.mechanism": "PLAIN",
            "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
            "database.history.producer.security.protocol": "SASL_SSL",
            "database.history.producer.sasl.mechanism": "PLAIN",
            "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
            "database.history.kafka.topic": "dbhistory.demo",
            "topic.creation.default.replication.factor": "3",
            "topic.creation.default.partitions": "3",
            "decimal.handling.mode": "double",
            "include.schema.changes": "true",
            "transforms": "unwrap,addTopicPrefix",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.addTopicPrefix.regex":"(.*)",
            "transforms.addTopicPrefix.replacement":"mysql-debezium-$1"
    }'
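
Long inline JSON is easy to mistype, so one option is to keep the configuration in a file and send it with curl's `@file` syntax. The sketch below assumes a hypothetical function name, `put_connector`, and a config file path of your choosing. Unlike the inline example above, shell variables inside the file are not expanded, so values such as credentials would need to be filled in by other means.

```shell
#!/usr/bin/env bash
# CONNECT_URL is a hypothetical variable name chosen for this sketch.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Create or update a connector from a JSON config file.
# Usage: put_connector <connector-name> <config-file.json>
put_connector() {
  local name="$1" file="$2"
  if [ ! -r "$file" ]; then
    echo "config file not readable: $file" >&2
    return 1
  fi
  # -d @file sends the file contents as the request body
  # (curl strips newlines from the file, which is harmless for JSON).
  curl -s -X PUT -H "Content-Type: application/json" \
    -d @"$file" \
    "${CONNECT_URL}/connectors/${name}/config"
}
```

For example, `put_connector source-debezium-orders-00 ./source-debezium-orders-00.json` would send the file to the same endpoint used above.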

List Connector Instances

Use the following command to list all existing connectors:

curl -s -XGET "http://localhost:8083/connectors/"

Inspect Config and Status for a Connector

Inspect the config for a given connector as follows:

curl -i -X GET -H  "Content-Type:application/json" \
       http://localhost:8083/connectors/sink-elastic-orders-00/config

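You can also look at a connector’s status. While the config command shows a connector’s static configuration, the status shows the connector as a runtime entity. The following command summarizes the type, name, connector state, task states, and connector class of every connector on the cluster: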

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
	jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state, .value.info.config."connector.class"] |join(":|:")' | \
	column -s : -t| sed 's/\"//g'| sort

Delete a Connector

If something is wrong in your setup and you don’t think a config change would help, or if you simply don’t need a connector to run anymore, you can delete it by name:

curl -s -XDELETE "http://localhost:8083/connectors/sink-elastic-orders-00"

Alternatively, you can build an interactive delete list with the tool peco: pipe the connector list through peco, then pass the selection via xargs to a curl call to the delete API:

curl -s "http://localhost:8083/connectors" | \
    jq '.[]' | \
    peco | \
    xargs -I{connector_name} curl -s -XDELETE "http://localhost:8083/connectors/"\{connector_name\}

It returns an interactive list of connector instances. To delete one, arrow down to highlight it and press enter. When you are done, just press Ctrl-C to end the interactive list.
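
For scripted deletes, it can help to check the HTTP status code rather than rely on the (empty) response body. The sketch below assumes the usual responses from the delete endpoint: 204 on success, 404 for an unknown connector, and 409 while a rebalance is in progress. The function name `delete_connector` is hypothetical.

```shell
#!/usr/bin/env bash
# CONNECT_URL is a hypothetical variable name chosen for this sketch.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Delete a connector by name and report the outcome based on the HTTP status.
# Usage: delete_connector <connector-name>
delete_connector() {
  local name="$1" code
  # Discard the body and capture only the status code.
  code="$(curl -s -o /dev/null -w '%{http_code}' -X DELETE \
    "${CONNECT_URL}/connectors/${name}")"
  case "$code" in
    204) echo "deleted ${name}" ;;
    404) echo "no such connector: ${name}" >&2; return 1 ;;
    409) echo "rebalance in progress, retry later" >&2; return 1 ;;
    *)   echo "unexpected HTTP status ${code}" >&2; return 1 ;;
  esac
}
```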

Update a Connector

As mentioned above, if there’s a connector to update, you can use PUT to amend the configuration (see Create a Connector Instance above). Because PUT both creates and updates connectors, it’s the command that you should use most of the time: you can keep a single configuration and resend it whenever it changes. Note that PUT replaces the connector’s entire configuration, so send the complete config rather than only the changed properties.

Inspect Task Details

The following command returns the connector status:

curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-00/status" | jq '.'

If your connector fails, the details of the failure belong to the task, because the task is the entity that actually runs the connector and converter code. To inspect the problem, look at the task’s status, which includes the stack trace for a failed task:

curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/status" | jq '.'

Restart the Connector and Tasks

If, after inspecting a task, you have determined that it has failed and you have fixed the cause of the failure (perhaps by restarting a database), you can restart the connector with the following:

curl -s -XPOST "http://localhost:8083/connectors/source-debezium-orders-00/restart"

Keep in mind, though, that by default restarting the connector doesn’t restart its tasks. You will also need to restart the failed task and then check its status again as follows:

curl -s -XPOST "http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/restart"
curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/status" | jq '.'
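
On workers running Apache Kafka 3.0 or later, the connector restart endpoint also accepts the `includeTasks` and `onlyFailed` query parameters, which restart the connector together with its failed tasks in a single request. The sketch below assumes such a worker; the function name `restart_failed` is hypothetical.

```shell
#!/usr/bin/env bash
# CONNECT_URL is a hypothetical variable name chosen for this sketch.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Restart a connector instance together with any of its tasks that are FAILED.
# Requires a worker on Apache Kafka 3.0 or later.
# Usage: restart_failed <connector-name>
restart_failed() {
  local name="$1"
  curl -s -X POST \
    "${CONNECT_URL}/connectors/${name}/restart?includeTasks=true&onlyFailed=true"
}
```

On older workers, the per-task restart shown above remains the way to bring a failed task back.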

Pause and Resume a Connector

Unlike restarting, pausing a connector does pause its tasks. This happens asynchronously, though, so when you pause a connector, you can’t rely on it pausing all of its tasks at exactly the same time. The tasks are running in a thread pool, so there’s no fancy mechanism to make this happen simultaneously.

A connector and its tasks can be paused as follows:

curl -s -XPUT "http://localhost:8083/connectors/source-debezium-orders-00/pause"

Just as easily, a connector and its tasks can be resumed:

curl -s -XPUT "http://localhost:8083/connectors/source-debezium-orders-00/resume"
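
Because pausing is asynchronous, a script that needs the connector to be fully paused may want to poll the status endpoint. The following is a minimal sketch under that assumption: it uses jq, as elsewhere in this tutorial, and the function name `wait_until_paused` is hypothetical.

```shell
#!/usr/bin/env bash
# CONNECT_URL is a hypothetical variable name chosen for this sketch.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Succeed once the connector and every one of its tasks report PAUSED.
# Usage: wait_until_paused <connector-name> [attempts] [delay-seconds]
wait_until_paused() {
  local name="$1" attempts="${2:-10}" delay="${3:-1}" i=0
  while [ "$i" -lt "$attempts" ]; do
    # jq -e exits 0 only when the expression evaluates to true.
    if curl -s "${CONNECT_URL}/connectors/${name}/status" |
      jq -e 'all(.connector, .tasks[]; .state == "PAUSED")' >/dev/null 2>&1; then
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  return 1
}
```

A typical use would be to call the pause endpoint and then run `wait_until_paused source-debezium-orders-00` before doing maintenance on the source system.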

Display All of a Connector’s Tasks

A convenient way to display all of a connector’s tasks at once is as follows:

curl -s -XGET "http://localhost:8083/connectors/sink-neo4j-orders-00/tasks" | jq '.'

This information is similar to what you can get from other APIs, but it is broken down by task, and configs for each are shown.

Get a List of Topics Used by a Connector

As of Apache Kafka 2.5, it is possible to get a list of topics used by a connector:

curl -s -XGET "http://localhost:8083/connectors/source-debezium-orders-00/topics" | jq '.'

This shows the topics that a connector is consuming from or producing to. That may not be particularly useful for a connector that works with a single topic. However, it is a major benefit when topic names are derived computationally, for example when topics are selected with a regular expression.

This could also be useful with a source connector that is using SMTs to dynamically change the topic names to which it is producing.
