Build and run a custom connector on Confluent Cloud

Question:

How do I write, package, and run a custom connector on Confluent Cloud?

Example use case:

While Confluent Cloud offers many pre-built managed connectors, you may also upload and run custom connectors on Confluent Cloud — either an existing open source connector that you'd like to use, or a connector that you've written. In this tutorial, we'll write a simple source connector plugin, package it so that it can be uploaded to Confluent Cloud, and then run the connector. As a developer, you want to write code. Let Confluent Cloud do the rest.

Hands-on code example:

New to Confluent Cloud? Get started here.

Run it

Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir -p custom-connector/src/main/java/io/confluent/developer/connect/ && cd custom-connector

Provision your Kafka cluster

Important: In order to run a custom connector, your Kafka cluster must reside in a supported cloud provider and region.

This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started free is on Confluent Cloud, which provides Kafka as a fully managed service.

Take me to Confluent Cloud
  1. After you log in to Confluent Cloud, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  2. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details). To avoid entering a credit card, also add the promo code CONFLUENTDEV1, which waives the credit card requirement for 30 days or until your credits run out.

  3. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.

Develop the custom connector

We’ll start by developing a simple source connector that emits numeric data to a Kafka topic.

We’ll build the connector with Maven, so ensure that you have it installed and on your path. Before proceeding, verify that mvn -version succeeds when you run it on the command line.

First, create the following pom.xml file in the custom-connector directory:

<?xml version="1.0" encoding="UTF-8"?>
<!--~
  ~ Copyright 2023 Confluent Inc.
  ~
  ~ Licensed under the Apache License, Version 2.0 (the "License");
  ~ you may not use this file except in compliance with the License.
  ~ You may obtain a copy of the License at
  ~
  ~ http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  ~-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.confluent.developer</groupId>
    <artifactId>kafka-connect-counter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <name>kafka-connect-counter</name>

    <description>
        A dummy Kafka Connect source connector that emits events containing incrementing numbers
    </description>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.4.0</version>
        </dependency>

        <!-- logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.7</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-reload4j</artifactId>
            <version>2.0.7</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>io.confluent</groupId>
                <artifactId>kafka-connect-maven-plugin</artifactId>
                <version>0.11.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>kafka-connect</goal>
                        </goals>
                        <configuration>
                            <title>Counter Kafka Connector</title>
                            <description>Demo connector that emits events with incrementing long values</description>
                            <ownerUsername>confluentinc</ownerUsername>
                            <ownerType>organization</ownerType>
                            <ownerName>Confluent, Inc.</ownerName>
                            <ownerUrl>https://confluent.io/</ownerUrl>
                            <componentTypes>
                                <componentType>source</componentType>
                            </componentTypes>
                            <confluentControlCenterIntegration>true</confluentControlCenterIntegration>
                            <singleMessageTransforms>false</singleMessageTransforms>
                            <supportedEncodings>UTF-8</supportedEncodings>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Most of the POM is boilerplate, but notice two important sections that any custom connector should have:

  1. A dependency on the Kafka Connect API, connect-api

  2. The kafka-connect-maven-plugin, which does the heavy lifting to package the connector following this spec. This lets us simply run mvn package later in this tutorial to build the plugin zip that we'll upload to Confluent Cloud.

Now we'll create the three classes that comprise our connector: a configuration class, the connector class (whose most notable job is to build the per-task configurations), and the source task class that generates the records.

Go ahead and create the following file at src/main/java/io/confluent/developer/connect/CounterConnectorConfig.java. This class defines the two configuration properties for our connector: the topic to write to (kafka.topic), and the number of milliseconds to wait between emitting events (interval.ms).

package io.confluent.developer.connect;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.Map;

public class CounterConnectorConfig extends AbstractConfig {

  public static final String KAFKA_TOPIC_CONF = "kafka.topic";
  private static final String KAFKA_TOPIC_DOC = "Topic to write to";
  public static final String INTERVAL_CONF = "interval.ms";
  private static final String INTERVAL_DOC = "Interval between messages (ms)";

  public CounterConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
    super(config, parsedConfig);
  }

  public CounterConnectorConfig(Map<String, String> parsedConfig) {
    this(conf(), parsedConfig);
  }

  public static ConfigDef conf() {
    return new ConfigDef()
        .define(KAFKA_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_TOPIC_DOC)
        .define(INTERVAL_CONF, Type.LONG, 1_000L, Importance.HIGH, INTERVAL_DOC);
  }

  public String getKafkaTopic() {
    return this.getString(KAFKA_TOPIC_CONF);
  }

  public Long getInterval() {
    return this.getLong(INTERVAL_CONF);
  }
}
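
A quick way to convince yourself that the defaults behave as expected is a throwaway main method like the one below. It's a hypothetical scratch class, not part of the connector (keep it out of the sources you package), but it shows that interval.ms falls back to 1000 when omitted:

package io.confluent.developer.connect;

import java.util.Collections;
import java.util.Map;

// Hypothetical scratch class for local experimentation only
public class ConfigSanityCheck {

  public static void main(String[] args) {
    // kafka.topic is the only required property
    Map<String, String> props = Collections.singletonMap("kafka.topic", "counter-topic");
    CounterConnectorConfig config = new CounterConnectorConfig(props);

    System.out.println(config.getKafkaTopic()); // counter-topic
    System.out.println(config.getInterval());   // 1000 (the default)
  }
}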

Next, create the following file at src/main/java/io/confluent/developer/connect/CounterConnector.java. You can think of this class as the glue between the connector configuration and the task implementation. We’ll provide the fully qualified name of this class (io.confluent.developer.connect.CounterConnector) later in this tutorial when adding the connector plugin to Confluent Cloud.

package io.confluent.developer.connect;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CounterConnector extends SourceConnector {

  private Map<String, String> props;

  @Override
  public String version() {
    return CounterConnector.class.getPackage().getImplementationVersion();
  }

  @Override
  public void start(Map<String, String> props) {
    this.props = props;
  }

  @Override
  public void stop() {
  }

  @Override
  public Class<? extends Task> taskClass() {
    return CounterSourceTask.class;
  }

  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> taskConfigs = new ArrayList<>();
    for (int i = 0; i < maxTasks; i++) {
      Map<String, String> taskConfig = new HashMap<>(this.props);
      taskConfig.put(CounterSourceTask.TASK_ID, Integer.toString(i));
      taskConfigs.add(taskConfig);
    }
    return taskConfigs;
  }

  @Override
  public ConfigDef config() {
    return CounterConnectorConfig.conf();
  }
}
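
To see how this glue works, another hypothetical scratch class (again for local experimentation only) can start the connector and print the configuration that each of two tasks would receive; note that each task gets its own task.id:

package io.confluent.developer.connect;

import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical scratch class for local experimentation only
public class TaskConfigSanityCheck {

  public static void main(String[] args) {
    CounterConnector connector = new CounterConnector();
    connector.start(Collections.singletonMap("kafka.topic", "counter-topic"));

    // One config map per task: a copy of the connector config plus a task.id
    List<Map<String, String>> taskConfigs = connector.taskConfigs(2);
    taskConfigs.forEach(System.out::println);
    // e.g. {task.id=0, kafka.topic=counter-topic}
    //      {task.id=1, kafka.topic=counter-topic}  (map ordering may vary)
  }
}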

Finally, create the following file at src/main/java/io/confluent/developer/connect/CounterSourceTask.java. The poll() method in this class generates our numeric events.

package io.confluent.developer.connect;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CounterSourceTask extends SourceTask {

  public static final String TASK_ID = "task.id";
  public static final String CURRENT_ITERATION = "current.iteration";

  private CounterConnectorConfig config;
  private String topic;
  private long interval;
  private long count = 0L;
  private int taskId;
  private Map<String, Integer> sourcePartition;

  @Override
  public String version() {
    return CounterSourceTask.class.getPackage().getImplementationVersion();
  }

  @Override
  public void start(Map<String, String> props) {
    config = new CounterConnectorConfig(props);
    topic = config.getKafkaTopic();
    interval = config.getInterval();
    taskId = Integer.parseInt(props.get(TASK_ID));
    sourcePartition = Collections.singletonMap(TASK_ID, taskId);

    Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition);
    if (offset != null) {
      // the offset contains our next state, so restore it as is
      count = ((Long) offset.get(CURRENT_ITERATION));
    }
  }

  @Override
  public List<SourceRecord> poll() throws InterruptedException {

    if (interval > 0) {
      try {
        Thread.sleep(interval);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return null;
      }
    }

    Map<String, Long> sourceOffset = Collections.singletonMap(CURRENT_ITERATION, count + 1);

    final List<SourceRecord> records = Collections.singletonList(new SourceRecord(
        sourcePartition,
        sourceOffset,
        topic,
        null,
        Schema.INT64_SCHEMA,
        count
    ));
    count++;
    return records;
  }

  @Override
  public void stop() {
  }
}

Package the custom connector

In your terminal, run the following command. You can ignore the warnings generated by the kafka-connect-maven-plugin.

mvn package

This generates the connector zip in the target/components/packages/ directory.
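
You can verify the artifact by listing that directory. The exact file name is derived from the owner, artifact ID, and version in the POM, so with this tutorial's POM it should look something like this:

ls target/components/packages/

confluentinc-kafka-connect-counter-0.0.1-SNAPSHOT.zip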

Add the custom connector plugin to Confluent Cloud

Now we’ll add the connector plugin to Confluent Cloud. We’re not actually running the connector in this step; we’re just uploading the plugin so that we can run the connector in the next section.

Go ahead and create a Kafka cluster in the Confluent Cloud Console if you didn’t already do so above. Your Kafka cluster must reside in a supported cloud provider and region in order for you to be able to upload the plugin to Confluent Cloud.

On the cluster homepage, select Connectors from the lefthand navigation, and then click the Add plugin button on the top right.

Fill in the form as follows:

  1. Name: Counter Source

  2. Description: Source connector that emits incrementing numbers

  3. Connector class: the class that extends Kafka Connect’s SourceConnector class, i.e., io.confluent.developer.connect.CounterConnector

  4. Connector type: Select Source

  5. Click Select connector archive and choose the zip file built in the previous step

  6. Sensitive properties: leave this section alone since the connector doesn’t make use of any sensitive properties like credentials to connect to an external system

  7. Review the custom connector disclaimer and check the box agreeing that you are responsible for the code uploaded to Confluent Cloud

  8. Click Submit!
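
If you prefer the terminal, recent versions of the Confluent CLI can also upload the plugin. Treat the command below as a sketch rather than a definitive invocation (flag names can change between CLI versions) and check confluent connect custom-plugin create --help for the exact syntax:

confluent connect custom-plugin create "Counter Source" \
  --plugin-file target/components/packages/confluentinc-kafka-connect-counter-0.0.1-SNAPSHOT.zip \
  --connector-class io.confluent.developer.connect.CounterConnector \
  --connector-type source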

Run the custom connector on Confluent Cloud

Let’s now run the connector.

  1. On the cluster homepage, select Connectors from the lefthand navigation

  2. In the Filter by: section, click the Deployment dropdown and select Custom

  3. Click the Counter Source connector tile

  4. Click Generate API key & download in order to create Kafka credentials that the connector will use, and then click Continue on the bottom right

  5. In the Configuration step, enter kafka.topic as the key and counter-topic as the value. This is the only required property. There is also an interval.ms property that specifies the number of milliseconds to wait between events, but we can leave it out and accept the default of 1000 (1 second). (The resulting key/value map is shown after this list.)

  6. Click Continue to proceed to the Networking section. Since the connector doesn't reach out to any external endpoints, there is nothing to allow list, so click Continue again to proceed to the Sizing section

  7. Click Continue to run the connector with a single task

  8. Click Continue once more to provision the connector. You’ll see a tile for the connector on the Connectors page showing that the connector is Provisioning.
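
For reference, the configuration entered in step 5 boils down to this small key/value map (interval.ms is shown with its default of 1000 purely for illustration; you can omit it):

{
  "kafka.topic": "counter-topic",
  "interval.ms": "1000"
}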

Monitor the custom connector on Confluent Cloud

Once the connector has been provisioned, click on its tile on the Connectors page.

In the Overview tab, you’ll see that the connector is generating messages at a throughput of 1 message per second.

Now click the Logs tab, where you can search and filter the log messages that the connector generates in order to monitor or debug it.

Now, click on Topics in the lefthand navigation and select the topic that the source connector is writing to (counter-topic). In the Messages viewer, you’ll see incrementing numbers being emitted.
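
If you have the Confluent CLI installed and logged in, you can also tail the topic from the terminal. How the INT64 values render depends on the converter in use, but you should see the counter values flowing:

confluent kafka topic consume counter-topic --from-beginning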

Delete the custom connector

To stop and delete the connector:

  1. Select Connectors in the lefthand navigation, and then click the tile corresponding to the custom connector that’s running

  2. Click the Settings tab

  3. Click the Delete connector link on the bottom left

  4. Enter the connector name and then click Confirm

To also uninstall the custom connector plugin:

  1. Find the Counter Source connector on the Connectors page

  2. Hover over the connector tile, and then click Edit plugin

  3. Click the Delete plugin link on the bottom left

  4. Enter the plugin name and then click Confirm