In this exercise, you’ll learn how to manipulate your data using Apache Flink® SQL. Up until now, we’ve been producing data to and reading data from an Apache Kafka® topic without any intermediate steps. With data transformation and aggregation, we can do so much more!
In the previous exercise, we created a Datagen Source Connector to generate a stream of orders to a Kafka topic. This exercise relies on the data produced by that connector, so make sure to complete it before proceeding.
Before we start, ensure that your Datagen Source Connector is still up and running.
In Confluent Cloud, each Kafka topic is automatically turned into a Flink table that leverages any associated schema. Working with streams in Flink SQL is straightforward and often requires far less code than an equivalent Java or Python program. Follow these steps and see for yourself!
From your cluster overview page, navigate to the orders topic.
Click on the "Query with Flink" button at the top.
Run the query displayed by default, for example:
SELECT * FROM `demos`.`intro-to-kafka`.`orders` LIMIT 10;
Let's take a closer look at the ordertime field. It's an integer that represents the number of milliseconds since the Unix epoch. We can confirm this by inspecting the table schema Flink created from the topic's Avro schema:
DESC orders;
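If you'd like to sanity-check the conversion before transforming anything, you can run a quick ad-hoc query using the same TO_TIMESTAMP_LTZ function that appears in the next step. Each row should show the raw epoch value alongside a readable timestamp:
SELECT ordertime, TO_TIMESTAMP_LTZ(ordertime) AS ordertime_ts
FROM orders
LIMIT 5;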
For this exercise, we want the ordertime column to use the TIMESTAMP_LTZ type, which Flink uses to represent an instant in time. We also want to flatten the attributes nested under the address field. Finally, you may have noticed that the original message key appears in raw binary format. This is because the Datagen Source Connector produced it as a string without a registered schema. With Flink SQL, we can apply all of these transformations in a single query:
SELECT
CAST(key AS STRING) AS key,
TO_TIMESTAMP_LTZ(ordertime) AS ordertime_ts,
orderid,
itemid,
orderunits,
address.city,
address.state,
address.zipcode
FROM orders;
Much better! We now have the desired results, but they'll disappear once you close this screen. To make the transformations permanent, wrap the query in a CREATE TABLE ... AS statement. This makes the query persistent, meaning it will run continuously in Confluent Cloud.
CREATE TABLE orders_with_ts AS
SELECT
CAST(key AS STRING) AS key,
TO_TIMESTAMP_LTZ(ordertime) AS ordertime_ts,
orderid,
itemid,
orderunits,
address.city,
address.state,
address.zipcode
FROM orders;
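If you want to confirm that the new table looks as expected, you can describe and query it like any other table. This is an optional check and not required for the rest of the exercise:
DESC orders_with_ts;
SELECT * FROM orders_with_ts LIMIT 10;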
To use Flink SQL windowing features, we need to define which time column to use. By default, Flink uses $rowtime, which corresponds to the Kafka message's timestamp. However, we want to use ordertime_ts, so update the watermark as follows:
ALTER TABLE orders_with_ts
MODIFY WATERMARK FOR ordertime_ts AS ordertime_ts - INTERVAL '1' SECOND;
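To verify that the watermark was applied, you can print the table's DDL; the watermark definition should appear in the output (the exact formatting may vary):
SHOW CREATE TABLE orders_with_ts;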
Flink offers a variety of window aggregation functions for time-based data analysis. To determine how many orders are made per state each week, use the following:
CREATE TABLE weekly_orders_by_state AS
SELECT
window_start,
window_end,
state,
COUNT(DISTINCT orderid) AS distinct_orders
FROM
TUMBLE(
DATA => TABLE orders_with_ts,
TIMECOL => DESCRIPTOR(ordertime_ts),
SIZE => INTERVAL '7' DAY)
GROUP BY window_start, window_end, state;
Now, query the new table:
SELECT * FROM weekly_orders_by_state;
As new orders are created, you’ll see the order counts per state increase for each defined window period.
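Tumbling windows are only one of the windowing functions Flink offers. As a variation, if you'd rather have a rolling seven-day count that advances every day, you could swap TUMBLE for HOP. Here's a sketch that reuses the same orders_with_ts table:
SELECT
window_start,
window_end,
state,
COUNT(DISTINCT orderid) AS distinct_orders
FROM
HOP(
DATA => TABLE orders_with_ts,
TIMECOL => DESCRIPTOR(ordertime_ts),
SLIDE => INTERVAL '1' DAY,
SIZE => INTERVAL '7' DAY)
GROUP BY window_start, window_end, state;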
Confluent Cloud for Apache Flink automatically creates a matching Kafka topic for each Flink table. If you navigate to the topics section, you'll notice a new weekly_orders_by_state Kafka topic that you can use to build applications or sink to an external datastore with Kafka Connect. The possibilities are endless!
If you enjoy code golf, here's a more concise version that skips the intermediate table. Be sure to execute each query one at a time in the Flink SQL console:
ALTER TABLE orders ADD order_time_as_timestamp_ltz AS TO_TIMESTAMP_LTZ(ordertime);
ALTER TABLE orders MODIFY WATERMARK FOR order_time_as_timestamp_ltz AS order_time_as_timestamp_ltz - INTERVAL '1' SECOND;
CREATE TABLE weekly_orders_by_state_code_golf AS
SELECT
window_start,
window_end,
address.state AS state,
COUNT(DISTINCT orderid) AS distinct_orders
FROM
TUMBLE(
DATA => TABLE orders,
TIMECOL => DESCRIPTOR(order_time_as_timestamp_ltz),
SIZE => INTERVAL '7' DAY)
GROUP BY window_start, window_end, address.state;
SELECT * FROM weekly_orders_by_state_code_golf;
A final note as we wrap up the exercises for this course: don't forget to delete your resources and cluster to avoid exhausting the free Confluent Cloud usage provided to you. The easiest way is to delete the whole environment.
Flink SQL has much more to offer than we've covered here! If you're curious about the power of Flink, check out the other Apache Flink courses on Confluent Developer.