Kafka Connect is built around a pluggable architecture of several components, which together provide very flexible integration pipelines. To get the most out of Kafka Connect it’s important to understand these components and their roles:
The key component of any Kafka Connect pipeline is a connector instance, a logical job that defines where data should be copied from and to. All of the classes that implement or are used by a connector instance are defined in its connector plugin. Written by the community, a vendor, or occasionally bespoke by the user, the plugin integrates Kafka Connect with a particular technology. These plugins are reusable components that you can download, install, and use without writing code.
For example:
A source connector plugin knows how to talk to a specific source data system and generate records that Kafka Connect then writes into Kafka. On the downstream side, the connector instance configuration specifies the topics to be consumed; Kafka Connect reads records from those topics and passes them to the sink connector, which knows how to send them to a specific sink data system.
So the connectors know how to work with the records and talk to the external data system, but Kafka Connect workers act as the conductor and take care of the rest. We will define what a worker is shortly.
To specify a connector, you set its class name in the connector.class property of your configuration; each connector's documentation will give you the particular class name string to use. As you may expect, connectors have different configuration properties specific to the technology with which they’re integrating. A cloud connector will need to know the region, the credentials, and the endpoint to use. A database connector will need to know the names of the tables, the database hostname, and so on.
Here’s an example that creates an Elasticsearch sink connector instance with a call to Kafka Connect’s REST API:
curl -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/sink-elastic-01/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics" : "orders",
"connection.url" : "http://elasticsearch:9200",
"type.name" : "_doc",
"key.ignore" : "false",
"schema.ignore" : "true"
}'
You can also use ksqlDB to manage connectors.
Here is the syntax for adding the previous Elasticsearch sink connector instance:
CREATE SINK CONNECTOR `sink-elastic-01` WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'topics' = 'orders',
'connection.url' = 'http://elasticsearch:9200',
'type.name' = '_doc',
'key.ignore' = 'false',
'schema.ignore' = 'true'
);
In addition to using the Kafka Connect REST API directly, you can add connector instances using the Confluent Cloud console.
It’s important to understand that the connector plugins themselves don't read from or write to (consume/produce) Kafka itself. The plugins just provide the interface between Kafka and the external technology. This is a deliberate design.
Kafka Connect and its underlying components take care of writing data received from source connectors to Kafka topics as well as reading data from Kafka topics and passing it to sink connectors.
All of this is hidden from the user: when you add a new connector instance, its configuration is all you need to provide, and Kafka Connect does the rest to get the data flowing.
Converters are the next piece of the puzzle, and understanding them will help you avoid common pitfalls with Kafka Connect. Technically, transforms sit between connectors and converters, but we’ll visit those later.
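The ordering described above can be pictured with a toy model. The Python sketch below is not the Kafka Connect API: the function names (source_connector, mask_card_number, json_converter, run_source_pipeline) and the sample record are hypothetical. It only illustrates that, on the source side, a record passes from the connector through any transforms and then through the converter before bytes are written to Kafka.

```python
import json


def source_connector():
    """Stand-in for a source connector plugin: yields records from an external system."""
    yield {"order_id": 1, "card_number": "4111-1111-1111-1111", "amount": 9.99}


def mask_card_number(record):
    """Stand-in for a transform: changes one record at a time."""
    masked = dict(record)
    masked["card_number"] = "****"
    return masked


def json_converter(record):
    """Stand-in for a converter: turns the record into the bytes stored in Kafka."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


def run_source_pipeline(connector, transforms, converter):
    """Plays the role of the Connect worker: connector -> transforms -> converter."""
    produced = []
    for record in connector():
        for transform in transforms:
            record = transform(record)
        # A real worker would produce these bytes to a Kafka topic.
        produced.append(converter(record))
    return produced


messages = run_source_pipeline(source_connector, [mask_card_number], json_converter)
print(messages[0])
```

On the sink side the order is reversed: the converter deserializes the bytes read from Kafka, any transforms are applied, and the resulting record is handed to the sink connector.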
Converters are responsible for the serialization and deserialization of data flowing between Kafka Connect and Kafka itself. You’ll sometimes see similar components referred to as SerDes (“Serializer/Deserializer”) in Kafka Streams, or just plain old serializers and deserializers in the Kafka client libraries.
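There are many different converters available, but some common ones include:
Avro: io.confluent.connect.avro.AvroConverter
Protobuf: io.confluent.connect.protobuf.ProtobufConverter
JSON Schema: io.confluent.connect.json.JsonSchemaConverter
JSON: org.apache.kafka.connect.json.JsonConverter
String: org.apache.kafka.connect.storage.StringConverter
ByteArray: org.apache.kafka.connect.converters.ByteArrayConverter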
While Kafka doesn’t care about how you serialize your data (as far as it’s concerned, it’s just a series of bytes), you should care about how you serialize your data! In the same way that you would take a carefully considered approach to how you design your services and model your data, you should also be deliberate in your serialization approach.
As well as managing the straightforward matter of serializing data flowing into Kafka and deserializing it on its way out, converters have a crucial role to play in the persistence of schemas. Almost all data that we deal with has a schema; it’s up to us whether we choose to acknowledge that in our designs or not. You can consider schemas as the API between applications and components of a pipeline. Schemas are the contract between one component in the pipeline and another, describing the shape and form of the data.
When you ingest data from a source such as a database, as well as the rows of data, you have the metadata that describes the fields—the data types, their names, etc. Having this schema metadata is valuable, and you will want to retain it in an efficient manner. A great way to do this is by using a serialization method such as Avro, Protobuf, or JSON Schema. All three of these will serialize the data onto a Kafka topic and then store the schema separately in the Confluent Schema Registry. By storing the schema for data, you can easily utilize it in your consuming applications and pipelines. You can also use it to enforce data hygiene in the pipeline by ensuring that only data that is compatible with the schema is stored on a given topic.
You can opt to use serialization formats that don’t store schemas, such as JSON, string, and byte array, and in some cases these are valid choices. If you use them, just make sure that you are doing so for deliberate reasons and have considered how else you will handle schema information.
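To make the trade-off concrete, here is a small Python sketch comparing a plain JSON message with one that carries its schema inline. The row and its field names are hypothetical. The second message follows the schema/payload envelope shape that JsonConverter uses when schemas.enable=true; treat it as an illustration of the shape rather than a byte-exact copy of what the converter emits.

```python
import json

# Hypothetical row read from a source table.
row = {"id": 42, "status": "SHIPPED"}

# Plain JSON: field names travel with every message, but the data types do not.
schemaless = json.dumps(row).encode("utf-8")

# JSON with an embedded schema, in a schema/payload envelope.
# The whole schema is repeated in every message.
with_schema = json.dumps({
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "id", "type": "int32", "optional": False},
            {"field": "status", "type": "string", "optional": False},
        ],
    },
    "payload": row,
}).encode("utf-8")

print(len(schemaless), "bytes without a schema")
print(len(with_schema), "bytes with the schema embedded")
```

The Avro, Protobuf, and JSON Schema converters avoid this repetition by keeping the schema in Schema Registry and writing only a small schema identifier alongside each message.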
Converters are specified separately for the value of a message, and its key:
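# Keys are plain strings, so no schema is needed
key.converter=org.apache.kafka.connect.storage.StringConverter
# Values are Avro, with the schema stored in Schema Registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081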
Note that these converters are set as a global default per Connect worker, but they can be overridden per connector instance.
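As a rough sketch of a per-connector override, the Python below builds a configuration for the Elasticsearch sink from earlier with its own converter settings, and sends it with the same PUT call as the curl example. The worker address and Schema Registry URL are assumptions carried over from the examples above, the config is trimmed to a few properties for brevity, and the helper names are hypothetical.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: same worker address as the curl example


def connector_config_with_converters():
    """Connector config whose converter settings override the worker defaults."""
    return {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "orders",
        "connection.url": "http://elasticsearch:9200",
        # Converter settings given here apply to this connector instance only.
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
    }


def put_connector_config(name, config, base_url=CONNECT_URL):
    """PUT the config to /connectors/<name>/config, as the curl example does."""
    request = urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


config = connector_config_with_converters()
print(json.dumps(config, indent=2))
# put_connector_config("sink-elastic-01", config)  # needs a running Connect worker
```

Any converter property left out of the connector configuration falls back to the worker-level default.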
The third and final key component in Kafka Connect is the transform piece. Unlike connectors and converters, these are entirely optional. You can use them to modify data from a source connector before it is written to Kafka, and modify data read from Kafka before it’s written to the sink. Transforms operate over individual messages as they move, so they’re known as Single Message Transforms or SMTs.
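Common uses for SMTs include dropping or masking fields such as personally identifiable information (PII), renaming fields, changing field data types, adding metadata such as lineage, and modifying the topic name, for example to include a timestamp.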
For more complex transformations, including aggregations and joins to other topics or lookups to other systems, a full stream processing layer in ksqlDB or Kafka Streams is recommended.
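For the simpler, per-message cases that SMTs do cover, the transforms are declared in the connector configuration itself. The Python sketch below adds a two-step chain to a connector config using two transforms that ship with Apache Kafka, Cast and InsertField. The aliases (castAmount, addSource), the field names, and the static value are hypothetical, and the base config is a trimmed version of the Elasticsearch example.

```python
import json


def with_transforms(base_config):
    """Return a copy of a connector config with a two-step SMT chain added."""
    config = dict(base_config)
    config.update({
        # Aliases are hypothetical labels; transforms run in the order listed.
        "transforms": "castAmount,addSource",
        # Cast the 'amount' field of the record value to a 64-bit float.
        "transforms.castAmount.type": "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.castAmount.spec": "amount:float64",
        # Add a static field recording where the data came from.
        "transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addSource.static.field": "data_source",
        "transforms.addSource.static.value": "orders-db",
    })
    return config


base = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "orders",
    "connection.url": "http://elasticsearch:9200",
}
smt_config = with_transforms(base)
print(json.dumps(smt_config, indent=2))
```

Each alias named in the transforms property needs a matching transforms.<alias>.type entry, followed by whatever properties that particular transform accepts.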
Connectors, transforms, and converters are all specified as part of the Kafka Connect API, and you can consult the Javadoc to write your own.
Apache Kafka and Confluent have several converters and transforms built in already, but you can install more if you need them. You will find these along with hundreds of connectors in Confluent Hub.
Now that we know the theory, let's take the previous hands-on exercise a step further. In the exercise that follows, we'll transform the input records before writing them to a Kafka topic and see how to do so using Confluent Cloud.