Get Started Free
course: Apache Flink® Table API: Processing Data Streams in Java

Exercise: Connecting the Flink Table API to Confluent Cloud

30 min
Wade Waldron

Wade Waldron

Staff Software Practice Lead

Connecting the Apache Flink Table API to Confluent Cloud

This course will walk you through a series of exercises that explore different features of the Apache Flink Table API. You will build several Flink functions that target an eCommerce Marketplace.

In this exercise, we will set up Confluent Cloud and establish a connection from a Flink application to the cluster. In future exercises, we'll expand on this application to produce various queries focused on an online marketplace.

Create a Confluent Cloud account and log in.

NOTE: If you already have a Confluent Cloud account, you can skip this step.

  • Go to the Confluent Cloud signup page and create a new account.
  • Watch your inbox for a confirmation email and follow the link to proceed.
  • You will be asked to create a cluster. Feel free to ignore this. We'll create one shortly.

Install the Confluent CLI

Many of the steps below are easier when completed from the command line. While it's not required, you may want to install the Confluent CLI by following the instructions here:

Install the Confluent CLI

If you prefer to use the CLI, refer to the Command Line Reference sections after each set of instructions.

Create an environment.

Add Cloud Environment

We will create a cloud environment specifically for this course. This will ensure isolation and allow for easy cleanup.

WARNING: Do not use a production environment for this course. Use a new environment with new credentials. You don't want to accidentally compromise your production environment.

  • From the Environments menu click + Add cloud environment.
  • Name your environment flink-table-api-java.
  • Use the Essentials Stream Governance package.
  • You will be asked to create a Kafka cluster. We'll cover that in the next section.

Command Line Reference

Create the environment.

confluent environment create flink-table-api-java --governance-package essentials

Set the active environment.

confluent environment use <environment id>

Create a Kafka cluster

Create Cluster

Later in the course, we will need a Kafka cluster to host new Kafka topics. We'll create that now.

  • If you aren't already at the Create Cluster screen, navigate to the flink-table-api-java environment.

  • Click the Create cluster on my own or Add cluster button to create a new Kafka cluster.

  • When asked for the cluster type, click Begin configuration on a Basic cluster.

  • Select a region and availability zone.

    NOTE: Use the same cloud and region throughout the setup. Using different clouds or regions could impact performance and result in additional costs.

  • Name the cluster marketplace and launch it.

Command Line Reference

Create a Kafka Cluster named marketplace (adjust cloud and region settings as required).

confluent kafka cluster create marketplace --cloud gcp --region us-central1 --type basic

NOTE: Use the same cloud and region throughout the setup. Using different clouds or regions could impact performance and result in additional costs.

Create a Kafka API Key

Add Kafka API Key

Some of the tests will make use of the Kafka Admin Client. This will require an API Key.

  • Still inside the marketplace Kafka cluster, select API Keys from the menu.

  • Select Create key.

  • Select My account.

    NOTE: For production, you probably want to use a Service Account to restrict access.

  • Download and save the key somewhere you can access it later.

Command Line Reference

Create a Kafka API Key (adjust cloud and region settings as required).

confluent kafka cluster list
confluent api-key create --resource <kafka cluster id>

NOTE: Save the key and secret for later.

Create a compute pool.

Add a Compute Pool

Flink uses compute resources and requires a compute pool. Let's create one.

  • Inside the flink-table-api-java environment, select Flink -> Compute Pools and click Create compute pool.
  • Select an appropriate cloud and region.
  • Give your pool the name marketplace-compute.

Command Line Reference

Create a compute pool (adjust cloud and region settings as required).

confluent flink compute-pool create marketplace-compute --cloud gcp --region us-central1 --max-cfu 10

Add a Flink API Key

To access Flink in Confluent Cloud, we need an API key.

  • Still in the Flink tab, select API Keys and click Add API key.

  • Use the My Access key type.

    NOTE: Granular Access is recommended for production.

  • Add an appropriate name and description (eg. marketplace-flink-api-key) for the key.

    NOTE: Download and save the key somewhere so you can access it later.

Command Line Reference

Create a Flink API Key (adjust cloud and region settings as required).

confluent api-key create --resource flink --cloud gcp --region us-central1

NOTE: Save the key and secret for later.

Create a Schema Registry API Key

Add a Schema Registry API Key

