Get Started Free

Apache Flink® Quick Start

The guide below demonstrates how to quickly get started with Apache Flink. You'll create a Flink compute pool in Confluent Cloud, create a table, insert data, and finally use Flink to process streams of generated data. Be sure to also check out the Flink SQL tutorials here and the Apache Flink 101 course to learn more.

1. Sign up for Confluent Cloud

First sign up for a free Confluent Cloud account.

Next, install the Confluent CLI if you don't already have it. In your terminal:

brew install confluentinc/tap/cli

If you don't use Homebrew, you can use a different installation method.

After installing the Confluent CLI, login to your Confluent Cloud account:

confluent login --prompt --save

The confluent-flink-quickstart CLI plugin creates all of the resources that you need to get started with Confluent Cloud for Apache Flink. Install it by running:

confluent plugin install confluent-flink-quickstart

Run the Flink quick start plugin as follows to create a Flink compute pool in AWS region us-east-1.

confluent flink quickstart \
    --name quickstart \
    --max-cfu 10 \
    --region us-east-1 \
    --cloud aws \
    --datagen-quickstarts shoe_orders shoes

This will create a new Confluent Cloud environment quickstart_environment and the following resources within it:

  • Schema Registry enabled
  • a Kafka cluster named quickstart_kafka-cluster
  • Datagen Source connectors for orders and associated products, including the API key needed for them to access Kafka
  • Flink compute pool named quickstart

After a couple of minutes, you will enter an interactive Flink shell where you can start running queries.

Note: if you exit the Flink shell, you can return to it by running confluent flink shell.

4. Create your first table

With Confluent Cloud for Apache Flink, you can either create tables directly and have the backing Kafka topics created for you automatically, or you can use Flink as a query layer over existing topics. Let's start with the former. In the Flink shell, create a table called quickstart that has a single STRING column named message:

CREATE TABLE quickstart(message STRING);

Next, insert a row of data:

INSERT INTO quickstart VALUES ('hello world');

Finally, query the table and see the message that you just inserted:

SELECT * from quickstart;

Because any topic created in Kafka is visible directly as a table in Flink, we can also use Flink to process active streams of data generated by the Datagen Source connector.

In your interactive Flink shell:

SHOW TABLES;

Observe that, in addition to the quickstart table just created, there are shoe_orders and shoes tables corresponding to the two Datagen Source connectors that we provisioned:

+-------------+
| table name  |
+-------------+
| quickstart  |
| shoe_orders |
| shoes       |
+-------------+

Let's explore the schema of the shoe_orders table:

DESCRIBE shoe_orders;

And view a few rows of data:

SELECT * FROM shoe_orders LIMIT 10;

Observe that a shoe order contains the product_id of the purchased product that is a GUID like 1db091e5-c516-4286-84a2-8d687d6c0331. You can view more human-readable metadata about a product with the following query on the shoes table (substitute a GUID that you observed):

SELECT brand, name, sale_price
FROM shoes
WHERE id='1db091e5-c516-4286-84a2-8d687d6c0331';

You will see output like this:

brand            name            sale_price
Williamson Group Impreza Pro 634 7995

You may see more than one row for a given ID because the shoes table continuously receives product metadata updates.

6. Run an aggregated join query

Now that we know what the shoe_orders and related shoes tables look like, let's join these tables and calculate a human-readable aggregation. For example, suppose we want to know the total order sales per shoe brand. Because the shoe brand is metadata from the shoes table, we join it to the shoe_orders table as in the the following query. Kick it off in your interactive Flink shell, and then we will look more closely at what the query is doing.

WITH latest_shoes AS (
    SELECT * FROM (
        SELECT *,
               $rowtime,
               ROW_NUMBER() OVER (PARTITION BY id ORDER BY $rowtime DESC) AS rownum
        FROM shoes
    )
    WHERE rownum = 1
)
SELECT brand, ROUND(SUM(sale_price) / 100.00, 2) AS revenue
FROM shoe_orders
    JOIN latest_shoes
    ON shoe_orders.product_id = latest_shoes.id
GROUP BY brand;

You will see results like the following that will continue to be updated as new orders come in:

