The Apache Flink® Table API can connect to Confluent Cloud with just a few lines of code. It requires minimal boilerplate and a small set of configuration parameters.
This is possible because the Confluent Plugin does significant work behind the scenes to make things easier, automatically mapping Confluent Cloud environments, clusters, and topics to the corresponding Flink catalogs, databases, and tables.
To appreciate just how much the plugin handles for us, it helps to understand how Confluent Cloud concepts translate into Apache Flink concepts.
Learn more about how Confluent Cloud terminology maps to Apache Flink in this short video.
This snippet of code will connect to Confluent Cloud using the cloud.properties file, then execute a very simple query against the ecommerce.marketplace.orders table.
import static org.apache.flink.table.api.Expressions.$;

import io.confluent.flink.plugin.ConfluentSettings;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

EnvironmentSettings settings =
    ConfluentSettings.fromResource("/cloud.properties");
TableEnvironment env = TableEnvironment.create(settings);
TableResult result = env
    .from("`ecommerce`.`marketplace`.`orders`")
    .select($("*"))
    .execute();
Do you know what drives me crazy?
Having to write so much boilerplate every time I decide to use a new library.
Well, if you've got about five minutes, I'll show you how to wire up the Flink Table API to work with your existing Confluent Cloud environment.
And I'll do it with just a few lines of code.
Along the way, I'll explain the relationships between Flink components and their counterparts in Confluent Cloud.
But I said five minutes, so start your timer now.
To start, we need a TableEnvironment that will be the foundation for our Table API code.
To create one, we use EnvironmentSettings produced by the ConfluentSettings.fromResource method.
This method takes the path to a properties file containing the necessary connection parameters.
The file will contain a variety of configurations like those you see here.
Depending on your needs, and the version of the Confluent Plugin you use, the exact settings may vary.
If you want more detail, check out the exercises for this course, or look for links in the video description.
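As a rough sketch (placeholder values, and not a definitive list), a cloud.properties file might contain entries along these lines:
# Illustrative settings only -- exact keys vary by plugin version.
client.cloud=aws
client.region=us-east-1
client.organization-id=<organization-id>
client.environment-id=<environment-id>
client.compute-pool-id=<compute-pool-id>
client.flink-api-key=<api-key>
client.flink-api-secret=<api-secret>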
Once we have our settings, we can create a TableEnvironment.
At this point, the Confluent Plugin will do some magic to make our lives easier.
Let's talk about what that magic is.
The Flink Table API works with a variety of objects, including:
Catalogs
Databases
and Tables
These are organized in a hierarchy, with Catalogs at the top, followed by Databases, and then Tables.
Each catalog contains one or more databases and each database contains one or more tables.
To identify a specific table you need to know which catalog and database it lives in.
Then you can construct a unique identifier for the table as shown.
So if we had an identifier that looked like this, we would go to the ecommerce catalog, locate the marketplace database, and look for the orders table.
Usually, you have to define all of these structures yourself.
You have to create the catalogs, the databases, and even the tables.
Granted, you could skip creating the catalogs and databases, but then you lose the organization and discoverability they provide, especially in a larger system.
Thankfully, Confluent Cloud automates a lot of this.
It groups related resources under something called an environment.
This includes a schema registry that contains metadata about your Kafka topics.
Essentially, it acts as a catalog for your resources.
As such, Confluent Cloud maps the schema registry to a corresponding Flink catalog.
However, the Environment name is used to identify the Catalog, so we often refer to the Environment rather than the Schema Registry.
In this example, the ecommerce Catalog would map to the ecommerce environment in Confluent Cloud.
To set the default catalog in the Table API, we can execute a useCatalog statement.
Because Confluent Cloud has already defined the catalog for us, we don't need any additional code.
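For example, a single call is enough (a sketch using the ecommerce catalog from our running example):
// Point the Table API at the catalog backed by the ecommerce environment.
env.useCatalog("ecommerce");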
Now, let's drop down a level.
In Confluent Cloud, an environment can contain one or more Kafka Clusters.
Each of those clusters is automatically mapped to a Database in the Table API.
Going back to the example, the marketplace cluster lives inside the ecommerce environment.
And we would set the default database with the useDatabase command.
And again, there is no need to define a database because Confluent Cloud has done that for us.
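Again, one call does it (a sketch using the example names):
// Use the database backed by the marketplace Kafka cluster.
env.useDatabase("marketplace");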
Dropping down one more layer, each Kafka Cluster can contain one or more topics.
The topic contains all of the events or records we want to query.
Logically, these topics map to a table in Flink.
So again, looking at the example, the orders topic is part of the marketplace cluster, which lives in the ecommerce environment.
However, it's not quite that simple.
A table in Flink defines where the data lives and what shape it will take.
This requires a schema, but Kafka topics don't have schemas built in.
To obtain the schema, we must look for the corresponding entries in the schema registry.
This means that in Flink, a Table maps not only to the Kafka topic, but also to the key schema and the value schema.
And once again, this is all automatic.
With all those pieces in place, we can reference a table using the from method in the Table API.
However, there are two ways to do that.
We can have it look in the default catalog and database as shown here.
Or we can provide an absolute path to the table instead.
As long as the topic and schemas exist, we can reference the matching table without having to define anything ahead of time.
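As a quick sketch (reusing the env TableEnvironment from earlier; the variable names are just for illustration), the two approaches look like this:
// Option 1: rely on the default catalog and database.
env.useCatalog("ecommerce");
env.useDatabase("marketplace");
Table ordersFromDefaults = env.from("orders");

// Option 2: spell out the fully qualified path to the table.
Table ordersFromFullPath = env.from("`ecommerce`.`marketplace`.`orders`");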
There's one more resource I want to talk about.
Here we see a very simple Table API query.
It scans the orders table and selects all of the fields.
When we execute this, it produces a SQL statement that will be sent to Confluent Cloud.
That statement might look like this.
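Something roughly like this (a sketch; the exact SQL the plugin generates may differ):
SELECT * FROM `ecommerce`.`marketplace`.`orders`;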
Confluent Cloud will execute the statement and return all records in the topic.
However, that requires compute resources.
Those resources have to be defined in the form of a Compute Pool.
Each environment can contain one or more compute pools.
And each statement will be assigned to use the resources from one of those pools.
This assignment happens using one of the configuration settings we provided in the cloud.properties file.
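In the cloud.properties sketch from earlier, that would be the compute pool entry (the key name is illustrative and may vary by plugin version):
client.compute-pool-id=<compute-pool-id>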
So, here we have our completed code.
I said five minutes at the beginning, and I think we should be pretty close.
In the end, the amount of code required to write a simple Table API query is quite minimal.
We can get by with very little in the way of boilerplate.
This is all thanks to the magic that Confluent Cloud is doing behind the scenes to automate the creation of many of our resources.
As we will see in later videos, this goes both ways.
Resources we create through the Table API can automatically trigger the creation of corresponding elements in Confluent Cloud.
I hope this video has piqued your interest in trying out the Flink Table API.
We're already working on future videos that will expand on these topics.
Feel free to drop us a comment if you feel that anything needs a deeper dive.
Thanks for watching.