One of the things that sets Confluent Cloud apart from open-source Flink is how it handles the createTable statement. In the open-source version, this statement is a way to point Flink at your existing Kafka topics and schemas. Despite the name, it's not really creating any new resources. However, in Confluent Cloud, calling a createTable statement will construct the underlying resources if necessary. This includes the schemas and Kafka topics.
This makes it quite powerful because it allows us to use the Table API in Java to construct the data stream and the underlying resources needed for that stream. It means we can benefit from the power and flexibility of the Java language at all stages of the pipeline rather than just in the middle.
In this video, we'll show how to use the createTable statement to construct a new Kafka topic and its corresponding schemas. You'll see how the statement is translated into SQL code to be executed by Confluent Cloud and how that eventually creates Protobuf, Avro, and JSON schemas when required.
A basic schema created with the Table API.
Schema schema = Schema.newBuilder()
    .column("vin", DataTypes.STRING().notNull())
    .column("make", DataTypes.STRING().notNull())
    .column("model", DataTypes.STRING().notNull())
    .column("year", DataTypes.INT().notNull())
    .column("color", DataTypes.STRING().notNull())
    .build();
A table descriptor that uses the protocol buffers format, sets the Kafka retention time and the scan startup mode, and partitions the records into six buckets.
TableDescriptor descriptor = TableDescriptor
    .forConnector("confluent")
    .schema(schema)
    .option("key.format", "proto-registry")
    .option("value.format", "proto-registry")
    .option("kafka.retention.time", "7 days")
    .option("scan.startup.mode", "latest-offset")
    .distributedBy(6, "make")
    .build();
Once the descriptor has been created, the table can be created using the createTable method.
env.createTable("cars", descriptor);
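For context, here is a minimal sketch of how the env variable used above might be created. It assumes the Confluent Table API plugin's ConfluentSettings class and a cloud.properties file holding your connection details; neither appears in the video, so treat the exact names as assumptions.

import io.confluent.flink.plugin.ConfluentSettings;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Assumption: endpoint, credentials, environment, and compute pool details
// live in a cloud.properties file on the classpath.
EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties");
TableEnvironment env = TableEnvironment.create(settings);

From here, the schema, descriptor, and createTable calls shown above run against Confluent Cloud.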
An example of using the executeSql method to manually execute a block of SQL.
env.executeSql("""
    CREATE TABLE `cars` (
      `vin` VARCHAR(2147483647) NOT NULL,
      `make` VARCHAR(2147483647) NOT NULL,
      `model` VARCHAR(2147483647) NOT NULL,
      `year` INT NOT NULL,
      `color` VARCHAR(2147483647) NOT NULL
    ) DISTRIBUTED BY (`make`) INTO 6 BUCKETS WITH (
      'connector' = 'confluent',
      'kafka.retention.time' = '7 days',
      'scan.startup.mode' = 'latest-offset',
      'key.format' = 'proto-registry',
      'value.format' = 'proto-registry'
    )""");
I don't know about you, but I can never remember the syntax to create schemas for Kafka topics.
There are Avro, Protobuf, and JSON schemas, and I get lost in the differences between them.
Thankfully, the Flink Table API provides a single unified format with all of the benefits of Java autocompletion.
This means you can build your schema in Java, and have it automatically translated to the correct format.
And if you are working in Confluent Cloud, there's some powerful magic I will show you toward the end of the video.
In the meantime, let me show you a few examples.
Imagine trying to build a table that works as an inventory for cars.
It might have columns like the Vehicle Identification Number or vin, the make, the model, the year, and the color.
In this case, the datatypes are relatively simple.
Everything is going to be a String, except for the year, which will be an Integer.
But how do we translate these details into an appropriate Schema for use in Kafka?
Using protocol buffers gives a pretty straightforward solution, as we can see here.
In part, this simplicity is because protocol buffers no longer support required fields.
If required fields are important, we could use a JSON schema instead.
However, JSON schemas are more verbose, and we haven't even added the required fields yet.
Meanwhile, Avro schemas are similar, if slightly less wordy.
That's not to say these formats are bad; they are very good at what they do.
But each has advantages and disadvantages which may lead us to use more than one.
Suddenly, we have to remember the syntax for each as we jump between them.
Thankfully, the Flink Table API provides a unified Java syntax for all three.
Using the Table API, we start by creating a Schema object.
This uses the builder pattern as shown here.
From there, we define the appropriate columns and their corresponding data types.
The advantage is that our development environment can help us along the way.
If we can't remember the exact syntax, the autocomplete features can fill in details for us.
For example, we can inspect the DataTypes object to see all of the types available beyond string and int.
Now, this schema is a little naive because it doesn't indicate which fields are required.
We can make it more robust by adding the notNull modifier on the required fields, in this case, all of them.
This still leaves us with a relatively simple schema, but they can get a lot more complicated.
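As a sketch of what that can look like, here is a hypothetical schema (the column names are purely illustrative, not from the video) that uses a few of the other types available on DataTypes:

// Illustrative only: a more complex schema built from other types on DataTypes.
Schema richerSchema = Schema.newBuilder()
    .column("vin", DataTypes.STRING().notNull())
    .column("price", DataTypes.DECIMAL(10, 2))                       // exact decimal value
    .column("listed_at", DataTypes.TIMESTAMP_LTZ(3))                 // millisecond timestamp
    .column("previous_owners", DataTypes.ARRAY(DataTypes.STRING()))  // repeated values
    .column("engine", DataTypes.ROW(                                 // nested structure
        DataTypes.FIELD("cylinders", DataTypes.INT()),
        DataTypes.FIELD("fuel", DataTypes.STRING())))
    .build();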
The question is, how do we translate this into Avro, JSON, and Protobuf schemas?
To do that, we need a TableDescriptor.
Once again, we use the builder pattern.
Assuming we are connecting to Confluent Cloud, we will use the confluent connector.
We can assign the schema that we previously created to the descriptor as shown.
Then, we just need to tell the descriptor which registry format to use.
Here, I am telling it to use the proto-registry or protocol buffers format for both the key and value.
We can also specify json-registry or avro-registry instead.
And we can apply separate formats to the key and value.
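For instance, here is a sketch (not taken from the video) of a descriptor that uses a different registry format for the key and the value:

// Illustrative only: Avro for the key, JSON Schema for the value.
TableDescriptor mixedFormats = TableDescriptor
    .forConnector("confluent")
    .schema(schema)
    .option("key.format", "avro-registry")
    .option("value.format", "json-registry")
    .build();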
There are other options that can be added to the descriptor.
Here, I am setting the kafka.retention.time to 7 days.
I am also telling it to read the topic from the latest-offset.
Check out the documentation for a full list of options.
The distributedBy modifier can be used to specify how the data should be partitioned.
I've specified that the data should be divided into 6 buckets, or Kafka partitions, using make as a distribution key.
Just be careful here.
How the data is distributed can be critical to creating a balanced workload.
For example, choosing make might not be a good approach since it could lead to hotspots on popular brands.
The year isn't really any better since it could create hotspots for newer vehicles.
A poor choice of key can also lead to idle partitions, which can halt the flow of data completely if we aren't careful.
We'll discuss idle partitions more in a future video.
So what would be a good key?
Using a hash of the vin would give a random distribution, which is good for balance.
But, how you choose to distribute the data can depend on factors such as how it will be consumed.
When in doubt, aim for an even distribution if possible. Sometimes, however, we want to distribute similar records to the same bucket.
It's important to pay attention to how the data is organized and partition it according to our needs.
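As a sketch of that alternative (illustrative, not from the video), the same descriptor distributing by vin instead of make might look like this:

// Illustrative only: the distribution key is hashed into the 6 buckets,
// so a high-cardinality key like vin spreads records more evenly than make.
TableDescriptor byVin = TableDescriptor
    .forConnector("confluent")
    .schema(schema)
    .option("key.format", "proto-registry")
    .option("value.format", "proto-registry")
    .option("kafka.retention.time", "7 days")
    .option("scan.startup.mode", "latest-offset")
    .distributedBy(6, "vin")
    .build();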
Once we have defined our descriptor, the final step is to use it to create our table in the Table API.
Here, I'm creating a table named cars using the descriptor that I defined previously.
And this is where the magic happens.
Everything I have shown so far is applicable in open-source Flink.
When we execute the createTable statement, it registers the table in the catalog and points it at a pre-existing topic and schema.
But both the topic and schema still need to be created manually, which means we aren't really saving any effort.
But, in Confluent Cloud, executing a createTable statement will create the underlying resources required, including the topic and schema.
This means that we can define our table entirely in Java, and rely on Confluent Cloud to create the corresponding objects.
So let's look at, roughly, what happens when we execute a createTable statement.
The first step is that the statement is converted to its equivalent SQL.
It might look something like this.
Then, the statement is executed by Confluent Cloud.
As a side note, we could actually execute the SQL directly from the Table API using the executeSql method, as shown here.
However, in doing so, we'd lose the benefits of autocompletion that we get by using the Java API.
Still, it's a useful technique for features that aren't supported by the Table API, such as MATCH_RECOGNIZE.
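As a rough sketch (the pattern and measures are hypothetical, not from the video, and this assumes Confluent Cloud's $rowtime system column), a MATCH_RECOGNIZE query over the cars table could be submitted with executeSql like this:

// Illustrative only: MATCH_RECOGNIZE isn't exposed through the Table API builders,
// so we submit it as raw SQL. This hypothetical pattern finds a red car immediately
// followed by a blue car of the same make, ordered by the $rowtime system column.
env.executeSql("""
    SELECT *
    FROM `cars`
    MATCH_RECOGNIZE (
      PARTITION BY `make`
      ORDER BY `$rowtime`
      MEASURES
        A.`vin` AS `red_vin`,
        B.`vin` AS `blue_vin`
      ONE ROW PER MATCH
      AFTER MATCH SKIP PAST LAST ROW
      PATTERN (A B)
      DEFINE
        A AS A.`color` = 'red',
        B AS B.`color` = 'blue'
    )""");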
Now, when the createTable statement is executed, it will create three things: the key schema, the value schema, and the topic.
The options we configured in the TableDescriptor will be used to determine how these resources are created.
For example, because we specified proto-registry for the value.format, it will automatically translate our schema into a corresponding protocol buffers schema and record it in the registry.
This saves us the trouble of having to create these resources ourselves and allows us to build everything we need using a single language.
So when working with Confluent Cloud, the magic is that the createTable statement isn't just there to tell Flink where to find the resources.
It also creates resources if necessary, which is a different behavior from the open-source version.
I hope you found this video informative.
We've got more coming, so stay tuned.
If you have any questions, drop us a comment and we'll make sure to respond.
Thanks for watching.