This course will walk you through a series of exercises that explore different features of the Apache Flink Table API. You will build several Flink functions that target an eCommerce Marketplace.
In this exercise, we will set up Confluent Cloud and establish a connection from a Flink application to the cluster. In future exercises, we'll expand on this application to produce various queries focused on an online marketplace.
NOTE: If you already have a Confluent Cloud account, you can skip the sign-up step.
Many of the steps below are easier when completed from the command line. While it's not required, you may want to install the Confluent CLI by following the instructions here:
If you prefer to use the CLI, refer to the Command Line Reference sections after each set of instructions.
We will create a cloud environment specifically for this course. This will ensure isolation and allow for easy cleanup.
WARNING: Do not use a production environment for this course. Use a new environment with new credentials. You don't want to accidentally compromise your production environment.
Create the environment.
confluent environment create flink-table-api-java --governance-package essentials
Set the active environment.
confluent environment use <environment id>
Later in the course, we will need a Kafka cluster to host new Kafka topics. We'll create that now.
If you aren't already at the Create Cluster screen, navigate to the flink-table-api-java environment.
Click the Create cluster on my own or Add cluster button to create a new Kafka cluster.
When asked for the cluster type, click Begin configuration on a Basic cluster.
Select a region and availability zone.
NOTE: Use the same cloud and region throughout the setup. Using different clouds or regions could impact performance and result in additional costs.
Name the cluster marketplace and launch it.
Create a Kafka Cluster named marketplace (adjust cloud and region settings as required).
confluent kafka cluster create marketplace --cloud gcp --region us-central1 --type basic
Some of the tests will make use of the Kafka Admin Client. This will require an API Key.
Still inside the marketplace Kafka cluster, select API Keys from the menu.
Select Create key.
Select My account.
NOTE: For production, you probably want to use a Service Account to restrict access.
Download and save the key somewhere you can access it later.
Create a Kafka API Key (adjust cloud and region settings as required).
confluent kafka cluster list
confluent api-key create --resource <kafka cluster id>
NOTE: Save the key and secret for later.
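To give you a sense of how this key gets used, here is a minimal sketch of an Admin Client configured against the cluster (the class name is just for illustration; the property names are standard Kafka client settings, and the bootstrap server, key, and secret are placeholders for the values you just saved):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

public class AdminClientSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholders -- substitute the bootstrap server and the key/secret you saved.
        props.put("bootstrap.servers", "<BOOTSTRAP SERVER>:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username='<KAFKA KEY>' password='<KAFKA SECRET>';");

        // List the cluster's topics to confirm the key works.
        try (AdminClient admin = AdminClient.create(props)) {
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}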
Flink uses compute resources and requires a compute pool. Let's create one.
Create a compute pool (adjust cloud and region settings as required).
confluent flink compute-pool create marketplace-compute --cloud gcp --region us-central1 --max-cfu 10
To access Flink in Confluent Cloud, we need an API key.
In the environment's Flink tab, select API Keys and click Add API key.
Use the My Access key type.
NOTE: Granular Access is recommended for production.
Add an appropriate name and description (e.g. marketplace-flink-api-key) for the key.
NOTE: Download and save the key somewhere so you can access it later.
Create a Flink API Key (adjust cloud and region settings as required).
confluent api-key create --resource flink --cloud gcp --region us-central1
NOTE: Save the key and secret for later.
The tests will also make use of the Schema Registry Client. This will require another API Key.
Create a Schema Registry API Key.
confluent schema-registry cluster describe
confluent api-key create --resource <schema registry cluster id>
NOTE: Save the key and secret for later.
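Similarly, here is a rough sketch of how a Schema Registry client can be constructed with this key (again, the class name is illustrative; the URL, key, and secret are placeholders, and the two basic.auth settings are the standard Schema Registry client options):

import java.util.Map;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class RegistryClientSketch {
    public static void main(String[] args) throws Exception {
        // Placeholders -- substitute the endpoint and the key/secret you saved.
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
            "<SCHEMA REGISTRY URL>",
            10, // identity map capacity (schema cache size)
            Map.of(
                "basic.auth.credentials.source", "USER_INFO",
                "basic.auth.user.info", "<REGISTRY KEY>:<REGISTRY SECRET>"));

        // List the registered subjects to confirm the key works.
        client.getAllSubjects().forEach(System.out::println);
    }
}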
Now that we have an environment, we need an application to connect to it. If you haven't already done so, clone the GitHub repo:
You will need a suitable Java development environment including:
To easily switch between Java versions, you can use SDKMAN.
NOTE: This project already has many of the settings required to work with the Table API. For details on how to set up your own project, check out the Confluent Flink Table API Documentation.
If you haven't already read the README for the repository, do that now. When you are done, stage the exercise by executing:
$ cd exercises
$ ./exercise.sh stage 01
Import the project (Maven POM file) from the exercises folder into your IDE.
Locate the file src/main/resources/cloud-template.properties and copy it to src/main/resources/cloud.properties.
Populate the values in the cloud.properties file as follows:
client.cloud - The name of the cloud provider you chose (e.g. aws, gcp, or azure).
client.region - The region you chose (e.g. us-east-1).
client.flink-api-key - The key from the Flink API Key created above.
client.flink-api-secret - The secret from the Flink API Key created above.
client.organization-id (a GUID/UUID)
From the menu in the top right-hand corner, select Organization Settings and look for the organization ID.
Command Line Reference:
confluent org list
client.environment-id (starts with env-)
Select the environment and look for the ID field.
Command Line Reference:
confluent environment list
client.compute-pool-id (starts with lfcp-)
Select the compute pool and look for the ID field.
Command Line Reference:
confluent flink compute-pool list
client.principal-id (starts with u-)
From the menu in the top right-hand corner, select Accounts & access and look for your user ID.
Command Line Reference:
confluent iam user list
client.kafka.bootstrap.servers (ends with port 9092)
Navigate to your marketplace cluster, select Cluster settings, and look for the Bootstrap server.
Or, look in the API Key file for the Kafka cluster.
Command Line Reference:
confluent kafka cluster list
confluent kafka cluster describe <CLUSTER ID>
Look for the Endpoint, but remove the SASL_SSL:// prefix.
client.kafka.sasl.jaas.config - Replace the <KAFKA KEY> and <KAFKA SECRET> placeholders with the Key and Secret from the Kafka API Key created above.
client.registry.url
Navigate to the flink-table-api-java environment. In the Stream Governance API section, look for the Endpoint.
Command Line Reference:
confluent schema-registry cluster describe
Look for the Endpoint URL.
client.registry.key - The key from the Schema Registry API Key created above.
client.registry.secret - The secret from the Schema Registry API Key created above.
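Once populated, cloud.properties should look roughly like this (every value below is a placeholder; adjust the cloud and region to match your setup):

client.cloud=gcp
client.region=us-central1
client.flink-api-key=<FLINK KEY>
client.flink-api-secret=<FLINK SECRET>
client.organization-id=<ORGANIZATION ID>
client.environment-id=<ENVIRONMENT ID>
client.compute-pool-id=<COMPUTE POOL ID>
client.principal-id=<PRINCIPAL ID>
client.kafka.bootstrap.servers=<BOOTSTRAP SERVER>:9092
client.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<KAFKA KEY>' password='<KAFKA SECRET>';
client.registry.url=<SCHEMA REGISTRY URL>
client.registry.key=<REGISTRY KEY>
client.registry.secret=<REGISTRY SECRET>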
Confluent Cloud includes a read-only set of Flink tables in a sandbox-like environment. These tables can be used for experimentation and testing. For a simple test of the connection parameters, we can ask the Table API to list those tables.
In the Marketplace class (src/main/java/marketplace/Marketplace.java), implement the main method as follows:
Use the ConfluentSettings class to load the configuration from the cloud.properties file:
EnvironmentSettings settings = ConfluentSettings.fromResource("/YOUR.PROPERTIES.FILE");
You must prefix your properties file with the / as shown above.
Create a new table environment using the settings:
TableEnvironment env = TableEnvironment.create(settings);
Set the catalog to examples and the database to marketplace.
env.useCatalog(<Catalog Name>);
env.useDatabase(<Database Name>);
Use env.listTables() to produce a list of tables in the database and print the results.
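Putting those steps together, the finished class might look something like this (a sketch, assuming you copied the template to cloud.properties as described above):

package marketplace;

import io.confluent.flink.plugin.ConfluentSettings;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class Marketplace {
    public static void main(String[] args) {
        // Load the connection settings from the classpath resource.
        EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties");

        // Create a table environment backed by Confluent Cloud.
        TableEnvironment env = TableEnvironment.create(settings);

        // Use the read-only examples catalog and its marketplace database.
        env.useCatalog("examples");
        env.useDatabase("marketplace");

        // Print the name of each table in the database.
        for (String tableName : env.listTables()) {
            System.out.println(tableName);
        }
    }
}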
Finally, we'll run the application to verify it works as expected.
In a terminal, execute the application by running the commands:
mvn clean package
java -jar target/flink-table-api-marketplace-0.1.jar
Assuming you have done everything correctly, you should see the following tables printed:
This brings us to the end of this exercise.