Course: Apache Flink® 101

Batch and Stream Processing with Flink SQL (Exercise)

David Anderson

Software Practice Lead

In this exercise you'll learn about running queries in Flink SQL using both batch and streaming execution modes, and observe some of the differences and similarities between them.

Prerequisite: Install Docker

These hands-on exercises use Docker to run a Flink cluster and the Flink SQL Client (also known as the SQL CLI). Before proceeding:

• Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it.

• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl.

• Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line.
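
For example, both of these commands should complete without errors (the exact output will vary by platform and Docker version):

docker info
docker compose version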

With Docker running, do the following steps to get the Flink SQL CLI running:

git clone https://github.com/confluentinc/learn-apache-flink-101-exercises.git
cd learn-apache-flink-101-exercises
docker compose up --build -d
docker compose run sql-client

Once this is complete, you should find yourself at the Flink SQL Client CLI prompt:

Flink SQL>

At any time you can use the help; command to get help from the SQL CLI.

Experiment with Batch and Streaming

Let's begin by creating a table so there is something to query. We're going to create a bounded table with 500 rows of data generated by the faker table source. flink-faker is a convenient and powerful mock data generator designed to be used with Flink SQL.

CREATE TABLE `bounded_pageviews` (
  `url` STRING,
  `user_id` STRING,
  `browser` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'number-of-rows' = '500',
  'rows-per-second' = '100',
  'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
  'fields.user_id.expression' = '#{numerify ''user_##''}',
  'fields.browser.expression' = '#{Options.option ''chrome'', ''firefox'', ''safari''}',
  'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);

If you are curious what this data looks like, you can use this query to see a sample:

select * from bounded_pageviews limit 10;
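
Since the data is randomly generated, your results will differ, but each row should look something like this (the values shown here are purely illustrative):

url              user_id   browser   ts
---------------------------------------------------------
/Socrates.html   user_07   chrome    2024-05-01 12:34:56.123
/Plato.html      user_42   firefox   2024-05-01 12:34:55.847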

Batch mode, bounded input

By default, the Flink SQL Client runs in streaming mode. Let's switch to batch mode so we can see how that behaves:

set 'execution.runtime-mode' = 'batch';
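
You can confirm which mode is active at any time by running SET; with no arguments, which lists all of the options currently configured in the SQL Client, including execution.runtime-mode:

SET;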

It will be easier to appreciate the differences between batch and streaming if we work with a query that produces an updating table as its result, like this one:

select count(*) AS `count` from bounded_pageviews;

When executing this query in batch mode, the sink receives only a single, final value, which you'll see displayed:

count
-----
  500

This takes 5 seconds to complete because the source is configured to produce 500 rows at a rate of 100 rows per second.

Streaming mode, bounded input

You can now try the same thing, but in streaming mode.

set 'execution.runtime-mode' = 'streaming';
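
Then re-run the same count query:

select count(*) AS `count` from bounded_pageviews;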

If you watch closely, you will see the count increment from 100 to 200, and so on, up to 500. Again, this will take 5 seconds.

To make it clearer what's going on, you should also change how the results are being displayed. This won't affect the internal behavior of the runtime, but it will change how the SQL Client displays the results. In changelog mode, the SQL Client doesn't just update the count in place, but instead displays each message in the stream of updates it's receiving from the Flink SQL runtime.

set 'sql-client.execution.result-mode' = 'changelog';

Now when you execute the same query as before

select count(*) AS `count` from bounded_pageviews;

the results should appear like this:

 op                count
 -----------------------
...                  ...
 -U                  497
 +U                  498
 -U                  498
 +U                  499
 -U                  499
 +U                  500

When operating in streaming mode, the Flink runtime can't rely on the stream ever ending, so it instead continuously updates the result as it processes the input stream. Each -U message retracts the previous count, and the +U message that follows replaces it with the updated value. The query ultimately arrives at the same result as when running in batch mode, but the sink for this streaming counting job sees all of the incremental work done along the way by the SQL runtime.

Streaming mode, unbounded input

To complete the picture, now try the streaming version running against an unbounded input stream.

Begin by creating a new, unbounded table. If the faker source isn't configured with a number-of-rows option, it will continue producing data indefinitely, so this time we simply omit that option:

CREATE TABLE `pageviews` (
  `url` STRING,
  `user_id` STRING,
  `browser` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'rows-per-second' = '100',
  'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
  'fields.user_id.expression' = '#{numerify ''user_##''}',
  'fields.browser.expression' = '#{Options.option ''chrome'', ''firefox'', ''safari''}',
  'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);

Here's that same query, updated to use the new table:

select count(*) AS `count` from pageviews;

You might also want to reset the SQL Client to use its default display mode, rather than the changelog mode we were using earlier to expose the inner workings of the update stream:

set 'sql-client.execution.result-mode' = 'table';

If you want to see this example run faster or slower, you can modify the table with something like this:

ALTER TABLE `pageviews` SET ('rows-per-second' = '10');
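
To verify that the new setting took effect, you can inspect the table's current definition:

SHOW CREATE TABLE `pageviews`;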

ALTER TABLE is part of the SQL standard, and it makes it easy to modify a table's definition. You could instead DROP the table and then CREATE it again with the updated configuration, but ALTER TABLE is easier.

When you're done, you can exit Flink SQL with quit;.

How to Shut Down Docker

When you want to shut down the Flink cluster, you can do that with

docker compose down -v

which will stop all of the containers and remove the volume used for checkpointing.

But you may want to leave everything running for now, as we'll be using the same setup in all of the exercises for this course.
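
If you do shut everything down, you can bring the environment back up later by re-running the setup commands:

docker compose up --build -d
docker compose run sql-client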
