course: Apache Kafka® 101

Hands-on Exercise: Stream Processing With Flink SQL


Gilles Philippart

Software Practice Lead

In this exercise, you’ll learn how to manipulate your data using Apache Flink® SQL. Up until now, we’ve been producing data to and reading data from an Apache Kafka® topic without any intermediate steps. With data transformation and aggregation, we can do so much more!

In the previous exercise, we created a Datagen Source Connector to generate a stream of orders to a Kafka topic. This exercise relies on the data produced by that connector, so make sure to complete it before proceeding.

Before we start, ensure that your Datagen Source Connector is still up and running.

In Confluent Cloud, each Kafka topic is automatically turned into a Flink table that leverages any associated schema. Working with streams in Flink SQL is easy and often requires far less code than the equivalent Java or Python program. Follow these steps and see for yourself!
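
For example, you can list the tables that Flink exposes for your topics. This is a minimal sketch assuming the `demos` catalog and `intro-to-kafka` database used in the default query in step 3; adjust the names if your environment differs:

USE CATALOG `demos`;
USE `intro-to-kafka`;
SHOW TABLES;

You should see orders in the list.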

  1. From your cluster overview page, navigate to the orders topic.


  2. Click on the "Query with Flink" button at the top.

  3. Run the query displayed by default, for example:

    SELECT * FROM `demos`.`intro-to-kafka`.`orders` LIMIT 10;


  4. Let's take a closer look at the ordertime field. It's a BIGINT (a 64-bit integer) that represents the number of milliseconds since the Unix epoch. We can confirm this by inspecting the table schema that Flink created from the topic's Avro schema:

    DESC orders;

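    Optionally, you can sanity-check the unit by converting a literal value; the number below is an arbitrary epoch-milliseconds example rather than a value from your data, and the second argument tells Flink to interpret it as milliseconds:

    SELECT TO_TIMESTAMP_LTZ(1700000000000, 3) AS sample_ts;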

  5. For this exercise, we want the ordertime column to use the TIMESTAMP_LTZ type, which Flink uses to represent an instant in time. We also want to flatten the attributes nested under the address field. Finally, you may have noticed that the original message key appears in raw binary format; this is because the Datagen Connector produced it as a string without a registered schema. With Flink SQL, we can apply all these transformations in a single query:

    SELECT 
        CAST(key as STRING) as key,
        TO_TIMESTAMP_LTZ(ordertime) as ordertime_ts,
        orderid,
        itemid,
        orderunits,
        address.city,
        address.state,
        address.zipcode
    FROM orders;


  6. Much better! We now have the desired results, but they'll disappear once you close this screen. To make the transformations permanent, wrap the query in a CREATE TABLE ... AS statement. This makes the query persistent, meaning it will run continuously in Confluent Cloud.

    CREATE TABLE orders_with_ts AS
    SELECT 
        CAST(key as STRING) as key,
        TO_TIMESTAMP_LTZ(ordertime) as ordertime_ts,
        orderid,
        itemid,
        orderunits,
        address.city,
        address.state,
        address.zipcode
    FROM orders;
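
    If you want to confirm that the new table is being populated, you can query it directly:

    SELECT * FROM orders_with_ts LIMIT 5;
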
  7. To use Flink SQL windowing features, we need to define which time column to use. By default, Flink uses $rowtime, which corresponds to the Kafka message's timestamp. However, we want to use the order time instead, so update the watermark on ordertime_ts as follows:

    ALTER TABLE orders_with_ts
    MODIFY WATERMARK FOR ordertime_ts AS ordertime_ts - INTERVAL '1' SECOND;
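
    To verify the change, you can describe the table again; the new watermark expression appears alongside the ordertime_ts column:

    DESC orders_with_ts;
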
  8. Flink offers a variety of window aggregation functions for time-based data analysis. To determine how many orders are made per state each week, use the following:

    CREATE TABLE weekly_orders_by_state AS
    SELECT 
      window_start,
      window_end,
      state,
      COUNT(DISTINCT orderid) AS distinct_orders
    FROM 
       TUMBLE(
         DATA => TABLE orders_with_ts,
         TIMECOL => DESCRIPTOR(ordertime_ts),
         SIZE => INTERVAL '7' DAY)
    GROUP BY window_start, window_end, state;
  9. Now, query the new table:

    SELECT * FROM weekly_orders_by_state;


    As new orders are created, you’ll see the order counts per state increase for each defined window period.
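
    To focus on a single state, you can filter the results; 'CA' below is just an example value that may or may not appear in your generated data:

    SELECT * FROM weekly_orders_by_state WHERE state = 'CA';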

  10. Confluent Cloud for Apache Flink automatically creates a matching Kafka topic for each Flink table. If you navigate to the Topics section, you'll notice a new weekly_orders_by_state Kafka topic that you can use to build applications or sink to an external datastore with Kafka Connect. The possibilities are endless!
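
    If you're curious how the new table maps onto the underlying topic and schema, you can inspect its full definition:

    SHOW CREATE TABLE weekly_orders_by_state;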

Bonus: Code Golf version

If you enjoy Code Golf, here's a more concise version that skips the intermediate table. Be sure to execute each statement one at a time in the Flink SQL console:

ALTER TABLE orders ADD order_time_as_timestamp_ltz AS TO_TIMESTAMP_LTZ(ordertime);

ALTER TABLE orders MODIFY WATERMARK FOR order_time_as_timestamp_ltz AS order_time_as_timestamp_ltz - INTERVAL '1' SECOND;

CREATE TABLE weekly_orders_by_state_code_golf AS 
SELECT 
  window_start,
  window_end,
  address.state AS state,
  COUNT(DISTINCT orderid) AS distinct_orders
FROM 
   TUMBLE(
     DATA => TABLE orders,
     TIMECOL => DESCRIPTOR(order_time_as_timestamp_ltz),
     SIZE => INTERVAL '7' DAY)
GROUP BY window_start, window_end, address.state;

SELECT * FROM weekly_orders_by_state_code_golf;

Note

A final note as we wrap up the exercises for this course: don't forget to delete your resources and cluster in order to avoid exhausting the free Confluent Cloud usage provided to you. The easiest way is to delete the whole environment.

Flink SQL has much more to offer than we've covered here! If you're curious about the power of Flink, check out these courses:

  1. Apache Flink 101
  2. Apache Flink SQL
  3. Apache Flink Table API
Do you have questions or comments? Join us in the #confluent-developer community Slack channel to engage in discussions with the creators of this content.

Use the promo codes KAFKA101 & CONFLUENTDEV1 to get $25 of free Confluent Cloud storage and skip credit card entry.