brand                            revenue
Braun-Bruen                      23356.78
Ankunding and Sons               26384.16
Okuneva, McCullough and Reynolds 20128.90
Harvey-Kuhic                     24735.17
Beer, DAmore and Wintheiser      24686.89
Gleichner-Buckridge              19484.84

Let's look at what this query is doing. First, because the shoes table can have multiple rows with the same id, we use a common table expression (aliased latest_shoes) to deduplicate and only get the latest metadata for each shoe. Refer to this tutorial for a deeper look at deduplicating Flink SQL tables.

The remainder of the query is straightforward. We join enrich shoe orders by joining the shoe_orders table to the latest_shoes result set, and then group by brand so that the SUM(sale_price) aggregate yields the total revenue per brand.

The aggregation query will continue to run and update results until you cancel it. To see live updates, enter M on the query result screen to see the revenue totals get updated in real time. You will see pairs of updates to increment item counts that look like this:

-U        Renner-DAmore 17879.46
+U        Renner-DAmore 18009.41    

This shows that the previous total for the Renner-DAmore brand was updated from 17879.46 to 18009.41.

7. Run a windowed aggregation

The aggregated join query in the previous step sums total revenue per brand for all time. What if you instead wanted to observe a trend across time? To accomplish this, you would compute an aggregation over discrete windows of time. For example, let's compute the total number of orders per day.

Because the records in the shoe_orders table contain a timestamp column ts, we will be using event time stream processing semantics. That is, the timestamp used for windowing and determining if an event arrives late comes from the event source itself as opposed to from the event's time of ingest into Kafka. In order to window by the event time, we first need to explicitly define the shoe_orders table's watermark strategy in terms of the ts column. Watermarks are used to determine when a window can close and whether an event arrives late and should be ignored. In Confluent Cloud for Apache Flink, the default watermark strategy is defined in terms of ingestion time (available as the system column $rowtime), but you can override this with the following command to use strictly ascending ts column values:

ALTER TABLE shoe_orders MODIFY WATERMARK FOR ts AS ts;

A strictly ascending watermark strategy means that an event is considered late and ignored if its timestamp is less than the maximum timestamp observed so far.

With the watermark strategy defined, you can now aggregate over event time-based windows as in the following query that outputs the number of orders received per day:

SELECT count(*) AS order_count,
       window_start
FROM TABLE(TUMBLE(TABLE shoe_orders, DESCRIPTOR(ts), INTERVAL '1' DAY)) GROUP BY window_start;

The query output should look like this:

order_count window_start
864         2021-01-01 00:00:00.000
236         2021-01-02 00:00:00.000        

Because the shoes orders from the Datagen Source connector are 100 seconds apart, the number of orders per day is expected to be 864 since there are 86,400 seconds in a day.

One final note on windowing — in this example, we used tumbling windows, which are fixed-size (1 day in our case), non-overlapping, and contiguous. If you'd like to explore other windowing modes, check out this tutorial on hopping windows or this one on cumulating windows.

8. Clean up

When you are done exploring, be sure to clean up any Confluent Cloud resources created for this quick start. Since you created all resources in a Confluent Cloud environment, you can simply delete the environment and all resources created for this quick start will be deleted (i.e., the Kafka cluster, connectors, Flink compute pool, and associated API keys). Run the following command in your terminal to get the environment ID of the form env-123456 corresponding to the environment named quickstart_environment:

confluent environment list

Now delete the environment:

confluent environment delete <ENVIRONMENT_ID>

What's Next

  • Build Apps
  • Build Pipelines
  • Operate
Build Apps
  • Build Apps
  • Build Pipelines
  • Operate

Tutorials with Full Code Examples

Learn the basics

Step through the basics of the CLI, Kafka topics, and building applications.

Explore top use cases

Run pre-built ksqlDB recipes that tackle the highest impact use cases for stream processing

Master advanced concepts

Learn how to route events, manipulate streams, aggregate data, and more.

Get Started with Kafka Clients

Write your first application using these full code examples in Java, Python, Go, .NET, Node.js, C/C++, REST, Spring Boot, and further languages and CLIs.

Top 3 Courses for Application Developers

Confluent Cloud is a fully managed Apache Kafka service available on all three major clouds. Try it for free today.

Try it for free

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.