In this exercise, you will set up a Confluent Cloud cluster that you will use for the rest of the course.
Note: If you already have a Confluent Cloud account, you can skip ahead to Create a New Environment.
Watch your inbox for a confirmation email and follow the link to proceed.
You will be asked to create a cluster. Feel free to ignore this. We'll be creating a cluster in a later step.
Note: To ensure isolation and easy cleanup, we do not recommend using an existing cluster.
If you prefer to work from the command line, you can install the Confluent CLI by following the instructions here.
Once you've installed the CLI, you can login using:
confluent login --save
Then, refer to the Command Line Reference sections that follow many of the instructions below.
We will create a cloud environment specifically for this course. This will ensure isolation and allow for easy cleanup.
Name your environment apache-flink-101.
When offered a choice on which Stream Governance Package to use, select Essentials.
Select the cloud and region where you want to create your Schema Registry and Stream Catalog (i.e. where you will be storing the metadata).
If you prefer, you can also do this from the command line interface by running:
confluent environment create apache-flink-101
Once your environment is created, you will need to make it the active environment.
First, list the environments and locate the Id for the apache-flink-101 environment.
confluent environment list
Next, set it as the active environment:
confluent environment use <environment id>
Next, we need to create a Kafka Cluster for the course.
The Basic cluster used in this exercise won't incur much cost, and the free usage you receive, along with the promo code FLINK101 for $25 of free Confluent Cloud usage, will be more than enough to cover it.
If you prefer, you can do this from the command line interface (you should adjust the cluster name, cloud, and region settings as appropriate):
confluent kafka cluster create <Your Cluster Name> --cloud gcp --region us-central1 --type basic
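The commands that follow need the cluster ID (an identifier that looks like lkc-xxxxxx). If you created the cluster from the CLI, the ID is included in the output; you can also look it up at any time with:
confluent kafka cluster list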
We will need an API Key to allow applications to access our cluster, so let's create one.
Create a key with Global access.
Note: For secure production environments, you would want to select Granular access and configure it more securely.
Download and save the key somewhere for future use.
If you prefer, you can do this from the command line interface (again, adjust the cluster name and id as appropriate):
confluent api-key create --resource <Your Cluster Id> --description <Your Cluster Name>-key -o json > <Your Cluster Name>-key.json
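If you want to confirm that the key was created and is associated with your cluster, you can list the API keys for that resource:
confluent api-key list --resource <Your Cluster Id>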
Now we're going to use the Flink SQL Client to create a job that will write data into Kafka in Confluent Cloud. Unlike the ephemeral jobs we've created so far that depend on the Flink SQL Client to act as the sink, this will be a persistent job that will run independently of the SQL Client.
First let's create a topic this job can use for its sink table.
Alternatively you can create this topic from the command line using the following:
confluent kafka topic create pageviews --cluster <cluster id> --environment <environment id>
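To confirm that the topic was created, you can list the topics in the cluster:
confluent kafka topic list --cluster <cluster id> --environment <environment id>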
Now we can use the SQL Client to create a new table backed by this Kafka topic. (If you need help getting the SQL Client running, refer to the instructions in the first exercise.)
To do this, you're going to need to know your cluster's bootstrap server, which might be something like pkc-41973.westus2.azure.confluent.cloud:9092.
You can find this info under "Cluster Settings" on Confluent Cloud.
Alternatively, you can find this info using the CLI, which will show you the cluster endpoint. (And don't include the "SASL_SSL://" prefix when you create the table.)
confluent kafka cluster describe <cluster id>
With this info in hand, we can proceed to create the table, replacing the BOOTSTRAP_SERVER, API_KEY, and API_SECRET placeholders with the values you should now have:
Please note that this is NOT the best way to define this table. A fixed sink partitioner is used here to set something up for a later exercise, but normally you won't want that particular setting.
CREATE TABLE pageviews_kafka (
`url` STRING,
`user_id` STRING,
`browser` STRING,
`ts` TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews',
'properties.group.id' = 'demoGroup',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'BOOTSTRAP_SERVER',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="API_SECRET";',
'value.format' = 'json',
'sink.partitioner' = 'fixed'
);
The syntax here is a bit delicate, with lots of quoting going on. If you got the syntax right, the SQL Client will respond with this:
[INFO] Execute statement succeed.
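If you'd like to double-check your work, you can confirm that the table was registered and inspect its schema from the SQL Client:
SHOW TABLES;
DESCRIBE pageviews_kafka;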
We also need a source table. Let's use a table like the one we used before:
CREATE TABLE `pageviews` (
`url` STRING,
`user_id` STRING,
`browser` STRING,
`ts` TIMESTAMP(3)
)
WITH (
'connector' = 'faker',
'rows-per-second' = '10',
'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
'fields.user_id.expression' = '#{numerify ''user_##''}',
'fields.browser.expression' = '#{Options.option ''chrome'', ''firefox'', ''safari''}',
'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);
Now you're ready to launch a job that will copy data from the flink-faker table into the pageviews topic in Kafka. Moving data from one table to another can be done with an INSERT statement like this one:
INSERT INTO destination_table SELECT * FROM source_table;
Modify the statement above to take data from the pageviews table and send it to the pageviews_kafka table, and then execute it.
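For reference, one way the completed statement can look, copying every column from pageviews into pageviews_kafka, is:
INSERT INTO pageviews_kafka SELECT * FROM pageviews;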
The SQL Client should respond with something like this:
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 99022c831a50aa6dc5d17b79d7e9b4cb
Next, let's confirm that the job is running and that data is flowing. You can do this by either going to Confluent Cloud and inspecting the pageviews topic directly to see the messages being produced, or by querying the topic with Flink SQL. (We'll also take a look at the running job in the Flink UI shortly.)
The pageviews_kafka table is already being used as a sink, but you can also use it as a source at the same time. Go ahead and try that now, either with this query, or something more elaborate:
SELECT * FROM pageviews_kafka;
If you want to try something more elaborate, the following query will group all of the records by the browser specified in the table (GROUP BY browser). It will then provide the name of each browser (SELECT browser), and a count of how many pageviews used that browser (COUNT(*)):
SELECT browser, COUNT(*) FROM pageviews_kafka GROUP BY browser;
If you visit the Flink UI (at http://localhost:8081) while one of these interactive queries is running, you'll see that the cluster has two active jobs!
The job that's writing into Confluent Cloud is detached from the SQL Client. You should stop it now in order to limit the charges being made to your Confluent Cloud account.
So how can you stop it?
An easy way to do this is to click the "Cancel Job" link shown in the upper right-hand corner of the Flink UI while inspecting that job. You could also shut down the Docker containers that are running Flink. But one way or another, be sure to stop this job.
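For example, here is a command-line sketch, assuming the Docker Compose setup from the first exercise with a service named jobmanager (adjust the service name, and substitute the Job ID that the SQL Client reported):
# cancel just this job using the Flink CLI inside the jobmanager container
docker compose exec jobmanager flink cancel <job id>
# or shut down the whole Flink cluster
docker compose down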