course: Apache Flink® Table API: Processing Data Streams in Java

Using Apache Flink® in Confluent Cloud

6 min
Wade Waldron

Staff Software Practice Lead

Apache Flink® in Confluent Cloud

The Apache Flink® Table API can connect to Confluent Cloud with just a few lines of code. It requires minimal boilerplate and a small set of configuration parameters.

This is possible because the Confluent Plugin does significant work behind the scenes to make things easier.

This includes:

  • Setting up catalogs.
  • Creating databases.
  • Defining tables.
  • And more.

However, to appreciate how much work the plugin does behind the scenes, it helps to understand how Confluent Cloud concepts translate into Apache Flink concepts.

Learn more about how Confluent Cloud terminology maps to Apache Flink in this short video.

Topics:

  • How to create a Table Environment.
  • What is the Flink resource hierarchy?
  • How do Confluent environments map to Flink?
  • How do Kafka clusters map to Flink?
  • How do Kafka topics map to Flink?
  • Why is a compute pool necessary?

Resources

Code

This snippet of code will connect to Confluent Cloud using the cloud.properties file, then execute a very simple query against the ecommerce.marketplace.orders table.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import io.confluent.flink.plugin.ConfluentSettings;
import static org.apache.flink.table.api.Expressions.$;

// Load the connection settings from the cloud.properties file on the classpath.
EnvironmentSettings settings =
  ConfluentSettings.fromResource("/cloud.properties");
TableEnvironment env = TableEnvironment.create(settings);

// Scan the orders table, select every field, and run the query.
TableResult result = env
  .from("`ecommerce`.`marketplace`.`orders`")
  .select($("*"))
  .execute();

Use the promo codes FLINKTABLEAPIJAVA & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.

Using Apache Flink® in Confluent Cloud

Do you know what drives me crazy?

Having to write so much boilerplate every time I decide to use a new library.

Well, if you've got about five minutes, I'll show you how to wire up the Flink Table API to work with your existing Confluent Cloud environment.

And I'll do it with just a few lines of code.

Along the way, I'll explain the relationships between Flink components and their counterparts in Confluent Cloud.

But I said five minutes, so start your timer now.

To start, we need a TableEnvironment that will be the foundation for our Table API code.

To create one, we use EnvironmentSettings, which come from the ConfluentSettings.fromResource method.

It takes a properties file that contains the necessary connection parameters.

The file will contain a variety of configurations like those you see here.
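As a rough sketch, a cloud.properties file for the Confluent plugin typically includes entries along these lines (the exact keys depend on your plugin version, and the values here are placeholders):

# Cloud provider and region where your resources run
client.cloud=aws
client.region=us-east-1

# Credentials for the Flink API
client.flink-api-key=<your-api-key>
client.flink-api-secret=<your-api-secret>

# Which organization, environment, and compute pool to use
client.organization-id=<your-organization-id>
client.environment-id=<your-environment-id>
client.compute-pool-id=<your-compute-pool-id>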

Depending on your needs, and the version of the Confluent Plugin you use, the exact settings may vary.

If you want more detail, check out the exercises for this course, or look for links in the video description.

Once we have our settings, we can create a TableEnvironment.

At this point, the Confluent Plugin will do some magic to make our lives easier.

Let's talk about what that magic is.

The Flink Table API works with a variety of objects, including catalogs, databases, and tables.

These are organized in a hierarchy, with Catalogs at the top, followed by Databases, and then Tables.

Each catalog contains one or more databases and each database contains one or more tables.

To identify a specific table you need to know which catalog and database it lives in.

Then you can construct a unique identifier for the table as shown.
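Using the example names from this course, that fully qualified identifier takes the following form:

<catalog>.<database>.<table>
ecommerce.marketplace.orders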

So, given that identifier, we would go to the ecommerce catalog, locate the marketplace database, and look for the orders table.

Usually, you have to define all of these structures yourself.

You have to create the catalogs, the databases, and even the tables.

Granted, you could skip creating the catalogs and databases, but then you lose the organization and discoverability they provide, especially in a larger system.

Thankfully, Confluent Cloud automates a lot of this.

It groups related resources under something called an environment.

This includes a schema registry that contains metadata about your Kafka topics.

Essentially, it acts as a catalog for your resources.

As such, Confluent Cloud maps the schema registry to a corresponding Flink catalog.

However, the Environment name is used to identify the Catalog, so we often refer to the Environment rather than the Schema Registry.

In this example, the ecommerce Catalog would map to the ecommerce environment in Confluent Cloud.

To set the default catalog in the Table API we can execute a useCatalog statement.

Because Confluent Cloud has already defined the catalog for us, we don't need any additional code.
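If you do want to switch the default catalog explicitly, it's a single call on the TableEnvironment. A minimal sketch, using the example environment name:

// Optional: make the Confluent-provided catalog (named after the environment) the default.
env.useCatalog("ecommerce");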

Now, let's drop down a level.

In Confluent Cloud, an environment can contain one or more Kafka Clusters.

Each of those clusters is automatically mapped to a Database in the Table API.

Going back to the example, the marketplace cluster lives inside the ecommerce environment.

And we would set the default database with the useDatabase command.

And again, there is no need to define a database because Confluent Cloud has done that for us.
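For completeness, that call is just a one-liner. A minimal sketch, using the example cluster name:

// Optional: make the marketplace cluster's database the default.
env.useDatabase("marketplace");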

Dropping down one more layer, each Kafka Cluster can contain one or more topics.

Each topic contains the events, or records, we want to query.

Logically, each topic maps to a table in Flink.

So again, looking at the example, the orders topic is part of the marketplace cluster, which lives in the ecommerce environment.

However, it's not quite that simple.

A table in Flink defines where the data lives and what shape it will take.

This requires a schema, but Kafka topics don't have schemas built in.

To obtain the schema, we must look up the corresponding entries in the schema registry.

This means that in Flink, a table maps not just to the Kafka topic, but also to its key schema and value schema.

And once again, this is all automatic.

With all those pieces in place, we can reference a table using the from method in the Table API.

However, there are two ways to do that.

We can have it look in the default catalog and database, as shown in the sketch below.

Or we can provide an absolute path to the table instead.
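Here is a minimal sketch of both forms, reusing the env variable and the example names from earlier:

// Relative reference: resolved against the default catalog and database.
Table orders = env.from("orders");

// Absolute reference: spells out the catalog, database, and table.
Table sameOrders = env.from("`ecommerce`.`marketplace`.`orders`");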

As long as the topic and schemas exist, we can reference the matching table without having to define anything ahead of time.

There's one more resource I want to talk about.

Here we see a very simple Table API query.

It scans the orders table and selects all fields.

When we execute this, it produces a SQL statement that will be sent to Confluent Cloud.

That statement might look like this.
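Roughly speaking, it is equivalent to the following SQL (an approximation; the exact statement sent to Confluent Cloud may differ):

SELECT * FROM `ecommerce`.`marketplace`.`orders`;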

Confluent Cloud will execute the statement and return all records in the topic.

However, that requires compute resources.

Those resources have to be defined in the form of a Compute Pool.

Each environment can contain one or more compute pools.

And each statement will be assigned to use the resources from one of those pools.

This assignment happens using one of the configuration settings we provided in the cloud.properties file.
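Specifically, it's the compute pool entry from the properties sketch earlier (the key name may vary by plugin version, and the value is a placeholder):

client.compute-pool-id=<your-compute-pool-id>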

So, here we have our completed code.

I said five minutes at the beginning, and I think we should be pretty close.

In the end, the amount of code required to write a simple Table API query is quite minimal.

We can get by with very little in the way of boilerplate.

This is all thanks to the magic that Confluent Cloud is doing behind the scenes to automate the creation of many of our resources.

As we will see in later videos, this goes both ways.

Resources we create through the Table API can automatically trigger the creation of corresponding elements in Confluent Cloud.

I hope this video has piqued your interest in trying out the Flink Table API.

We're already working on future videos that will expand on these topics.

Feel free to drop us a comment if you feel that anything needs a deeper dive.

Thanks for watching.