The tests will also make use of the Schema Registry Client. This will require another API Key.

  • Navigate to the flink-table-api-java environment.
  • Look for a section named Stream Governance API.
  • Click the + Add key button and create a new key just like in the previous examples.

Command Line Reference

Create a Flink API Key (adjust cloud and region settings as required).

confluent schema-registry cluster describe
confluent api-key create --resource <schema registry cluster>

NOTE: Save the key and secret for later.

Download the code.

Now that we have an environment, we need an application to connect to it. If you haven't already done so clone the Github repo:

You will need a suitable Java development environment including:

  • Java 21
  • Maven
  • An IDE such as IntelliJ, Eclipse, or VS Code.

To easily switch between Java versions, you can use SDKMAN.

NOTE: This project already has many of the settings required to work with the table API. For details on how to setup your own project, check out the Confluent Flink Table API Documentation.

Stage the exercise.

If you haven't already read the README for the repository, do that now. When you are done, stage the exercise by executing:

$ cd exercises
$ ./exercise.sh stage 01

Obtain configuration settings.

Import the project (Maven POM file) from the exercises folder into your IDE.

Locate the file src/main/resources/cloud-template.properties and copy it to src/main/resources/cloud.properties.

Populate the values in the cloud.properties file as follows:

  • client.cloud - The name of the cloud provider you chose (eg. aws/gcp/azure).

  • client.region - The region you chose (eg. us-east-1).

  • client.flink-api-key - The key from the Flink API Key created above.

  • client.flink-api-secret - The secret from the Flink API Key created above.

  • client.organization-id (a GUID/UUID)

    • From the menu in the top right hand corner, select Organization Settings and look for the organization Id.

    • Command Line Reference:

      confluent org list
  • client.environment-id (starts with env-)

    • Select the environment and look for the ID field.

    • Command Line Reference:

      confluent environment list
  • client.compute-pool-id (starts with lfcp-)

    • Select the compute pool and look for the ID field.

    • Command Line Reference:

      confluent flink compute-pool list
  • client.principal-id (starts with u-)

    • From the menu in the top right-hand corner, select Accounts & access and look for your user Id.

    • Command Line Reference:

      confluent iam user list
  • client.kafka.bootstrap.servers (Ends with port 9092)

    • Navigate to your marketplace cluster, select Cluster settings, and look for the Bootstrap server.

    • Or, look in the API Key file for the Kafka cluster.

    • Command Line Reference:

      confluent kafka cluster list
      confluent kafka cluster describe <CLUSTER ID>

      Look for the Endpoint, but remove the SASL_SSL://

  • client.kafka.sasl.jaas.config - Replace the <KAFKA KEY> and <KAFKA SECRET> placeholders with the Key and Secret from the Kafka API Key created above.

  • client.registry.url

    • Navigate to the flink-table-api-java environment. In the `Stream Governance API section, look for the Endpoint.

    • Command Line Reference:

      confluent schema-registry cluster describe

      Look for the Endpoint URL.

  • client.registry.key - The key from the Schema Registry API Key created above.

  • client.registry.secret - The secret from the Schema Registry API Key created above.

Build the Application

Confluent Cloud includes a read-only set of Flink tables in a sandbox-link environment. These tables can be used for experimentation and testing. For a simple test of the connection parameters, we can ask the Table API to list those tables.

In the src/main/java/marketplace/Marketplace class, implement the main method as follows:

  • Use the ConfluentSettings class to load the configuration from the cloud.properties file:

    EnvironmentSettings settings = ConfluentSettings.fromResource("/YOUR.PROPERTIES.FILE");
    Hint

    You must prefix your properties file with the / as shown above.

  • Create a new table environment using the settings:

    TableEnvironment env = TableEnvironment.create(settings);
  • Set the catalog to examples and the database to marketplace.

    env.useCatalog(<Catalog Name>);
    env.useDatabase(<Database Name>);
  • Use env.listTables() to produce a list of tables in the database and print the results.

Run the application.

Finally, we'll run the application to verify it works as expected.

  • In a terminal, execute the application by running the commands:

    mvn clean package
    java -jar target/flink-table-api-marketplace-0.1.jar
  • Assuming you have done everything correctly you should see the following tables printed:

    • clicks
    • customers
    • orders
    • products

Finish

This brings us to the end of this exercise.

Use the promo code FLINKTABLEAPIJAVA to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.