How do I write, package, and run a custom connector on Confluent Cloud?
To get started, make a new directory anywhere you’d like for this project:
mkdir -p custom-connector/src/main/java/io/confluent/developer/connect/ && cd custom-connector
Important: In order to run a custom connector, your Kafka cluster must reside in a supported cloud provider and region.
This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started free is on Confluent Cloud, which provides Kafka as a fully managed service.
After you log in to Confluent Cloud, click Environments in the lefthand navigation, click Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.
From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details). To avoid having to enter a credit card, add the promo code CONFLUENTDEV1; with this code, you will not need to enter a credit card for 30 days or until your credits run out.
Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.
We’ll start by developing a simple source connector that emits numeric data to a Kafka topic.
We’ll build the connector with Maven, so ensure that you have it installed and on your path. Before proceeding, verify that mvn -version succeeds when you run it on the command line.
First, create the following pom.xml file in the custom-connector directory:
<?xml version="1.0" encoding="UTF-8"?>
<!--~
~ Copyright 2023 Confluent Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.confluent.developer</groupId>
<artifactId>kafka-connect-counter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<name>kafka-connect-counter</name>
<description>
A dummy Kafka Connect source connector that emits events containing incrementing numbers
</description>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>3.4.0</version>
</dependency>
<!-- logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.confluent</groupId>
<version>0.11.1</version>
<artifactId>kafka-connect-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>kafka-connect</goal>
</goals>
<configuration>
<title>Counter Kafka Connector</title>
<description>Demo connector that emits events with incrementing long values</description>
<ownerUsername>confluentinc</ownerUsername>
<ownerType>organization</ownerType>
<ownerName>Confluent, Inc.</ownerName>
<ownerUrl>https://confluent.io/</ownerUrl>
<componentTypes>
<componentType>source</componentType>
</componentTypes>
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
<singleMessageTransforms>false</singleMessageTransforms>
<supportedEncodings>UTF-8</supportedEncodings>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Most of the POM is boilerplate, but notice two important sections that any custom connector should have: a dependency on the Kafka Connect API (connect-api), and the kafka-connect-maven-plugin plugin, which does the heavy lifting to build the connector zip following this spec. This lets us simply run mvn package later in this tutorial to build the plugin zip that we will upload to Confluent Cloud.
Now we’ll create the three classes that make up our connector: a configuration class, the connector class (which, most notably, builds the task configurations), and the source task class that generates records.
Go ahead and create the following file at src/main/java/io/confluent/developer/connect/CounterConnectorConfig.java. This class defines the connector’s two configuration properties: the topic to write to (kafka.topic) and the number of milliseconds to wait between emitting events (interval.ms).
package io.confluent.developer.connect;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import java.util.Map;
public class CounterConnectorConfig extends AbstractConfig {
public static final String KAFKA_TOPIC_CONF = "kafka.topic";
private static final String KAFKA_TOPIC_DOC = "Topic to write to";
public static final String INTERVAL_CONF = "interval.ms";
private static final String INTERVAL_DOC = "Interval between messages (ms)";
public CounterConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
}
public CounterConnectorConfig(Map<String, String> parsedConfig) {
this(conf(), parsedConfig);
}
public static ConfigDef conf() {
return new ConfigDef()
.define(KAFKA_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_TOPIC_DOC)
.define(INTERVAL_CONF, Type.LONG, 1_000L, Importance.HIGH, INTERVAL_DOC);
}
public String getKafkaTopic() {
return this.getString(KAFKA_TOPIC_CONF);
}
public Long getInterval() {
return this.getLong(INTERVAL_CONF);
}
}
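To get a feel for how these accessors behave, here is a stdlib-only sketch of the default handling that ConfigDef gives us: interval.ms falls back to 1000 ms when the property is absent, while kafka.topic has no default and must be supplied. (ConfigSketch is a hypothetical stand-in for illustration, not part of the connector.)

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigSketch {
    // Mimics the ConfigDef behavior above: interval.ms is optional
    // with a default of 1000 ms; kafka.topic has no default.
    static long interval(Map<String, String> props) {
        String raw = props.get("interval.ms");
        return raw == null ? 1_000L : Long.parseLong(raw);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("kafka.topic", "counter-topic");
        System.out.println(interval(props)); // property absent: default of 1000 applies
        props.put("interval.ms", "250");
        System.out.println(interval(props)); // property present: 250
    }
}
```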
Next, create the following file at src/main/java/io/confluent/developer/connect/CounterConnector.java. You can think of this class as the glue between the connector configuration and the task implementation. We’ll provide the fully qualified name of this class (io.confluent.developer.connect.CounterConnector) later in this tutorial when adding the connector plugin to Confluent Cloud.
package io.confluent.developer.connect;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CounterConnector extends SourceConnector {
private Map<String, String> props;
@Override
public String version() {
return CounterConnector.class.getPackage().getImplementationVersion();
}
@Override
public void start(Map<String, String> props) {
this.props = props;
}
@Override
public void stop() {
}
@Override
public Class<? extends Task> taskClass() {
return CounterSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskConfig = new HashMap<>(this.props);
taskConfig.put(CounterSourceTask.TASK_ID, Integer.toString(i));
taskConfigs.add(taskConfig);
}
return taskConfigs;
}
@Override
public ConfigDef config() {
return CounterConnectorConfig.conf();
}
}
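The key method here is taskConfigs(): Connect asks the connector for up to maxTasks configurations, and each copy of the connector properties is tagged with its own task.id so every task can identify itself. Here is a stdlib-only sketch of that fan-out with no Connect dependency (TaskConfigSketch is a hypothetical name used only for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigSketch {
    // Mirrors CounterConnector.taskConfigs: copy the connector props
    // once per task and tag each copy with its own task.id.
    static List<Map<String, String>> taskConfigs(Map<String, String> props, int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(props);
            taskConfig.put("task.id", Integer.toString(i));
            configs.add(taskConfig);
        }
        return configs;
    }

    public static void main(String[] args) {
        // Two tasks, each with the shared props plus its own task.id
        taskConfigs(Map.of("kafka.topic", "counter-topic"), 2)
                .forEach(System.out::println);
    }
}
```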
Finally, create the following file at src/main/java/io/confluent/developer/connect/CounterSourceTask.java. The poll() method in this class generates our numeric events.
package io.confluent.developer.connect;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class CounterSourceTask extends SourceTask {
public static final String TASK_ID = "task.id";
public static final String CURRENT_ITERATION = "current.iteration";
private CounterConnectorConfig config;
private String topic;
private long interval;
private long count = 0L;
private int taskId;
private Map<String, Integer> sourcePartition;
@Override
public String version() {
return CounterSourceTask.class.getPackage().getImplementationVersion();
}
@Override
public void start(Map<String, String> props) {
config = new CounterConnectorConfig(props);
topic = config.getKafkaTopic();
interval = config.getInterval();
taskId = Integer.parseInt(props.get(TASK_ID));
sourcePartition = Collections.singletonMap(TASK_ID, taskId);
Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition);
if (offset != null) {
// the offset contains our next state, so restore it as is
count = ((Long) offset.get(CURRENT_ITERATION));
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
if (interval > 0) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // preserve the interrupt status for the worker thread
return null;
}
}
Map<String, Long> sourceOffset = Collections.singletonMap(CURRENT_ITERATION, count + 1);
final List<SourceRecord> records = Collections.singletonList(new SourceRecord(
sourcePartition,
sourceOffset,
topic,
null,
Schema.INT64_SCHEMA,
count
));
count++;
return records;
}
@Override
public void stop() {
}
}
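One detail worth highlighting: each SourceRecord’s offset stores count + 1, i.e., the next value to emit, which is what lets a restarted task resume exactly where it left off. Here is a stdlib-only simulation of that resume logic, with plain maps standing in for Connect’s offset storage (OffsetSketch is a hypothetical name used only for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetSketch {
    // Simulates CounterSourceTask.start: the stored offset, if any,
    // holds the next value to emit under "current.iteration".
    static long resume(Map<String, Object> storedOffset) {
        if (storedOffset == null) {
            return 0L; // first start: no offset yet, begin at zero
        }
        return (Long) storedOffset.get("current.iteration");
    }

    public static void main(String[] args) {
        long count = resume(null); // fresh start: 0
        // Emit three records; each offset records the *next* state (count + 1).
        Map<String, Object> offsetStorage = new HashMap<>();
        for (int i = 0; i < 3; i++) {
            offsetStorage.put("current.iteration", count + 1);
            count++;
        }
        // After a restart, the task picks up where it left off.
        System.out.println(resume(offsetStorage)); // 3
    }
}
```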
In your terminal, run the following command. You can ignore the warnings generated by the kafka-connect-maven-plugin.
mvn package
This generates the connector zip in the target/components/packages/ directory.
Now we’ll add the connector plugin to Confluent Cloud. We’re not actually running the connector in this step; we’re just uploading the plugin so that we can run the connector in the next section.
Go ahead and create a Kafka cluster in the Confluent Cloud Console if you didn’t already do so above. Your Kafka cluster must reside in a supported cloud provider and region in order for you to be able to upload the plugin to Confluent Cloud.
On the cluster homepage, select Connectors from the lefthand navigation, and then click the Add plugin button on the top right.
Fill in the form as follows:
Name: Counter Source
Description: Source connector that emits incrementing numbers
Connector class: the class that extends Kafka Connect’s SourceConnector class, i.e., io.confluent.developer.connect.CounterConnector
Connector type: Select Source
Click Select connector archive and choose the zip file built in the previous step
Sensitive properties: leave this section alone, since the connector doesn’t use any sensitive properties like credentials for an external system
Review the custom connector disclaimer and check the box agreeing that you are responsible for the code uploaded to Confluent Cloud
Click Submit!
Let’s now run the connector.
On the cluster homepage, select Connectors from the lefthand navigation.
In the Filter by: section, click the Deployment dropdown and select Custom.
Click the Counter Source connector tile.
Click Generate API key & download to create the Kafka credentials that the connector will use, and then click Continue on the bottom right.
In the Configuration step, enter kafka.topic as the key and counter-topic as the value. This is the only required property. There is also an interval.ms property to specify the number of milliseconds to wait between events, but we can leave it out and accept the default of 1000 (1 second).
Click Continue to proceed to the Networking section, and then Continue again to proceed to the Sizing section, since there are no connection endpoints to allowlist.
Click Continue to run the connector with a single task.
Click Continue once more to provision the connector. You’ll see a tile for the connector on the Connectors page showing that the connector is Provisioning.
Once the connector has been provisioned, click its tile on the Connectors page.
In the Overview tab, you’ll see that the connector is generating messages at a throughput of 1 message per second.
Now click the Logs tab, where you can search and filter the log messages that the connector generates in order to monitor or debug it.
Now, click Topics in the lefthand navigation and select the topic that the source connector is writing to (counter-topic). In the Messages viewer, you’ll see incrementing numbers being emitted.
To stop the connector:
Select Connectors in the lefthand navigation, and then click the tile corresponding to the running custom connector.
Click the Settings tab.
Click the Delete connector link on the bottom left.
Enter the connector name and then click Confirm.
To also uninstall the custom connector plugin:
Find the Counter Source connector on the Connectors page.
Hover over the connector tile, and then click Edit plugin.
Click the Delete plugin link on the bottom left.
Enter the plugin name and then click Confirm.