Consider a topic with events that represent book publications. In this tutorial, we'll use Flink SQL to find only the publications written by a particular author.
Let's assume the following DDL for our base publication_events table:
CREATE TABLE publication_events (
    book_id INT,
    author STRING,
    title STRING
);
Given the publication_events table definition above, we can filter to the publications by a particular author using a WHERE clause:
SELECT *
FROM publication_events
WHERE author = 'George R. R. Martin';
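The same WHERE clause accepts other standard SQL predicates. As a rough sketch against the publication_events table defined above (the author values are illustrative, and these queries are not part of the tutorial's tested example), the variations below match several authors at once, match on a name suffix, and compare case-insensitively, since string equality in Flink SQL is case-sensitive:

```sql
-- Match any of several authors and project only the columns of interest.
SELECT book_id, title
FROM publication_events
WHERE author IN ('George R. R. Martin', 'J. R. R. Tolkien');

-- Match on a pattern: % stands for any sequence of characters.
SELECT *
FROM publication_events
WHERE author LIKE '%Tolkien';

-- Case-insensitive comparison by normalizing the column before comparing.
SELECT *
FROM publication_events
WHERE LOWER(author) = 'george r. r. martin';
```

Each statement is an independent streaming query, so run them one at a time.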
You can run the example backing this tutorial in one of three ways: a Flink Table API-based JUnit test, locally with the Flink SQL Client against Flink and Kafka running in Docker, or with Confluent Cloud.
To run the Flink Table API-based JUnit test, clone the confluentinc/tutorials GitHub repository (if you haven't already) and navigate to the tutorials directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Run the following command to execute FlinkSqlFilteringTest#testFilter:
./gradlew clean :filtering:flinksql:test
The test starts Kafka and Schema Registry with Testcontainers, runs the Flink SQL commands above against a local Flink StreamExecutionEnvironment, and ensures that the filter results are what we expect.
To run locally with the Flink SQL Client, clone the confluentinc/tutorials GitHub repository (if you haven't already) and navigate to the tutorials directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Start Flink and Kafka:
docker compose -f ./docker/docker-compose-flinksql.yml up -d
Next, open the Flink SQL Client CLI:
docker exec -it flink-sql-client sql-client.sh
Finally, run the following SQL statements to create the publication_events table backed by Kafka running in Docker, populate it with test data, and run the filter query:
CREATE TABLE publication_events (
    book_id INT,
    author STRING,
    title STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'publication_events',
    'properties.bootstrap.servers' = 'broker:9092',
    'scan.startup.mode' = 'earliest-offset',
    'key.format' = 'raw',
    'key.fields' = 'book_id',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081',
    'value.fields-include' = 'EXCEPT_KEY'
);
INSERT INTO publication_events VALUES
    (0, 'C.S. Lewis', 'The Silver Chair'),
    (1, 'George R. R. Martin', 'A Song of Ice and Fire'),
    (2, 'C.S. Lewis', 'Perelandra'),
    (3, 'George R. R. Martin', 'Fire & Blood'),
    (4, 'J. R. R. Tolkien', 'The Hobbit'),
    (5, 'J. R. R. Tolkien', 'The Lord of the Rings'),
    (6, 'George R. R. Martin', 'A Dream of Spring'),
    (7, 'J. R. R. Tolkien', 'The Fellowship of the Ring'),
    (8, 'George R. R. Martin', 'The Ice Dragon'),
    (9, 'Mario Puzo', 'The Godfather');
SELECT *
FROM publication_events
WHERE author = 'George R. R. Martin';
The query output should look like this:
book_id  author               title
1        George R. R. Martin  A Song of Ice and Fire
3        George R. R. Martin  Fire & Blood
6        George R. R. Martin  A Dream of Spring
8        George R. R. Martin  The Ice Dragon
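How these rows are displayed depends on the SQL Client's result mode. As an optional sketch, assuming the client is still using its interactive table view, the standard sql-client.execution.result-mode option can be set to tableau before running the query so that rows print directly in the terminal:

```sql
-- Optional: print results inline in the terminal instead of the interactive table view.
SET 'sql-client.execution.result-mode' = 'tableau';

SELECT *
FROM publication_events
WHERE author = 'George R. R. Martin';
```

In streaming mode, tableau output adds an op column showing the change type of each row (for example, +I for an insert). Because the Kafka-backed table is unbounded, the query keeps running until you cancel it.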
When you are finished, clean up the containers used for this tutorial by running:
docker compose -f ./docker/docker-compose-flinksql.yml down
To run with Confluent Cloud, open the Confluent Cloud Console, navigate to your environment, and click the Open SQL Workspace button for the compute pool that you have created.
Use the dropdowns at the top right to select the default catalog (Confluent Cloud environment) and database (Kafka cluster).
Finally, run the following SQL statements to create the publication_events table, populate it with test data, and run the filter query:
CREATE TABLE publication_events (
    book_id INT,
    author STRING,
    title STRING
);
INSERT INTO publication_events VALUES
    (0, 'C.S. Lewis', 'The Silver Chair'),
    (1, 'George R. R. Martin', 'A Song of Ice and Fire'),
    (2, 'C.S. Lewis', 'Perelandra'),
    (3, 'George R. R. Martin', 'Fire & Blood'),
    (4, 'J. R. R. Tolkien', 'The Hobbit'),
    (5, 'J. R. R. Tolkien', 'The Lord of the Rings'),
    (6, 'George R. R. Martin', 'A Dream of Spring'),
    (7, 'J. R. R. Tolkien', 'The Fellowship of the Ring'),
    (8, 'George R. R. Martin', 'The Ice Dragon'),
    (9, 'Mario Puzo', 'The Godfather');
SELECT *
FROM publication_events
WHERE author = 'George R. R. Martin';
The query output should look like this: