Software Practice Lead
This exercise will help you get started using Flink SQL running on Confluent Cloud. For help using Flink SQL running in a local Docker environment, see the companion exercise.
First, sign up for a free Confluent Cloud account.
Next, install the Confluent CLI if you don't already have it. In your terminal:
brew install confluentinc/tap/cli
If you don't use Homebrew, you can use a different installation method.
After installing the Confluent CLI, login to your Confluent Cloud account:
confluent login --prompt --save
FREE ACCESS: Basic clusters used in the context of this exercise won't incur much cost, and the amount of free usage that you receive along with the promo code FLINKSQL for $25 of free Confluent Cloud usage will be more than enough to cover it. You can also use the promo code CONFLUENTDEV1 to delay entering a credit card for 30 days.
The confluent-flink-quickstart CLI plugin creates all of the resources you need to get started with Confluent Cloud for Apache Flink. Install it by running:
confluent plugin install confluent-flink-quickstart
Confluent Cloud provides resources for running SQL statements in the form of compute pools.
You will need a compute pool to execute the SQL statements you create during this course. This compute pool will scale up and down as needed; when no SQL is running the compute pool will not consume any resources.
Use the Flink quickstart plugin to create a Flink compute pool in AWS region us-east-1:
confluent flink quickstart \
--name sql-course \
--max-cfu 10 \
--region us-east-1 \
--cloud aws
This will create a new Confluent Cloud environment named sql-course_environment with the following resources:
--max-cfu 10 puts an upper bound on how many resources the compute pool can consume, as measured in CFUs, which is the logical unit of processing power used by Confluent Cloud for billing.
After a couple of minutes, you will enter an interactive Flink shell where you can start running queries.
Note: if you exit the Flink shell, you can return to it by running confluent flink shell.
Your account has been pre-populated with some sample data. To access this data, try the following:
use catalog examples;
use marketplace;
show tables;
You should now see a list with these four tables: clicks, customers, orders, and products. Unlike any other tables you might create or use with Flink on Confluent Cloud, these tables in the marketplace database are backed by a data generator rather than Kafka topics (which means that the data in these tables isn't stored anywhere).
You can now start using SQL to explore these tables. If you aren't sure what to try, here are some suggestions to help you get started:
describe clicks;
select * from orders where price < 20;
select distinct brand from products;
These sample tables are held in a Flink SQL database named marketplace, which in turn is in a catalog named examples.
If you execute
show catalogs;
you will get a result something like this:
+------------------------+--------------+
| catalog name | catalog id |
+------------------------+--------------+
| examples | cat-examples |
| default | env-7q66vj |
| sql-course_environment | env-9zxx07 |
+------------------------+--------------+
examples is a special, read-only catalog powered by data generators creating sample data. default and sql-course_environment are normal, physical catalogs corresponding to Kafka environments in Confluent Cloud.
Similarly, you can use show databases to see a list of all of the databases in the current catalog.
Earlier, when you executed use catalog examples and use marketplace this made the examples catalog and its marketplace database the default namespace for finding tables. This makes it possible to execute queries like select * from customers without fully specifying to where to find the customers table.
However, Flink SQL is able to access data across different catalogs and databases, so long as the underlying Kafka clusters are all in the same region. For example, you can always refer to the customers table by fully specifying where it is:
select * from `examples`.`marketplace`.`customers`;
Note: the use of backtick characters, e.g., `examples`.`marketplace`.`customers` rather than examples.marketplace.customers, isn't always necessary. Backticks are used in SQL to signify that the string being quoted is the name of a catalog, database, table, or column. Disambiguating with backticks becomes necessary when one of these objects has a name that collides with a reserved keyword in the SQL language, or when the name uses a character that can be misinterpreted, such as -.
In Confluent Cloud, Kafka topics and schemas are always in sync with Flink, simplifying how you can process your data. Any topic created in Kafka is visible directly as a table in Flink, and any table created in Flink is visible as a topic in Kafka. Effectively, Flink provides a SQL interface on top of Confluent Cloud.
Because Flink follows the SQL standard, the terminology is slightly different from Kafka. The following table shows the mapping between Kafka and Flink terminology.
Kafka | Flink | Notes |
---|---|---|
Environment | Catalog | Flink can query and join data that are in any environments/catalogs |
Cluster | Database | Flink can query and join data that are in different clusters/databases |
Topic + Schema | Table | Kafka topics and Flink tables are always in sync. You never need to to declare tables manually for existing topics. Creating a table in Flink creates a topic and the associated schema. |
As a result, when you start using Flink SQL, you can directly access all of the Kafka environments, clusters, and topics that you already have in Confluent Cloud, without creating any additional metadata.
Compared with open source Flink, the main difference is that the Data Definition Language (DDL) statements related to managing catalogs, databases, and tables act directly on physical objects. For example, when you create a table in Flink, the corresponding Kafka topic and schema are immediately created in Confluent Cloud, whereas with open source Flink, creating a table backed by Kafka only creates metadata describing how to interpret a topic (that may or may not exist) as a table.
Also, Confluent Cloud provides a unified approach to metadata management, centered around the schemas in Schema Registry. If you’re already on Confluent Cloud, you'll automatically see tables for your existing topics that are ready to query using Flink, simplifying data discovery and exploration.
Creating a table creates a Kafka topic in the specified cluster, as well as a schema for that table in the schema registry. This ensures that any Kafka consumer can work with this data, whether it's part of a Flink job, or not.
First we'll change the default catalog and database, just to make things a bit easier to read, and then we'll create a new table:
use catalog `sql-course_environment`;
use `sql-course_kafka-cluster`;
create table `small-orders` (
`order_id` STRING NOT NULL,
`customer_id` INT NOT NULL,
`product_id` STRING NOT NULL,
`price` DOUBLE NOT NULL
);
If you want to set the number of partitions, or other properties of the underlying Kafka topic, this can be done as part of creating the table, e.g.,
create table `small-orders` (
`order_id` STRING NOT NULL,
`customer_id` INT NOT NULL,
`product_id` STRING NOT NULL,
`price` DOUBLE NOT NULL
) distributed into 1 buckets
with (
'kafka.retention.time' = '1 d'
);
To see the configuration for given table, use show create table:
show create table `small-orders`;
You'll find a complete list of the available table properties in the documentation.
Ad hoc queries are useful for learning SQL and gaining understanding of your data, but where Flink SQL really shines is for setting up continuously running SQL statements that clean, transform, combine, and enrich data.
Later in this course you'll learn how to do more elaborate stream processing with Flink SQL, but for now, here's a simple example you can experiment with:
insert into `small-orders`
select * from `examples`.`marketplace`.`orders` where price < 20;
What this does is to create a continously running SQL statement that copies each of the smaller orders, as it arrives, into the small-orders Kafka topic.
You can now use the Confluent Cloud Data Portal, the Confluent Cloud CLI, or Flink SQL to examine the data in the small-orders topic/table.
If you have followed along this far, you have now created a couple of resources in Confluent Cloud that are going to show up on your bill: the small-orders topic, and the SQL statement that is copying data into it. You'll want to delete both this statement and topic.
These resources are in the sql-course_environment environment, so a straightforward way to ensure that everything has been cleaned up is to delete this environment, which will delete both the Kafka cluster and the Flink compute pool you've been using. Run the following command in your terminal to get the ID of the environment named sql-course_environment, which will be of the form env-123456:
confluent environment list
Now delete the environment:
confluent environment delete <ENVIRONMENT_ID>
You can also manage these resources in your browser, at https://confluent.cloud, if you prefer.
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.