If you have event streams in two Kafka topics, how can you join them together and create a new topic based on a common identifying attribute, where the new events are enriched from the original topics?
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl
• Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line
To get started, make a new directory anywhere you’d like for this project:
mkdir join-stream-and-stream && cd join-stream-and-stream
Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud) and Apache Flink®. The Docker Compose file will start three Flink® containers that have Kafka connector dependencies preinstalled: an interactive Flink SQL client (flink-sql-client) that sends streaming SQL jobs to the Flink Job Manager (flink-jobmanager), which in turn assigns tasks to the Flink Task Manager (flink-taskmanager) in the Flink cluster.
---
version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # This is done for running a single broker in combined mode for local development only
      # For multi-node deployments you should generate a unique cluster ID using the following
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
  flink-sql-client:
    image: cnfldemos/flink-sql-client-kafka:1.16.0-scala_2.12-java11
    hostname: flink-sql-client
    container_name: flink-sql-client
    depends_on:
      - flink-jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: flink-jobmanager
    volumes:
      - ./settings/:/settings
  flink-jobmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-jobmanager
    container_name: flink-jobmanager
    ports:
      - "9081:9081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        rest.bind-port: 9081
  flink-taskmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-taskmanager
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 10
And launch it by running:
docker compose up -d
The best way to interact with Flink SQL when you’re learning how things work is with the Flink SQL CLI. Fire it up as follows:
docker exec -it flink-sql-client sql-client.sh
First, you’ll need to create a table to represent the orders:
CREATE TABLE orders (
id INT,
total_amount DOUBLE,
customer_name VARCHAR,
order_ts_raw BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'broker:29092',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json',
'value.fields-include' = 'ALL'
);
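If you want to double-check that the table was created as expected, you can optionally describe it before moving on:
DESCRIBE orders;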
Note that we are using the field order_ts_raw as the record’s timestamp. This is going to be important later on when we write queries that need to know when each event occurred. By using a field of the event, we can process the events at any time and get a deterministic result. This is known as event time.
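For this tutorial the raw BIGINT field is all we need, but if you wanted Flink itself to track event time (for example, to add windowed aggregations later), you could declare a computed timestamp column and a watermark on it. The following is only a sketch of that idea, with a hypothetical table name, and is not required for the steps that follow:
CREATE TABLE orders_with_event_time (
  id INT,
  total_amount DOUBLE,
  customer_name VARCHAR,
  order_ts_raw BIGINT,
  -- derive a proper timestamp from the epoch seconds and use it as the event-time attribute
  order_ts AS TO_TIMESTAMP_LTZ(order_ts_raw, 0),
  WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'broker:29092',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);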
Confluent Cloud manages several options for you when using Flink SQL, so if you run this tutorial on Confluent Cloud, you can copy just the CREATE TABLE statements without the WITH clause when creating tables. Consult the Flink SQL WITH documentation for the full list of supported options when creating a table.
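For example, on Confluent Cloud the orders table you just created would reduce to roughly the following, since the connector, topic, and format options are managed for you (the exact behavior depends on your Confluent Cloud environment, so treat this as a sketch):
CREATE TABLE orders (
  id INT,
  total_amount DOUBLE,
  customer_name VARCHAR,
  order_ts_raw BIGINT
);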
Second, you’ll need a table to represent the shipments:
CREATE TABLE shipments (
id VARCHAR,
order_id INT,
warehouse VARCHAR,
ship_ts_raw BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'shipments',
'properties.bootstrap.servers' = 'broker:29092',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json',
'value.fields-include' = 'ALL'
);
You’ll notice that both tables store the event time as a BIGINT, which is great for computers but not very human-friendly to read. You’ll also need to convert these values into a TIMESTAMP so that your query can specify a timeframe within which events need to occur relative to each other, an interval join. You’ll learn how to address both issues when you create your join query.
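To get a feel for these conversions before writing the join, you can evaluate them on one of the raw epoch values used in the sample data below. FROM_UNIXTIME formats the value using the session time zone, so the string shown in the comment assumes a UTC session:
SELECT
  FROM_UNIXTIME(1692812175)               AS ts_string,  -- '2023-08-23 17:36:15'
  TO_TIMESTAMP(FROM_UNIXTIME(1692812175)) AS ts;         -- the same instant as a TIMESTAMP(3)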
Now let’s play with some events. Create the following orders:
INSERT INTO orders
VALUES ( 1, 404.89, 'Art Vandelay', 1692812175),
( 2, 50.45, 'Bob Sacamanto', 1692826575),
( 3, 113.23, 'Bilbo Baggins', 1692826575),
( 4, 90.43, 'Harry Potter', 1692812175),
( 5, 495.22, 'John Hechinger', 1692819375),
( 6, 410.13, 'Mandelorean', 1692826575),
( 7, 333.84, 'Jane Smith', 1692822975),
( 8, 26.14, 'HJ Pennypacker' , 1692819375),
( 9, 450.77, 'Colonel Mustard', 1692812175),
( 10,195.13, 'Prof. Jones', 1692822975);
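Optionally, you can confirm the rows landed with a quick transient query (close the result view to get back to the prompt):
SELECT id, customer_name, order_ts_raw FROM orders;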
In a similar manner, create the following shipments:
INSERT INTO shipments
VALUES ('shipment_1', 1, 'Bar Harbor', 1692815775),
('shipment_2', 2, 'Boston', 1692851775),
('shipment_3', 3, 'Providence', 1692851775),
('shipment_4', 4, 'Springfield', 1692826575),
('shipment_5', 5, 'Bar Harbor', 1692822975),
('shipment_6', 6, 'Boston', 1692851775),
('shipment_7', 7, 'Jackson Hole', 1692840975),
('shipment_8', 8, 'Whitefish' , 1692822975),
('shipment_9', 9, 'Jackson Hole', 1692984975),
('shipment_10', 10, 'Columbia Falls', 1692984975);
Let’s join these event streams together to gain some insight into the order-to-shipping process. Then we’ll discuss some of the concepts used in the query to enable retrieving your desired results.
NOTE: There are several columns in the results, so it’s best to set the result mode to tableau using the following command:
SET sql-client.execution.result-mode=tableau;
Otherwise, in the default table format, the columns will scroll off the screen to the right.
Now execute the following query and study its output.
SELECT o.id as order_id,
FROM_UNIXTIME(o.order_ts_raw) as ORDER_TS,
o.total_amount as TOTAL,
o.customer_name as CUSTOMER,
s.id as SHIP_ID,
FROM_UNIXTIME(s.ship_ts_raw) as SHIP_TS,
s.warehouse,
TIMESTAMPDIFF(HOUR,
TO_TIMESTAMP(FROM_UNIXTIME(o.order_ts_raw)),
TO_TIMESTAMP(FROM_UNIXTIME(s.ship_ts_raw))) as HR_TO_SHIP
FROM orders o inner join shipments s
ON o.id = s.order_id
AND TO_TIMESTAMP(FROM_UNIXTIME(s.ship_ts_raw))
BETWEEN TO_TIMESTAMP(FROM_UNIXTIME(o.order_ts_raw))
AND TO_TIMESTAMP(FROM_UNIXTIME(o.order_ts_raw)) + INTERVAL '7' DAY;
This should yield the following output:
+----+-------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------+
| op | order_id | ORDER_TS | TOTAL | CUSTOMER | SHIP_ID | SHIP_TS | warehouse | HR_TO_SHIP |
+----+-------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------+
| +I | 1 | 2023-08-23 17:36:15 | 404.89 | Art Vandelay | shipment_1 | 2023-08-23 18:36:15 | Bar Harbor | 1 |
| +I | 2 | 2023-08-23 21:36:15 | 50.45 | Bob Sacamanto | shipment_2 | 2023-08-24 04:36:15 | Boston | 7 |
| +I | 3 | 2023-08-23 21:36:15 | 113.23 | Bilbo Baggins | shipment_3 | 2023-08-24 04:36:15 | Providence | 7 |
| +I | 4 | 2023-08-23 17:36:15 | 90.43 | Harry Potter | shipment_4 | 2023-08-23 21:36:15 | Springfield | 4 |
| +I | 5 | 2023-08-23 19:36:15 | 495.22 | John Hechinger | shipment_5 | 2023-08-23 20:36:15 | Bar Harbor | 1 |
| +I | 6 | 2023-08-23 21:36:15 | 410.13 | Mandelorean | shipment_6 | 2023-08-24 04:36:15 | Boston | 7 |
| +I | 7 | 2023-08-23 20:36:15 | 333.84 | Jane Smith | shipment_7 | 2023-08-24 01:36:15 | Jackson Hole | 5 |
| +I | 8 | 2023-08-23 19:36:15 | 26.14 | HJ Pennypacker | shipment_8 | 2023-08-23 20:36:15 | Whitefish | 1 |
| +I | 9 | 2023-08-23 17:36:15 | 450.77 | Colonel Mustard | shipment_9 | 2023-08-25 17:36:15 | Jackson Hole | 48 |
| +I | 10 | 2023-08-23 20:36:15 | 195.13 | Prof. Jones | shipment_10 | 2023-08-25 17:36:15 | Columbia Falls | 45 |
Earlier we talked about the event timestamps being stored as a BIGINT type, which is great for flexibility but hard to read and derive meaning from. To address that issue, you used the FROM_UNIXTIME function for both the order and shipment timestamps. FROM_UNIXTIME converts a numeric type (an epoch-based timestamp in this case) to a formatted string in the default format of yyyy-MM-dd HH:mm:ss, which is easily understood.
You also used additional temporal functions: TO_TIMESTAMP, TIMESTAMPDIFF, and INTERVAL. TO_TIMESTAMP converts a date string, like the one returned from FROM_UNIXTIME, into a timestamp suitable for other functions such as TIMESTAMPDIFF and INTERVAL. You used TIMESTAMPDIFF to calculate the difference, in hours, between accepting the order and shipping it to the customer.
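As a concrete check of that arithmetic: order 1 was placed at epoch 1692812175 and shipment_1 went out at 1692815775, which is 3,600 seconds later, so the hour difference comes out to 1:
SELECT TIMESTAMPDIFF(HOUR,
  TO_TIMESTAMP(FROM_UNIXTIME(1692812175)),
  TO_TIMESTAMP(FROM_UNIXTIME(1692815775))) AS hr_to_ship;  -- returns 1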
The query we issued performs an inner join between the orders and shipments. This kind of join only emits events when there’s a match on the criteria of both sides of the join. In effect, this only joins orders that have successfully shipped. Additionally, you used an INTERVAL within the join condition to perform an interval join, which requires SQL timestamps and specifies the additional requirement that the order and shipment occurred within seven days of each other.
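If you also wanted to see orders that have no shipment within the window, a LEFT JOIN variation of the same query (not part of this tutorial) would emit them with NULL shipment columns. Because the join runs over unbounded streams, Flink may first emit an order with NULLs and then update that row when a matching shipment arrives:
SELECT o.id AS order_id,
  o.customer_name AS CUSTOMER,
  s.id AS SHIP_ID,
  s.warehouse
FROM orders o LEFT JOIN shipments s
ON o.id = s.order_id
AND TO_TIMESTAMP(FROM_UNIXTIME(s.ship_ts_raw))
  BETWEEN TO_TIMESTAMP(FROM_UNIXTIME(o.order_ts_raw))
  AND TO_TIMESTAMP(FROM_UNIXTIME(o.order_ts_raw)) + INTERVAL '7' DAY;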
Since the output of our transient query looks right, the next step is to make the query persistent. This looks exactly like the transient query, except we first create a new table and then execute an INSERT INTO statement to populate the table. The INSERT INTO statement returns to the CLI prompt right away, having created a persistent stream processing program running in the Flink cluster, continuously processing input records and updating the resulting shipped_orders table.
Now go ahead and run the following two commands in your Flink SQL session:
CREATE TABLE shipped_orders (
order_id INT,
order_ts VARCHAR,
total DOUBLE,
customer VARCHAR,
ship_id VARCHAR,
ship_ts VARCHAR,
warehouse VARCHAR,
hours_to_ship INT
) WITH (
'connector' = 'kafka',
'topic' = 'shipped_orders',
'properties.bootstrap.servers' = 'broker:29092',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'raw',
'key.fields' = 'ship_id',
'value.format' = 'json',
'value.fields-include' = 'ALL'
);
INSERT INTO shipped_orders
SELECT o.id as order_id,
FROM_UNIXTIME(o.order_ts_raw) as ORDER_TS,
o.total_amount as TOTAL,
o.customer_name as CUSTOMER,
s.id as SHIP_ID,
FROM_UNIXTIME(s.ship_ts_raw) as SHIP_TS,
s.warehouse,
TIMESTAMPDIFF(HOUR,
TO_TIMESTAMP(FROM_UNIXTIME(o.order_ts_raw)),
TO_TIMESTAMP(FROM_UNIXTIME(s.ship_ts_raw))) as HR_TO_SHIP
FROM orders o inner join shipments s
ON o.id = s.order_id
AND TO_TIMESTAMP(FROM_UNIXTIME(s.ship_ts_raw))
BETWEEN TO_TIMESTAMP(FROM_UNIXTIME(o.order_ts_raw))
AND TO_TIMESTAMP(FROM_UNIXTIME(o.order_ts_raw)) + INTERVAL '7' DAY;
Seeing is believing, so let’s query the persistent shipped_orders table. First, make sure the result mode is still set to tableau:
SET sql-client.execution.result-mode=tableau;
Then query the shipped_orders table:
SELECT * FROM shipped_orders LIMIT 3;
This will yield output matching the transient query’s results (perhaps in a different order):
+----+-------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+---------------+
| op | order_id | order_ts | total | customer | ship_id | ship_ts | warehouse | hours_to_ship |
+----+-------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+---------------+
| +I | 1 | 2023-08-23 17:36:15 | 404.89 | Art Vandelay | shipment_1 | 2023-08-23 18:36:15 | Bar Harbor | 1 |
| +I | 2 | 2023-08-23 21:36:15 | 50.45 | Bob Sacamanto | shipment_2 | 2023-08-24 04:36:15 | Boston | 7 |
| +I | 3 | 2023-08-23 21:36:15 | 113.23 | Bilbo Baggins | shipment_3 | 2023-08-24 04:36:15 | Providence | 7 |
We could also query the underlying topic directly using kafka-console-consumer. Open a new terminal window and run the following command:
docker exec -it broker /usr/bin/kafka-console-consumer\
--topic shipped_orders\
--bootstrap-server broker:9092 \
--from-beginning \
--max-messages 10
This will yield the following results:
{"order_id":1,"order_ts":"2023-08-23 17:36:15","total":404.89,"customer":"Art Vandelay","ship_id":"shipment_1","ship_ts":"2023-08-23 18:36:15","warehouse":"Bar Harbor","hours_to_ship":1}
{"order_id":2,"order_ts":"2023-08-23 21:36:15","total":50.45,"customer":"Bob Sacamanto","ship_id":"shipment_2","ship_ts":"2023-08-24 04:36:15","warehouse":"Boston","hours_to_ship":7}
{"order_id":3,"order_ts":"2023-08-23 21:36:15","total":113.23,"customer":"Bilbo Baggins","ship_id":"shipment_3","ship_ts":"2023-08-24 04:36:15","warehouse":"Providence","hours_to_ship":7}
{"order_id":4,"order_ts":"2023-08-23 17:36:15","total":90.43,"customer":"Harry Potter","ship_id":"shipment_4","ship_ts":"2023-08-23 21:36:15","warehouse":"Springfield","hours_to_ship":4}
{"order_id":5,"order_ts":"2023-08-23 19:36:15","total":495.22,"customer":"John Hechinger","ship_id":"shipment_5","ship_ts":"2023-08-23 20:36:15","warehouse":"Bar Harbor","hours_to_ship":1}
{"order_id":6,"order_ts":"2023-08-23 21:36:15","total":410.13,"customer":"Mandelorean","ship_id":"shipment_6","ship_ts":"2023-08-24 04:36:15","warehouse":"Boston","hours_to_ship":7}
{"order_id":7,"order_ts":"2023-08-23 20:36:15","total":333.84,"customer":"Jane Smith","ship_id":"shipment_7","ship_ts":"2023-08-24 01:36:15","warehouse":"Jackson Hole","hours_to_ship":5}
{"order_id":8,"order_ts":"2023-08-23 19:36:15","total":26.14,"customer":"HJ Pennypacker","ship_id":"shipment_8","ship_ts":"2023-08-23 20:36:15","warehouse":"Whitefish","hours_to_ship":1}
{"order_id":9,"order_ts":"2023-08-23 17:36:15","total":450.77,"customer":"Colonel Mustard","ship_id":"shipment_9","ship_ts":"2023-08-25 17:36:15","warehouse":"Jackson Hole","hours_to_ship":48}
{"order_id":10,"order_ts":"2023-08-23 20:36:15","total":195.13,"customer":"Prof. Jones","ship_id":"shipment_10","ship_ts":"2023-08-25 17:36:15","warehouse":"Columbia Falls","hours_to_ship":45}
Processed a total of 10 messages
Now that you have manually developed and tested your Flink SQL application, how might you create an automated test for it so that it’s easier to maintain and upgrade over time? Imagine how painful it would be to have to manually test every change or software dependency upgrade of your application, and then imagine having to do this for many applications. The benefits of automated testing are clear, but how do we get there?
First, what do we want in an automated integration test? For starters:
• Real services (as opposed to mocks) that our application depends on
• A small resource footprint so that developers can run the test locally
• Low enough latency so that development iterations aren’t hampered (not as low latency as is required for a unit test, but test duration should be on the order of seconds)
• Isolation so that many tests can run concurrently on the same machine when this test gets run on a build automation server, e.g., no hard-coded ports
Luckily, tools are at our disposal to solve these problems. We’ll use Testcontainers to run containerized Kafka and Schema Registry servers on dynamic ports, Flink’s support for local execution environments so that we don’t need to spin up a Flink cluster, and Flink’s Table API to execute the Flink SQL statements that comprise our application.
The primary choices for programming language in which to write our tests are Java and Python given the need for Flink’s Table API. We’ll write ours in Java.
To start our test project, make new directories for test source code and resources within the same join-stream-and-stream folder that you created earlier:
mkdir -p src/test/java/io/confluent/developer
mkdir src/test/resources
Create the following Gradle build file, named build.gradle, in the join-stream-and-stream directory:
buildscript {
repositories {
mavenCentral()
}
}
plugins {
id "java"
}
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
version = "0.0.1"
repositories {
mavenCentral()
}
dependencies {
testImplementation "com.google.guava:guava:31.1-jre"
testImplementation "junit:junit:4.13.2"
testImplementation 'org.testcontainers:testcontainers:1.17.6'
testImplementation 'org.testcontainers:kafka:1.17.6'
testImplementation "org.apache.flink:flink-sql-connector-kafka:1.17.1"
testImplementation "org.apache.flink:flink-sql-avro-confluent-registry:1.17.1"
testImplementation "org.apache.flink:flink-test-utils:1.17.1"
testImplementation "org.apache.flink:flink-test-utils-junit:1.17.1"
testImplementation 'org.apache.flink:flink-json:1.17.1'
testImplementation "org.apache.flink:flink-table-api-java-bridge:1.17.0"
testImplementation "org.apache.flink:flink-table-planner_2.12:1.17.1"
testImplementation "org.apache.flink:flink-table-planner_2.12:1.17.1:tests"
testImplementation "org.apache.flink:flink-statebackend-rocksdb:1.17.1"
}
There are a couple of important points to note in the Gradle build file:
• Java sourceCompatibility and targetCompatibility are set to Java 11. Flink supports Java 8 (deprecated) and 11 as of the writing of this tutorial.
• The dependencies section declares test dependencies for Testcontainers and Flink. Among the handful of Flink dependencies are ones providing local execution (e.g., flink-statebackend-rocksdb), the Table API (flink-table-api-java-bridge), and Kafka connectors (flink-sql-connector-kafka).
And be sure to run the following command to obtain the Gradle wrapper:
gradle wrapper
Create the following file at src/test/resources/create-orders.sql.template. Note the KAFKA_PORT placeholder; our test will dynamically replace it with the port that Testcontainers assigns.
CREATE TABLE orders (
id INT,
total_amount DOUBLE,
customer_name VARCHAR,
order_ts_raw BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:KAFKA_PORT',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json',
'value.fields-include' = 'ALL'
);
We could always inline the SQL statements in our Java test code, but creating separate resource files makes our test more readable and easier to maintain. Further, we can imagine parametrizing URLs as well so that we can have a single set of source-controlled queries to use in tests as well as staging or production environments.
There are a handful of resources to create for our test. These mirror the queries that we developed earlier.
Create the following file at src/test/resources/create-shipments.sql.template. Again, note the KAFKA_PORT placeholder.
CREATE TABLE shipments (
id VARCHAR,
order_id INT,
warehouse VARCHAR,
ship_ts_raw BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'shipments',
'properties.bootstrap.servers' = 'localhost:KAFKA_PORT',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json',
'value.fields-include' = 'ALL'
);
Create the following file at src/test/resources/populate-orders.sql:
INSERT INTO orders
VALUES ( 1, 404.89, 'Art Vandelay', 1692812175),
( 2, 50.45, 'Bob Sacamanto', 1692826575),
( 3, 113.23, 'Bilbo Baggins', 1692826575),
( 4, 90.43, 'Harry Potter', 1692812175),
( 5, 495.22, 'John Hechinger', 1692819375),
( 6, 410.13, 'Mandelorean', 1692826575),
( 7, 333.84, 'Jane Smith', 1692822975),
( 8, 26.14, 'HJ Pennypacker' , 1692819375),
( 9, 450.77, 'Colonel Mustard', 1692812175),
( 10,195.13, 'Prof. Jones', 1692822975);
Create the following file at src/test/resources/populate-shipments.sql:
INSERT INTO shipments
VALUES ('shipment_1', 1, 'Bar Harbor', 1692815775),
('shipment_2', 2, 'Boston', 1692851775),
('shipment_3', 3, 'Providence', 1692851775),
('shipment_4', 4, 'Springfield', 1692826575),
('shipment_5', 5, 'Bar Harbor', 1692822975),
('shipment_6', 6, 'Boston', 1692851775),
('shipment_7', 7, 'Jackson Hole', 1692840975),
('shipment_8', 8, 'Whitefish' , 1692822975),
('shipment_9', 9, 'Jackson Hole', 1692984975),
('shipment_10', 10, 'Columbia Falls', 1692984975);
Next, create the table for the join result with the following file at src/test/resources/create-shipped-orders.sql.template:
CREATE TABLE shipped_orders (
order_id INT,
order_ts VARCHAR,
total DOUBLE,
customer VARCHAR,
ship_id VARCHAR,
ship_ts VARCHAR,
warehouse VARCHAR,
hours_to_ship INT
) WITH (
'connector' = 'kafka',
'topic' = 'shipped_orders',
'properties.bootstrap.servers' = 'localhost:KAFKA_PORT',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'raw',
'key.fields' = 'ship_id',
'value.format' = 'json',
'value.fields-include' = 'ALL'
);
Note again that we specify the bootstrap servers configuration with the KAFKA_PORT placeholder, which our test code will replace with the actual port.
Now create the SQL file that will populate the join table at src/test/resources/populate-shipped-orders-table.sql:
INSERT INTO shipped_orders
SELECT o.id as order_id,
FROM_UNIXTIME(o.order_ts_raw) as ORDER_TS,
o.total_amount as TOTAL,
o.customer_name as CUSTOMER,
s.id as SHIP_ID,
FROM_UNIXTIME(s.ship_ts_raw) as SHIP_TS,
s.warehouse,
TIMESTAMPDIFF(HOUR,
TO_TIMESTAMP(FROM_UNIXTIME(o.order_ts_raw)),
TO_TIMESTAMP(FROM_UNIXTIME(s.ship_ts_raw))) as HR_TO_SHIP
FROM orders o inner join shipments s
ON o.id = s.order_id
AND TO_TIMESTAMP(FROM_UNIXTIME(s.ship_ts_raw))
BETWEEN TO_TIMESTAMP(FROM_UNIXTIME(o.order_ts_raw))
AND TO_TIMESTAMP(FROM_UNIXTIME(o.order_ts_raw)) + INTERVAL '7' DAY;
Next, create the following file at src/test/resources/query-join-order-shipments.sql to select all results from the join table:
SELECT * FROM shipped_orders;
Finally, create the following file at src/test/resources/expected-shipped-orders.txt that contains our test’s expected output:
+----+-------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+---------------+
| op | order_id | order_ts | total | customer | ship_id | ship_ts | warehouse | hours_to_ship |
+----+-------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+---------------+
| +I | 1 | 2023-08-23 17:36:15 | 404.89 | Art Vandelay | shipment_1 | 2023-08-23 18:36:15 | Bar Harbor | 1 |
| +I | 3 | 2023-08-23 21:36:15 | 113.23 | Bilbo Baggins | shipment_3 | 2023-08-24 04:36:15 | Providence | 7 |
| +I | 8 | 2023-08-23 19:36:15 | 26.14 | HJ Pennypacker | shipment_8 | 2023-08-23 20:36:15 | Whitefish | 1 |
| +I | 2 | 2023-08-23 21:36:15 | 50.45 | Bob Sacamanto | shipment_2 | 2023-08-24 04:36:15 | Boston | 7 |
| +I | 6 | 2023-08-23 21:36:15 | 410.13 | Mandelorean | shipment_6 | 2023-08-24 04:36:15 | Boston | 7 |
| +I | 9 | 2023-08-23 17:36:15 | 450.77 | Colonel Mustard | shipment_9 | 2023-08-25 17:36:15 | Jackson Hole | 48 |
| +I | 4 | 2023-08-23 17:36:15 | 90.43 | Harry Potter | shipment_4 | 2023-08-23 21:36:15 | Springfield | 4 |
| +I | 5 | 2023-08-23 19:36:15 | 495.22 | John Hechinger | shipment_5 | 2023-08-23 20:36:15 | Bar Harbor | 1 |
| +I | 7 | 2023-08-23 20:36:15 | 333.84 | Jane Smith | shipment_7 | 2023-08-24 01:36:15 | Jackson Hole | 5 |
| +I | 10 | 2023-08-23 20:36:15 | 195.13 | Prof. Jones | shipment_10 | 2023-08-25 17:36:15 | Columbia Falls | 45 |
Create the following base test class at src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java:
package io.confluent.developer;
import com.google.common.io.Resources;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.assertj.core.util.Sets;
import org.junit.BeforeClass;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.testcontainers.containers.KafkaContainer.KAFKA_PORT;
/**
* Base class for Flink SQL integration tests that use Flink's Kafka connectors. Encapsulates
* Kafka broker and Schema Registry Testcontainer management and includes utility methods for
* dynamically configuring Flink SQL Kafka connectors and processing Table API results.
*/
public class AbstractFlinkKafkaTest {
protected static StreamTableEnvironment streamTableEnv;
protected static Integer schemaRegistryPort, kafkaPort;
@BeforeClass
public static void setup() {
// create Flink table environment that test subclasses will use to execute SQL statements
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend(new EmbeddedRocksDBStateBackend());
Configuration configuration = new Configuration();
configuration.setString("table.local-time-zone", "UTC");
streamTableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().inStreamingMode().withConfiguration(configuration).build());
// Start Kafka and Schema Registry Testcontainers. Set the exposed ports that test subclasses
// can use to dynamically configure Kafka connectors. Schema Registry enables connectors to
// be configured with 'value.format' = 'avro-confluent'
Network network = Network.newNetwork();
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.2"))
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS", "1")
.withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "500")
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
.withReuse(true)
.withNetwork(network);
kafka.start();
kafkaPort = kafka.getMappedPort(KAFKA_PORT);
GenericContainer schemaRegistry = new GenericContainer(DockerImageName.parse("confluentinc/cp-schema-registry:7.3.2"))
.withExposedPorts(8081)
.withNetwork(kafka.getNetwork())
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + kafka.getNetworkAliases().get(0) + ":9092");
schemaRegistry.start();
schemaRegistryPort = schemaRegistry.getMappedPort(8081);
}
/**
* Given a resource filename and optional Kafka / Schema Registry ports, return the resource
* file contents as a String with ports substituted for KAFKA_PORT and SCHEMA_REGISTRY_PORT
* placeholders.
*
* @param resourceFileName the resource file name
* @param kafkaPort the port that Kafka broker exposes
* @param schemaRegistryPort the port that Schema Registry exposes
* @return resource file contents with port values substituted for placeholders
* @throws IOException if resource file can't be read
*/
protected static String getResourceFileContents(
String resourceFileName,
Optional<Integer> kafkaPort,
Optional<Integer> schemaRegistryPort
) throws IOException {
URL url = Resources.getResource(resourceFileName);
String contents = Resources.toString(url, StandardCharsets.UTF_8);
if (kafkaPort.isPresent()) {
contents = contents.replaceAll("KAFKA_PORT", kafkaPort.get().toString());
}
if (schemaRegistryPort.isPresent()) {
contents = contents.replaceAll("SCHEMA_REGISTRY_PORT", schemaRegistryPort.get().toString());
}
return contents;
}
/**
* Given a resource filename, return the resource file contents as a String.
*
* @param resourceFileName the resource file name
* @return resource file contents
* @throws IOException if resource file can't be read
*/
protected static String getResourceFileContents(
String resourceFileName
) throws IOException {
// no Kafka / Schema Registry ports
return getResourceFileContents(resourceFileName, Optional.empty(), Optional.empty());
}
/**
* Utility method to convert a String containing multiple lines into a set of Strings where
* each String is one line. This is useful for creating Flink SQL integration tests based on
* the tableau results printed via the Table API where the order of results is nondeterministic.
*
* @param s multiline String
* @return set of Strings where each member is one line
*/
protected static Set<String> stringToLineSet(String s) {
return s.lines().map(line -> line.replaceAll("\\s", ""))
.collect(Collectors.toSet());
}
/**
* Given a Flink Table API `TableResult` representing a SELECT statement result,
* capture and return the statement's tableau results.
*
* @param tableResult Flink Table API `TableResult` representing a SELECT statement result
* @return the SELECT statement's tableau results
*/
protected static String tableauResults(TableResult tableResult) {
// capture tableau results printed to stdout in a String
ByteArrayOutputStream baos = new ByteArrayOutputStream();
System.setOut(new PrintStream(baos));
// The given table result may come from a table backed by the Kafka or Upsert Kafka connector,
// both of which perform unbounded (neverending) scans. So, in order to prevent tests from blocking
// on calls to this method, we kick off a thread to kill the underlying job once output has
// been printed.
//
// Note: as of Flink 1.17.0, the Kafka connector will support bounded scanning, which would obviate
// the need to do this. However, the Upsert Kafka connector will still be unbounded.
new Thread(() -> {
while (0 == baos.size()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// do nothing; keep waiting
}
}
tableResult.getJobClient().get().cancel();
}).start();
try {
tableResult.print();
} catch (RuntimeException rte) {
if (ExceptionUtils.indexOfThrowable(rte, JobCancellationException.class) != -1) {
// a JobCancellationException in the exception stack is expected due to delayed
// job cancellation in separate thread; do nothing
} else {
rte.printStackTrace();
System.exit(1);
}
}
System.setOut(System.out);
return baos.toString();
}
}
Take a look at this class. It contains the functionality and utility methods that any Flink SQL test would use. Namely, it encapsulates Kafka broker Testcontainer management and includes utility methods for dynamically configuring Flink SQL Kafka connectors and processing Table API results.
Next, create the test implementation at src/test/java/io/confluent/developer/FlinkSqlIntervalJoinTest.java:
package io.confluent.developer;
import org.apache.flink.table.api.TableResult;
import org.junit.Test;
import java.util.Optional;
import java.util.Set;
import static org.junit.Assert.*;
public class FlinkSqlIntervalJoinTest extends AbstractFlinkKafkaTest {
@Test
public void simpleSelect() throws Exception {
// create the orders and shipments tables, populate them with test data, and create the result table
streamTableEnv.executeSql(getResourceFileContents("create-orders.sql.template",
Optional.of(kafkaPort),Optional.of(schemaRegistryPort))).await();
streamTableEnv.executeSql(getResourceFileContents("create-shipments.sql.template",
Optional.of(kafkaPort),Optional.of(schemaRegistryPort))).await();
streamTableEnv.executeSql(getResourceFileContents("populate-orders.sql"));
streamTableEnv.executeSql(getResourceFileContents("populate-shipments.sql"));
streamTableEnv.executeSql(getResourceFileContents("create-shipped-orders.sql.template",
Optional.of(kafkaPort),Optional.of(schemaRegistryPort))).await();
// In Flink 1.17 and later, setting 'scan.bounded.mode' = 'latest-offset' in the CREATE TABLE statement
// would cause this INSERT to terminate once the latest offset is reached.
streamTableEnv.executeSql(getResourceFileContents("populate-shipped-orders-table.sql"));
// execute query on result table that should have joined shipments with orders
TableResult tableResult = streamTableEnv.executeSql(getResourceFileContents("query-join-order-shipments.sql"));
// Compare actual and expected results. Convert result output to line sets to compare so that order
// doesn't matter
String actualTableauResults = tableauResults(tableResult);
String expectedTableauResults = getResourceFileContents("expected-shipped-orders.txt");
assertEquals(stringToLineSet(expectedTableauResults), stringToLineSet(actualTableauResults));
}
}
The test itself is straightforward to follow. It executes the SQL from our resource files, then runs a SELECT statement against the final output table of our application and compares the results to what’s expected.
Now run the test, which is as simple as:
./gradlew test
Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service. First, sign up for Confluent Cloud.
After you log in to Confluent Cloud Console, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.
From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.
Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.
Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.
Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.