How to create tumbling windows

Question:

If you have time series events in a Kafka topic, how can you group them into fixed-size, non-overlapping, contiguous time intervals?

Example use case:

Suppose you have a topic with events that represent movie ratings. In this tutorial, we'll write a program to maintain tumbling windows that count the total number of ratings that each movie has received.

Hands-on code example:

Run it

Prerequisites

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line

Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir tumbling-windows && cd tumbling-windows

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud) and Apache Flink®. The Docker Compose file will start three Flink containers that have Kafka connector dependencies preinstalled: an interactive Flink SQL client (flink-sql-client) that sends streaming SQL jobs to the Flink Job Manager (flink-jobmanager), which in turn assigns tasks to the Flink Task Manager (flink-taskmanager) in the Flink cluster.

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092
  flink-sql-client:
    image: cnfldemos/flink-sql-client-kafka:1.16.0-scala_2.12-java11
    hostname: flink-sql-client
    container_name: flink-sql-client
    depends_on:
      - flink-jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: flink-jobmanager
    volumes:
      - ./settings/:/settings
  flink-jobmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-jobmanager
    container_name: flink-jobmanager
    ports:
      - 9081:9081
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        rest.bind-port: 9081
  flink-taskmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-taskmanager
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 10

Launch it by running:

docker compose up -d

Write the program interactively using the CLI

The best way to interact with Flink SQL when you’re learning how things work is with the Flink SQL CLI. Fire it up as follows:

docker exec -it flink-sql-client sql-client.sh

First, you’ll need to create a table to represent movie ratings. The Flink SQL DDL below creates the table and its underlying Kafka topic. Note that we are defining the schema for the table, which includes five fields: rating_id, the internal ID of the rating; title, the title of the movie being rated; release_year, the year the movie was released; rating, the floating-point rating of the movie from 0 to 10; and ts, the timestamp when the rating was submitted. The statement also names ratings as the underlying Kafka topic, which will be created with a single partition (the default num.partitions configured in the broker), and defines Avro as its data format.

The timestamp is an important attribute since we’ll be modeling the number of ratings that each movie receives over time. Also, because we are going to aggregate over time windows, we must define a watermark strategy. In this case, we use strictly ascending timestamps, i.e., any row with a timestamp that is less than or equal to the latest observed event timestamp is considered late and ignored. This is safe for this tutorial since we will insert events in ascending timestamp order, but for other scenarios a delayed watermark strategy may be more appropriate. That would allow a grace period during which late data can still arrive and be included in the tumbling window aggregations rather than being ignored, as sketched below.
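
For example, a delayed (bounded out-of-orderness) strategy that tolerates events arriving up to five minutes late would be declared as follows. This is an illustrative sketch only; it would replace the WATERMARK line in the CREATE TABLE statement below rather than be added to it.

-- Illustrative alternative: allow events to arrive up to 5 minutes later than
-- the highest timestamp observed so far before considering them late
WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE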

CREATE TABLE ratings (
    rating_id INT,
    title STRING,
    release_year INT,
    rating DOUBLE,
    ts TIMESTAMP(3),
    -- declare ts as event time attribute and use strictly ascending timestamp watermark strategy
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'topic' = 'ratings',
    'properties.bootstrap.servers' = 'broker:9092',
    'scan.startup.mode' = 'earliest-offset',
    'key.format' = 'raw',
    'key.fields' = 'rating_id',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081',
    'value.fields-include' = 'EXCEPT_KEY'
);

Confluent Cloud manages several options for you when using Flink SQL. So, if you run this tutorial on Confluent Cloud, you can copy just the CREATE TABLE statements without the WITH clause when creating tables, as shown below. Consult the Flink SQL WITH documentation for the full list of supported options when creating a table.
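
For example, on Confluent Cloud the ratings table definition would reduce to roughly the following sketch, since the connector, topic, and format options are managed for you:

-- Sketch of the equivalent statement on Confluent Cloud: schema and watermark only,
-- no WITH clause
CREATE TABLE ratings (
    rating_id INT,
    title STRING,
    release_year INT,
    rating DOUBLE,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts
);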

Now, produce a few events that represent movie ratings over time. Note how the timestamps vary across different hours of the day.

INSERT INTO ratings VALUES
    (0, 'Die Hard', 1998, 8.2, TO_TIMESTAMP('2023-07-09 01:00:00')),
    (1, 'The Big Lebowski', 1998, 4.2, TO_TIMESTAMP('2023-07-09 02:00:00')),
    (2, 'Die Hard', 1998, 4.5, TO_TIMESTAMP('2023-07-09 05:00:00')),
    (3, 'The Big Lebowski', 1998, 9.9, TO_TIMESTAMP('2023-07-09 06:30:00')),
    (4, 'Die Hard', 1998, 5.1, TO_TIMESTAMP('2023-07-09 07:00:00')),
    (5, 'Tree of Life', 2011, 5.6, TO_TIMESTAMP('2023-07-09 08:00:00')),
    (6, 'Tree of Life', 2011, 4.9, TO_TIMESTAMP('2023-07-09 09:00:00')),
    (7, 'A Walk in the Clouds', 1995, 3.6, TO_TIMESTAMP('2023-07-09 12:00:00')),
    (8, 'A Walk in the Clouds', 1995, 6.0, TO_TIMESTAMP('2023-07-09 15:00:00')),
    (9, 'Super Mario Bros.', 1993, 3.5, TO_TIMESTAMP('2023-07-09 18:30:00')),
    (10, 'A Walk in the Clouds', 1995, 4.6, TO_TIMESTAMP('2023-07-10 01:00:00'));

With our test data in place, let’s figure out how many ratings were given to each movie in tumbling (i.e., consecutive and non-overlapping) 6-hour intervals (and also what the average rating is for each window). To do that, we issue the following transient query to aggregate the ratings, grouped by the movie’s title. This query uses a windowing table-valued function (TVF) to compute the ratings count and average rating for 6-hour time windows. It also captures the window start and end times.

This query will continue to return results until you quit by entering Q.

SELECT title,
    COUNT(*) AS rating_count,
    AVG(rating) AS avg_rating,
    window_start,
    window_end
FROM TABLE(TUMBLE(TABLE ratings, DESCRIPTOR(ts), INTERVAL '6' HOURS))
GROUP BY title, window_start, window_end;

If you have a table that does not have an explicit timestamp column, you can use the SYSTEM column $rowtime for the DESCRIPTOR parameter when performing windowed aggregations with Flink SQL in Confluent Cloud.
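
For instance, assuming the $rowtime system column described above, the Confluent Cloud variant of this aggregation would look roughly like this:

-- Confluent Cloud sketch: window on the system-provided $rowtime attribute
-- instead of an explicit timestamp column
SELECT title,
    COUNT(*) AS rating_count,
    AVG(rating) AS avg_rating,
    window_start,
    window_end
FROM TABLE(TUMBLE(TABLE ratings, DESCRIPTOR($rowtime), INTERVAL '6' HOURS))
GROUP BY title, window_start, window_end;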

Back in the local SQL client, the tumbling window query above should yield the following output:

                          title         rating_count                     avg_rating            window_start              window_end
                       Die Hard                    2                           6.35 2023-07-09 00:00:00.000 2023-07-09 06:00:00.000
               The Big Lebowski                    1                            4.2 2023-07-09 00:00:00.000 2023-07-09 06:00:00.000
                       Die Hard                    1                            5.1 2023-07-09 06:00:00.000 2023-07-09 12:00:00.000
                   Tree of Life                    2                           5.25 2023-07-09 06:00:00.000 2023-07-09 12:00:00.000
               The Big Lebowski                    1                            9.9 2023-07-09 06:00:00.000 2023-07-09 12:00:00.000
           A Walk in the Clouds                    2                            4.8 2023-07-09 12:00:00.000 2023-07-09 18:00:00.000
              Super Mario Bros.                    1                            3.5 2023-07-09 18:00:00.000 2023-07-10 00:00:00.000

Observe that the sum of all rating counts is 10, whereas there are 11 ratings in the ratings table. Why is this? First, remember that we defined a strictly ascending timestamps watermark strategy. Second, keep in mind that a windowed aggregation does not emit intermediate results, only a final result per window. In this case, the rating with rating_id 10 and timestamp 2023-07-10 01:00:00 served to "close" the window ending at 2023-07-10 00:00:00.000 because its event time is later than that window's end time. The rating itself falls into the next window (2023-07-10 00:00:00.000 to 06:00:00.000), which has not yet been closed by any later event, so it does not appear in the results.
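
To see that last window emitted, you could insert one more rating whose timestamp is at or beyond the window end of 2023-07-10 06:00:00; the watermark would then advance far enough to close the window containing rating_id 10. The statement below is illustrative only (the rating_id and values are made up), and we don’t run it in this tutorial so that the outputs that follow remain as shown.

-- Illustrative only: an event with a timestamp at or beyond 2023-07-10 06:00:00
-- advances the watermark and closes the window containing rating_id 10
INSERT INTO ratings VALUES
    (11, 'Die Hard', 1998, 7.0, TO_TIMESTAMP('2023-07-10 07:00:00'));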

Enter Q to return to the Flink SQL prompt.

Note that these results were materialized in memory and printed in a human-readable table representation because the default sql-client.execution.result-mode configuration value is 'table'. You can view non-materialized streaming results as a changelog by running SET 'sql-client.execution.result-mode' = 'changelog'; and rerunning the same query. The results will look like this:

 op                          title         rating_count                     avg_rating            window_start              window_end
 +I                       Die Hard                    2                           6.35 2023-07-09 00:00:00.000 2023-07-09 06:00:00.000
 +I               The Big Lebowski                    1                            4.2 2023-07-09 00:00:00.000 2023-07-09 06:00:00.000
 +I                       Die Hard                    1                            5.1 2023-07-09 06:00:00.000 2023-07-09 12:00:00.000
 +I                   Tree of Life                    2                           5.25 2023-07-09 06:00:00.000 2023-07-09 12:00:00.000
 +I               The Big Lebowski                    1                            9.9 2023-07-09 06:00:00.000 2023-07-09 12:00:00.000
 +I           A Walk in the Clouds                    2                            4.8 2023-07-09 12:00:00.000 2023-07-09 18:00:00.000
 +I              Super Mario Bros.                    1                            3.5 2023-07-09 18:00:00.000 2023-07-10 00:00:00.000

Or, as a third option, you can view non-materialized streaming results inline in the SQL client by running SET 'sql-client.execution.result-mode' = 'tableau'; and rerunning the query once more. In this case, the results will look very similar to changelog mode results. This is because tables sourced by the Kafka connector are unbounded and can thus only be queried in streaming mode. For further reading on these Flink SQL concepts, consult the documentation on SQL client result modes and streaming vs. batch execution.
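
For reference, the two alternative result modes described above are switched on like this (rerun the same SELECT after each SET statement):

-- stream results as a changelog instead of materializing them in the table view
SET 'sql-client.execution.result-mode' = 'changelog';

-- or print results inline, tableau style
SET 'sql-client.execution.result-mode' = 'tableau';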

Since the output of our transient query looks right, the next step is to make it persistent with the following CREATE TABLE statement. Go ahead and execute the following statement in your Flink SQL session:

CREATE TABLE ratings_by_6hr_window (
    title STRING,
    rating_count BIGINT,
    avg_rating DOUBLE,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'ratings-by-6hr-window',
    'properties.bootstrap.servers' = 'broker:9092',
    'scan.startup.mode' = 'earliest-offset',
    'key.format' = 'avro-confluent',
    'key.avro-confluent.url' = 'http://schema-registry:8081',
    'key.fields' = 'title;window_start;window_end',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081',
    'value.fields-include' = 'ALL'
);

Now let’s insert data into the table with the following statement:

INSERT INTO ratings_by_6hr_window
    SELECT title,
       COUNT(*) AS rating_count,
       AVG(rating) AS avg_rating,
       window_start,
       window_end
    FROM TABLE(TUMBLE(TABLE ratings, DESCRIPTOR(ts), INTERVAL '6' HOURS))
    GROUP BY title, window_start, window_end;

Validate output

Seeing is believing, so let’s query the persistent ratings_by_6hr_window table. First, set the result mode back to table:

SET 'sql-client.execution.result-mode' = 'table';

Then query the ratings_by_6hr_window table:

SELECT * FROM ratings_by_6hr_window;

This will yield the same output that the transient query did:

                          title         rating_count                     avg_rating            window_start              window_end
                       Die Hard                    2                           6.35 2023-07-09 00:00:00.000 2023-07-09 06:00:00.000
               The Big Lebowski                    1                            4.2 2023-07-09 00:00:00.000 2023-07-09 06:00:00.000
                       Die Hard                    1                            5.1 2023-07-09 06:00:00.000 2023-07-09 12:00:00.000
                   Tree of Life                    2                           5.25 2023-07-09 06:00:00.000 2023-07-09 12:00:00.000
               The Big Lebowski                    1                            9.9 2023-07-09 06:00:00.000 2023-07-09 12:00:00.000
           A Walk in the Clouds                    2                            4.8 2023-07-09 12:00:00.000 2023-07-09 18:00:00.000
              Super Mario Bros.                    1                            3.5 2023-07-09 18:00:00.000 2023-07-10 00:00:00.000

We could also query the underlying topic directly using kafka-avro-console-consumer. Open a new terminal window and run the following command:

docker exec -e SCHEMA_REGISTRY_LOG4J_OPTS=" " -it schema-registry /usr/bin/kafka-avro-console-consumer \
  --topic ratings-by-6hr-window \
  --from-beginning \
  --max-messages 7 \
  --timeout-ms 10000 \
  --bootstrap-server broker:9092

This will yield the following results:

{"title":{"string":"Die Hard"},"rating_count":{"long":2},"avg_rating":{"double":6.35},"window_start":{"long":1688860800000},"window_end":{"long":1688882400000}}
{"title":{"string":"The Big Lebowski"},"rating_count":{"long":1},"avg_rating":{"double":4.2},"window_start":{"long":1688860800000},"window_end":{"long":1688882400000}}
{"title":{"string":"Die Hard"},"rating_count":{"long":1},"avg_rating":{"double":5.1},"window_start":{"long":1688882400000},"window_end":{"long":1688904000000}}
{"title":{"string":"Tree of Life"},"rating_count":{"long":2},"avg_rating":{"double":5.25},"window_start":{"long":1688882400000},"window_end":{"long":1688904000000}}
{"title":{"string":"The Big Lebowski"},"rating_count":{"long":1},"avg_rating":{"double":9.9},"window_start":{"long":1688882400000},"window_end":{"long":1688904000000}}
{"title":{"string":"A Walk in the Clouds"},"rating_count":{"long":2},"avg_rating":{"double":4.8},"window_start":{"long":1688904000000},"window_end":{"long":1688925600000}}
{"title":{"string":"Super Mario Bros."},"rating_count":{"long":1},"avg_rating":{"double":3.5},"window_start":{"long":1688925600000},"window_end":{"long":1688947200000}}
Processed a total of 7 messages

Test it

Decide what testing tools to use

Now that you have manually developed and tested your Flink SQL application, how might you create an automated test for it so that it’s easier to maintain and upgrade over time? Imagine how painful it would be to have to manually test every change or software dependency upgrade of your application, and then imagine having to do this for many applications. The benefits of automated testing are clear, but how do we get there?

First, what do we want in an automated integration test? For starters:

  1. Real services (as opposed to mock) that our application depends on

  2. Small resource footprint so that developers can run the test locally

  3. Low enough latency that development iterations aren’t hampered; not as fast as a unit test requires, but test duration should be on the order of seconds

  4. Isolation so that many tests can run concurrently on the same machine when this test gets run on a build automation server, e.g., no hard-coded ports

Luckily, tools are at our disposal to solve these problems. We’ll use Testcontainers to run containerized Kafka and Schema Registry servers on dynamic ports, Flink’s support for local execution environments so that we don’t need to spin up a Flink cluster, and Flink’s Table API to execute the Flink SQL statements that comprise our application.

Create the test skeleton

Given the need for Flink’s Table API, the primary choices of programming language in which to write our tests are Java and Python. We’ll write ours in Java.

To start our test project, make new directories for test source code and resources within the same tumbling-windows folder that you created earlier:

mkdir -p src/test/java/io/confluent/developer
mkdir src/test/resources

Create the following Gradle build file, named build.gradle, in the tumbling-windows directory.

buildscript {
    repositories {
        mavenCentral()
    }
}

plugins {
    id "java"
    id 'idea'
}

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
version = "0.0.1"

repositories {
    mavenCentral()
}

dependencies {
    testImplementation "com.google.guava:guava:31.1-jre"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.testcontainers:testcontainers:1.17.6'
    testImplementation 'org.testcontainers:kafka:1.17.6'
    testImplementation "org.apache.flink:flink-sql-connector-kafka:1.16.0"
    testImplementation "org.apache.flink:flink-sql-avro-confluent-registry:1.16.0"
    testImplementation "org.apache.flink:flink-test-utils:1.16.0"
    testImplementation "org.apache.flink:flink-test-utils-junit:1.16.0"
    testImplementation "org.apache.flink:flink-table-api-java-bridge:1.16.0"
    testImplementation "org.apache.flink:flink-table-planner_2.12:1.16.0"
    testImplementation "org.apache.flink:flink-table-planner_2.12:1.16.0:tests"
    testImplementation "org.apache.flink:flink-statebackend-rocksdb:1.16.0"
}

There are a couple of important points to note in the Gradle build file:

  1. Java sourceCompatibility and targetCompatibility are set to Java 11. Flink supports Java 8 (deprecated) and 11 as of the writing of this tutorial

  2. The dependencies section declares test dependencies for Testcontainers and Flink. Among the handful of Flink dependencies are ones providing local execution (e.g., flink-statebackend-rocksdb), the Table API (flink-table-api-java-bridge), and Kafka connectors that can use Schema Registry (flink-sql-connector-kafka and flink-sql-avro-confluent-registry).

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Create SQL resources

We could always inline the SQL statements in our Java test code, but creating separate resource files makes our test more readable and easier to maintain. Further, we can imagine parametrizing URLs so that a single set of source-controlled queries can be used in tests as well as in staging or production environments.

There are a handful of resources to create for our test. These mirror the queries that we developed earlier.

Create the following file at src/test/resources/create-ratings.sql.template. Note the KAFKA_PORT and SCHEMA_REGISTRY_PORT placeholders in this file. Our test will substitute these with the ports that Testcontainers dynamically assigns.

CREATE TABLE ratings (
    rating_id INT,
    title STRING,
    release_year INT,
    rating DOUBLE,
    ts TIMESTAMP(3),
    -- declare ts as event time attribute and use strictly ascending timestamp watermark strategy
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'topic' = 'ratings',
    'properties.bootstrap.servers' = 'localhost:KAFKA_PORT',
    'scan.startup.mode' = 'earliest-offset',
    'key.format' = 'raw',
    'key.fields' = 'rating_id',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:SCHEMA_REGISTRY_PORT',
    'value.fields-include' = 'EXCEPT_KEY'
);

Create the following file at src/test/resources/populate-ratings.sql.

INSERT INTO ratings VALUES
    (0, 'Die Hard', 1998, 8.2, TO_TIMESTAMP('2023-07-09 01:00:00')),
    (1, 'The Big Lebowski', 1998, 4.2, TO_TIMESTAMP('2023-07-09 02:00:00')),
    (2, 'Die Hard', 1998, 4.5, TO_TIMESTAMP('2023-07-09 05:00:00')),
    (3, 'The Big Lebowski', 1998, 9.9, TO_TIMESTAMP('2023-07-09 06:30:00')),
    (4, 'Die Hard', 1998, 5.1, TO_TIMESTAMP('2023-07-09 07:00:00')),
    (5, 'Tree of Life', 2011, 5.6, TO_TIMESTAMP('2023-07-09 08:00:00')),
    (6, 'Tree of Life', 2011, 4.9, TO_TIMESTAMP('2023-07-09 09:00:00')),
    (7, 'A Walk in the Clouds', 1995, 3.6, TO_TIMESTAMP('2023-07-09 12:00:00')),
    (8, 'A Walk in the Clouds', 1995, 6.0, TO_TIMESTAMP('2023-07-09 15:00:00')),
    (9, 'Super Mario Bros.', 1993, 3.5, TO_TIMESTAMP('2023-07-09 18:30:00')),
    (10, 'A Walk in the Clouds', 1995, 4.6, TO_TIMESTAMP('2023-07-10 01:00:00'));

Create the following file at src/test/resources/create-ratings-by-6hr-window.sql.template. Again, note the KAFKA_PORT and SCHEMA_REGISTRY_PORT placeholders.

CREATE TABLE ratings_by_6hr_window (
    title STRING,
    rating_count BIGINT,
    avg_rating DOUBLE,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'ratings-by-6hr-window',
    'properties.bootstrap.servers' = 'localhost:KAFKA_PORT',
    'scan.startup.mode' = 'earliest-offset',
    'key.format' = 'avro-confluent',
    'key.avro-confluent.url' = 'http://localhost:SCHEMA_REGISTRY_PORT',
    'key.fields' = 'title;window_start;window_end',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:SCHEMA_REGISTRY_PORT',
    'value.fields-include' = 'ALL'
); 

Next, create the following file at src/test/resources/populate-ratings-by-6hr-window.sql to populate the table.

INSERT INTO ratings_by_6hr_window
    SELECT title,
       COUNT(*) AS rating_count,
       AVG(rating) AS avg_rating,
       window_start,
       window_end
    FROM TABLE(TUMBLE(TABLE ratings, DESCRIPTOR(ts), INTERVAL '6' HOURS))
    GROUP BY title, window_start, window_end;

Next, create the following file at src/test/resources/query-ratings-by-6hr-window.sql:

SELECT * FROM ratings_by_6hr_window;

Finally, create the following file at src/test/resources/expected-ratings-by-6hr-window.txt that contains our test’s expected output:

+----+--------------------------------+----------------------+--------------------------------+-------------------------+-------------------------+
| op |                          title |         rating_count |                     avg_rating |            window_start |              window_end |
+----+--------------------------------+----------------------+--------------------------------+-------------------------+-------------------------+
| +I |                       Die Hard |                    2 |                           6.35 | 2023-07-09 00:00:00.000 | 2023-07-09 06:00:00.000 |
| +I |               The Big Lebowski |                    1 |                            4.2 | 2023-07-09 00:00:00.000 | 2023-07-09 06:00:00.000 |
| +I |                       Die Hard |                    1 |                            5.1 | 2023-07-09 06:00:00.000 | 2023-07-09 12:00:00.000 |
| +I |                   Tree of Life |                    2 |                           5.25 | 2023-07-09 06:00:00.000 | 2023-07-09 12:00:00.000 |
| +I |               The Big Lebowski |                    1 |                            9.9 | 2023-07-09 06:00:00.000 | 2023-07-09 12:00:00.000 |
| +I |           A Walk in the Clouds |                    2 |                            4.8 | 2023-07-09 12:00:00.000 | 2023-07-09 18:00:00.000 |
| +I |              Super Mario Bros. |                    1 |                            3.5 | 2023-07-09 18:00:00.000 | 2023-07-10 00:00:00.000 |

Write a test

Create the following abstract test class at src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java:

package io.confluent.developer;


import com.google.common.io.Resources;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.assertj.core.util.Sets;
import org.junit.BeforeClass;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;

import static org.testcontainers.containers.KafkaContainer.KAFKA_PORT;

/**
 * Base class for Flink SQL integration tests that use Flink's Kafka connectors. Encapsulates
 * Kafka broker and Schema Registry Testcontainer management and includes utility methods for
 * dynamically configuring Flink SQL Kafka connectors and processing Table API results.
 */
public class AbstractFlinkKafkaTest {

  protected static StreamTableEnvironment streamTableEnv;
  protected static Integer schemaRegistryPort, kafkaPort;

  @BeforeClass
  public static void setup() {
    // create Flink table environment that test subclasses will use to execute SQL statements
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    env.setStateBackend(new EmbeddedRocksDBStateBackend());
    streamTableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().inStreamingMode().build());


    // Start Kafka and Schema Registry Testcontainers. Set the exposed ports that test subclasses
    // can use to dynamically configure Kafka connectors. Schema Registry enables connectors to
    // be configured with 'value.format' = 'avro-confluent'
    Network network = Network.newNetwork();

    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.1"))
        .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
        .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
        .withEnv("KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS", "1")
        .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "500")
        .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
        .withReuse(true)
        .withNetwork(network);
    kafka.start();
    kafkaPort = kafka.getMappedPort(KAFKA_PORT);

    GenericContainer schemaRegistry = new GenericContainer(DockerImageName.parse("confluentinc/cp-schema-registry:7.4.1"))
        .withExposedPorts(8081)
        .withNetwork(kafka.getNetwork())
        .withEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost")
        .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
        .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + kafka.getNetworkAliases().get(0) + ":9092");
    schemaRegistry.start();
    schemaRegistryPort = schemaRegistry.getMappedPort(8081);
  }

  /**
   * Given a resource filename and optional Kafka / Schema Registry ports, return the resource
   * file contents as a String with ports substituted for KAFKA_PORT and SCHEMA_REGISTRY_PORT
   * placeholders.
   *
   * @param resourceFileName    the resource file name
   * @param kafkaPort           the port that Kafka broker exposes
   * @param schemaRegistryPort  the port that Schema Registry exposes
   * @return resource file contents with port values substituted for placeholders
   * @throws IOException if resource file can't be read
   */
  protected static String getResourceFileContents(
      String resourceFileName,
      Optional<Integer> kafkaPort,
      Optional<Integer> schemaRegistryPort
  ) throws IOException {
    URL url = Resources.getResource(resourceFileName);
    String contents = Resources.toString(url, StandardCharsets.UTF_8);
    if (kafkaPort.isPresent()) {
      contents = contents.replaceAll("KAFKA_PORT", kafkaPort.get().toString());
    }
    if (schemaRegistryPort.isPresent()) {
      contents = contents.replaceAll("SCHEMA_REGISTRY_PORT", schemaRegistryPort.get().toString());
    }
    return contents;
  }

  /**
   * Given a resource filename, return the resource file contents as a String.
   *
   * @param resourceFileName    the resource file name
   * @return resource file contents
   * @throws IOException if resource file can't be read
   */
  protected static String getResourceFileContents(
      String resourceFileName
  ) throws IOException {
    // no Kafka / Schema Registry ports
    return getResourceFileContents(resourceFileName, Optional.empty(), Optional.empty());
  }

  /**
   * Utility method to convert a String containing multiple lines into a set of String's where
   * each String is one line. This is useful for creating Flink SQL integration tests based on
   * the tableau results printed via the Table API where the order of results is nondeterministic.
   *
   * @param s multiline String
   * @return set of String's where each member is one line
   */
  protected static Set<String> stringToLineSet(String s) {
    return Sets.newHashSet(Arrays.asList(s.split("\\r?\\n")));
  }

  /**
   * Given a Flink Table API `TableResult` representing a SELECT statement result,
   * capture and return the statement's tableau results.
   *
   * @param tableResult Flink Table API `TableResult` representing a SELECT statement result
   * @return the SELECT statement's tableau results
   */
  protected static String tableauResults(TableResult tableResult) {
    // capture tableau results printed to stdout in a String, saving the original
    // stdout stream so that it can be restored afterward
    PrintStream originalOut = System.out;
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    System.setOut(new PrintStream(baos));

    // The given table result may come from a table backed by the Kafka or Upsert Kafka connector,
    // both of which perform unbounded (neverending) scans. So, in order to prevent tests from blocking
    // on calls to this method, we kick off a thread to kill the underlying job once output has
    // been printed.
    //
    // Note: as of Flink 1.17.0, the Kafka connector will support bounded scanning, which would obviate
    // the need to do this. However, the Upsert Kafka connector will still be unbounded.
    new Thread(() -> {
      while (0 == baos.size()) {
        try {
          Thread.sleep(500);
        } catch (InterruptedException e) {
          // do nothing; keep waiting
        }
      }
      tableResult.getJobClient().get().cancel();
    }).start();

    try {
      tableResult.print();
    } catch (RuntimeException rte) {
      if (ExceptionUtils.indexOfThrowable(rte, JobCancellationException.class) != -1) {
        // a JobCancellationException in the exception stack is expected due to delayed
        // job cancellation in separate thread; do nothing
      } else {
        rte.printStackTrace();
        System.exit(1);
      }
    }
    System.setOut(originalOut);
    return baos.toString();
  }

}

Take a look at this class. It contains the functionality and utility methods that any Flink SQL test would use. Namely, it encapsulates Kafka broker and Schema Registry Testcontainer management and includes utility methods for dynamically configuring Flink SQL Kafka connectors and processing Table API results.

Next, create the test implementation at src/test/java/io/confluent/developer/FlinkSqlTumblingWindowTest.java:

package io.confluent.developer;


import org.apache.flink.table.api.TableResult;
import org.junit.Test;

import java.util.Optional;

import static org.junit.Assert.assertEquals;

public class FlinkSqlTumblingWindowTest extends AbstractFlinkKafkaTest {

  @Test
  public void testTumblingWindows() throws Exception {
    // create base ratings table and aggregation table, and populate with test data
    streamTableEnv.executeSql(getResourceFileContents("create-ratings.sql.template",
        Optional.of(kafkaPort), Optional.of(schemaRegistryPort))).await();
    streamTableEnv.executeSql(getResourceFileContents("populate-ratings.sql"));
    streamTableEnv.executeSql(getResourceFileContents("create-ratings-by-6hr-window.sql.template",
        Optional.of(kafkaPort), Optional.of(schemaRegistryPort))).await();
    streamTableEnv.executeSql(getResourceFileContents("populate-ratings-by-6hr-window.sql"));

    TableResult tableResult = streamTableEnv.executeSql(getResourceFileContents("query-ratings-by-6hr-window.sql"));

    // Compare actual and expected results. Convert result output to line sets to compare so that order
    // doesn't matter.
    String actualTableauResults = tableauResults(tableResult);
    String expectedTableauResults = getResourceFileContents("expected-ratings-by-6hr-window.txt");
    assertEquals(stringToLineSet(expectedTableauResults), stringToLineSet(actualTableauResults));
  }

}

The test itself is straightforward to follow. It executes the SQL from our resource files, then runs a SELECT statement against the final output table of our application and compares the results to what’s expected.

Invoke the test

Now run the test, which is as simple as:

./gradlew test

Deploy on Confluent Cloud

Run your app with Confluent Cloud

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.

  1. Sign up for Confluent Cloud.

  2. After you log in to Confluent Cloud Console, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.

Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry URL and credentials, etc., and set the appropriate parameters in your client application, as sketched below.
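
As an illustration, here is a hedged sketch of what the ratings table definition might look like when pointed at Confluent Cloud instead of the local broker. The angle-bracket placeholders (bootstrap servers, cluster API key and secret, Schema Registry URL and credentials) stand in for the values you copy from the Confluent Cloud Console; the option names are standard Flink Kafka connector and avro-confluent format options rather than anything verified against a specific Confluent Cloud setup.

-- Sketch only: the ratings table configured against Confluent Cloud.
-- Replace the <...> placeholders with values from your cluster and Schema Registry.
CREATE TABLE ratings (
    rating_id INT,
    title STRING,
    release_year INT,
    rating DOUBLE,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'topic' = 'ratings',
    'properties.bootstrap.servers' = '<BOOTSTRAP_SERVERS>',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";',
    'scan.startup.mode' = 'earliest-offset',
    'key.format' = 'raw',
    'key.fields' = 'rating_id',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '<SCHEMA_REGISTRY_URL>',
    'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
    'value.avro-confluent.basic-auth.user-info' = '<SR_API_KEY>:<SR_API_SECRET>',
    'value.fields-include' = 'EXCEPT_KEY'
);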

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.