Course: Kafka Connect 101

Connectors, Configuration, Converters, and Transforms

13 min
Tim Berglund, Sr. Director, Developer Advocacy (Course Presenter)
Robin Moffatt, Staff Developer Advocate (Course Author)

Inside Kafka Connect

Kafka Connect is built around a pluggable architecture of several components, which together provide very flexible integration pipelines. To get the most out of Kafka Connect it’s important to understand these components and their roles:

  • Connectors are responsible for the interaction between Kafka Connect and the external technology being integrated with
  • Converters handle the serialization and deserialization of data
  • Transforms optionally modify the data passing through the pipeline; one or more can be applied in a chain
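
These three component types map directly onto properties in a connector's configuration. As a rough sketch (the values here are illustrative, and a transform named in the chain would also need its own type and settings, which we'll cover below), a single connector definition names one of each:

    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms"     : "addSourceMetadata"

Here connector.class selects the connector, value.converter the converter used for record values, and transforms the optional chain of Single Message Transforms.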


Connectors

The key component of any Kafka Connect pipeline is the connector. Written by the community, a vendor, or occasionally bespoke by the user, a connector integrates Kafka Connect with a particular technology. For example:

  • The Debezium MySQL source connector uses the MySQL bin log to read events from the database and stream these to Kafka Connect
  • The Elasticsearch sink connector takes data from Kafka Connect, and using the Elasticsearch APIs, writes the data to Elasticsearch
  • The S3 connector from Confluent can act as both a source and sink connector, writing data to S3 or reading it back in


As you may expect, connectors have different configuration properties specific to the technology with which they're integrating. A cloud service connector will need to know the region, the credentials, and the endpoint to use. A database connector will need to know the names of the tables, the database hostname, and so on.

Here’s an example of creating a connector instance in Kafka Connect using the Elasticsearch sink connector:

curl -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/sink-elastic-01/config \
    -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics"         : "orders",
    "connection.url" : "http://elasticsearch:9200",
    "type.name"      : "_doc",
    "key.ignore"     : "false",
    "schema.ignore"  : "true"
}'

The above is a call to Kafka Connect's REST API. You can also use ksqlDB to manage connectors; the equivalent syntax for the same connector looks like this:

CREATE SINK CONNECTOR `sink-elastic-01` WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'topics'          = 'orders',
  'connection.url'  = 'http://elasticsearch:9200',
  'type.name'       = '_doc',
  'key.ignore'      = 'false',
  'schema.ignore'   = 'true'
);

Connectors themselves don't actually write to or read from Kafka directly. Instead, they interface with Kafka Connect by passing (source) or receiving (sink) a representation of the data, including its metadata and schema (if available). This is all hidden from the user: when you create the Elasticsearch connector shown above, that's all you need to configure, and Kafka Connect does the rest to get the data flowing. But understanding the next piece of the puzzle, converters, is important to help you avoid some of the common pitfalls with Kafka Connect. Technically, transforms sit between connectors and converters, but we'll visit those later.

In addition to using the Kafka Connect REST API directly, you can create connectors using the web interface in Confluent:

(Screenshot: adding the Elasticsearch Service Sink connector in the Confluent web interface)

Converters

Converters are responsible for the serialization and deserialization of data flowing between Kafka Connect and Kafka itself. You’ll sometimes see similar components referred to as SerDes (“serializer/deserializer”) in Kafka Streams, or just plain old serializers and deserializers in the Kafka client libraries. Common converters include:

  • JSON
  • Avro
  • Protobuf

While Kafka doesn’t care about how you serialize your data (it’s just a series of bytes), you should care about how you serialize your data! In the same way that you would take a carefully considered approach to how you design your services and model your data, you should also be deliberate in your serialization approach.


As well as managing the straightforward matter of serializing data flowing into Kafka and deserializing it on its way out, converters have a crucial role to play in the persistence of schemas. Almost all data that we deal with has a schema; it’s up to us whether we choose to acknowledge that in our designs or not. You can consider schemas as the API between applications and components of a pipeline. Schemas are the contract between one component in the pipeline and another, describing the shape and form of the data.

When you ingest data from a source such as a database, as well as the rows of data you also have the metadata that describes the fields: the data types, their names, and so on. Having this schema metadata is valuable, and you will want to retain it in an efficient manner. A great way to do this is to use a serialization method such as Avro, Protobuf, or JSON Schema. All three of these will serialize the data onto a Kafka topic and store the schema separately in the Confluent Schema Registry. By storing the schema for your data, you can easily utilize it in your consuming applications and pipelines. You can also use it to enforce data hygiene in the pipeline by ensuring that only data that is compatible with the schema is stored on a given topic.
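
To give a sketch of how this is wired up (assuming Confluent's Avro converter is installed on the worker and Schema Registry is reachable at http://schema-registry:8081, both of which are illustrative here), the converter is set either in the worker configuration or per connector, for example:

    "key.converter"                       : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter"                     : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url" : "http://schema-registry:8081"

With this in place, record values are written to the topic as Avro and the corresponding schema is registered in Schema Registry, where consuming applications and pipelines can look it up.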

You can opt to use serialization formats that don’t store schemas (JSON, String, and byte array), and in some cases, these are valid. If you use these, just make sure that you are doing so for deliberate reasons and have considered how else you will handle schema information.
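
For instance, as a sketch of the schema-less route (assuming you have deliberately chosen to manage the data contract elsewhere), disabling schemas on the built-in JSON converter writes plain JSON with no schema attached:

    "value.converter"                : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable" : "false"

Consumers of that topic then receive bare JSON and must learn its structure by some other means.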


Single Message Transforms (SMTs)

The third key component in Kafka Connect is the Single Message Transform (SMT). Unlike connectors and converters, SMTs are entirely optional. You can use them to modify data from a source connector before it is written to Kafka, or to modify data read from Kafka before it is written to the sink.


Common uses for SMTs include:

  • Dropping fields from data at ingest, such as personally identifiable information (PII), where the system requirements call for it
  • Adding metadata information such as lineage to data ingested through Kafka Connect
  • Changing field data types
  • Modifying the topic name to include a timestamp
  • Renaming fields
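
As a sketch of what this looks like in a connector's configuration (the transform aliases, field names, and values below are purely illustrative), several of the transforms that ship with Apache Kafka can be chained together:

    "transforms": "maskPII,addSource,addTimestampToTopic",
    "transforms.maskPII.type"                        : "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.maskPII.fields"                      : "credit_card_number",
    "transforms.addSource.type"                      : "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addSource.static.field"              : "source_system",
    "transforms.addSource.static.value"              : "orders-db",
    "transforms.addTimestampToTopic.type"            : "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.addTimestampToTopic.topic.format"    : "${topic}-${timestamp}",
    "transforms.addTimestampToTopic.timestamp.format": "yyyyMMdd"

Each record passing through the connector has its credit_card_number field masked and a source_system field added, and is routed to a topic name suffixed with the current date.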

For more complex transformations, including aggregations and joins to other topics or lookups to other systems, a full stream processing layer in ksqlDB or Kafka Streams is recommended.
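
For instance, an aggregation that is out of reach for SMTs is a short statement in ksqlDB (a sketch only, assuming an orders stream with customer_id and order_total columns has already been declared):

CREATE TABLE customer_order_totals AS
  SELECT customer_id,
         SUM(order_total) AS total_spend
  FROM orders
  GROUP BY customer_id;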

Obtaining Plugins and Writing Your Own

Connectors, transforms, and converters are all specified as part of the Kafka Connect API, and you can consult the Javadocs to write your own.

Apache Kafka and Confluent have several converters and transforms built in already, but you can install more if you need them. You will find these along with hundreds of connectors in the Confluent Hub.
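
For example, on a self-managed Connect worker with the confluent-hub CLI available (an assumption for this sketch), installing the Elasticsearch sink connector plugin from Confluent Hub looks like this, after which you restart the worker so it picks up the new plugin:

confluent-hub install confluentinc/kafka-connect-elasticsearch:latest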

