Senior Developer Advocate (Presenter)
In this hands-on demonstration, we will configure a connector that uses two Single Message Transforms (SMTs).
This exercise is nearly the same as the Hands On: Getting Started with Kafka Connect exercise, apart from the addition of the two SMTs.
Let’s get started.
kc-101 cluster

First things first, we're using the Datagen connector in this exercise, so let's find and select it using the filter.
We will once again generate sample data using the Orders record schema, but since we want to transform individual messages using a Single Message Transform, we will need to use the advanced configuration options.
Let’s add our first SMT.
We could accept the default label for this transform, but the configuration is easier to read if we give it a name that corresponds to the SMT being used, so let's do that. We're going to create an SMT to cast fields from each message, so we'll call it "castValues".
We also need to identify which SMT we want to use.
For this SMT, we need to enter a list of the fields we want to transform and the types we want to cast them to. We do this by specifying each field name and its desired type separated by a colon, and we can list any number of these field:type pairs in a comma-delimited list.
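As a sketch, the resulting entries in the connector's JSON configuration would look something like the following. The transform name and class match what we entered above; the specific fields and target types shown in `spec` are illustrative (the exercise later shows `orderid` and `orderunits` changing type):

```json
"transforms": "castValues",
"transforms.castValues.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castValues.spec": "orderid:string,orderunits:int32"
```

Each `field:type` pair in `spec` tells the Cast SMT to convert that field of the record value to the named Connect schema type.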
The configuration of our first SMT is complete.
Let’s now add a second SMT. This time, we’re creating an SMT to convert the timestamps of each message. Again, we’ll give it an appropriate name and fill out the required configuration parameters.
Click Add a single message transform.
Set the value of Transform name equal to convertTimestamp.
In the Transform type list, select org.apache.kafka.connect.transforms.TimestampConverter$Value.
Set the value of target.type equal to string.
This tells the SMT the resulting value should be type string.
Set the value of field equal to ordertime.
Set the value of format equal to yyyy-MM-dd.
This is the format the ordertime field will be changed to by the SMT.
That completes the SMT configuration.
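The steps above correspond to the following fragment of the connector's JSON configuration, sketched from the values we just entered:

```json
"transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTimestamp.target.type": "string",
"transforms.convertTimestamp.field": "ordertime",
"transforms.convertTimestamp.format": "yyyy-MM-dd"
```

The `target.type` of `string` combined with the `format` pattern means the `ordertime` field will be rewritten as a date string such as `2017-06-09`.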
Let’s continue to the next step.
For this Datagen connector instance, we will once again write the output records to the orders topic.
In order for our connector to communicate with our cluster, we need to provide an API key for it. You can use an existing API key and secret, or create one here, as we’re doing.
We will also use the default sizing for this instance of the connector.
Before we launch the connector, let’s examine its JSON configuration and identify the SMT related settings.
Notice the configuration for the two transforms is included in the connector configuration.
You could also create the connector using this same JSON configuration with either the Confluent Cloud Connect API or the Confluent CLI. Other than having to provide the actual values for the API key and secret, the JSON is ready to use.
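As a rough sketch, the full configuration might look like the following. The exact property names can vary by connector version, and the transform field lists and target types here are illustrative assumptions; the API key and secret are placeholders you would fill in yourself:

```json
{
  "name": "DatagenConnector_orders",
  "config": {
    "connector.class": "DatagenSource",
    "kafka.api.key": "<API_KEY>",
    "kafka.api.secret": "<API_SECRET>",
    "kafka.topic": "orders",
    "quickstart": "ORDERS",
    "output.data.format": "JSON",
    "tasks.max": "1",
    "transforms": "castValues,convertTimestamp",
    "transforms.castValues.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.castValues.spec": "orderid:string,orderunits:int32",
    "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convertTimestamp.target.type": "string",
    "transforms.convertTimestamp.field": "ordertime",
    "transforms.convertTimestamp.format": "yyyy-MM-dd"
  }
}
```

Note that the `transforms` property lists the two transform names in order; the SMTs are applied to each record in that sequence before it is written to the topic.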
Let’s now launch the connector and observe the result of the SMTs.
Now that the connector is running, let’s view messages being written to the topic and compare messages produced without the SMTs to those produced using the SMTs.
Using the Jump to offset option, locate a range of records that includes the last few with the original ordertime format and the first few with the transformed ordertime format.
Note: You will need to click the pause button as soon as the display jumps to the target records in order to keep them in view.
As you can see in the current view, offsets 69 and 70 have the original ordertime format and the messages written after offset 70 have the updated ordertime format. Notice also the change in data type for the orderid and orderunits fields.
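To make the difference concrete, here is an illustrative before-and-after pair (the field values shown are invented for illustration, not taken from the actual topic):

```json
// Before the SMTs (e.g., offset 70):
{ "ordertime": 1497014222380, "orderid": 70, "itemid": "Item_184", "orderunits": 5.797 }

// After the SMTs (e.g., offset 71):
{ "ordertime": "2017-06-09", "orderid": "71", "itemid": "Item_185", "orderunits": 5 }
```

The `ordertime` epoch timestamp has become a `yyyy-MM-dd` string, `orderid` has been cast from an integer to a string, and `orderunits` has been cast from a floating-point value to an integer.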
Before we end the exercise, let’s delete the connector so we don’t unnecessarily deplete any Confluent Cloud promotional credits.
Let’s also delete the orders topic.
We will not delete the kc-101 cluster at this time since we will be using it in other exercises that are part of this course.
This concludes this exercise.