If you have time series events in a Kafka topic, session windows let you group and aggregate them into variable-size, non-overlapping time intervals based on a configurable inactivity period.
For example, suppose that you have a topic with events that represent website clicks and you'd like to track user behavior on your site. Session windows differ from fixed-size windows (tumbling or hopping) because the start and end of each window are determined solely by the event timestamps rather than being fixed in size. As long as a new event arrives within the defined inactivity gap, the window continues to grow.
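To make the inactivity-gap idea concrete, here is a minimal Python sketch of gap-based sessionization. It is an illustration only, not how Flink implements session windows. The `sessionize` function and the sample timestamps are hypothetical names and values chosen for this example. The sketch assumes that an event landing exactly on the end of a window starts a new session.

```python
from datetime import datetime, timedelta


def sessionize(timestamps, gap):
    """Group event timestamps into sessions separated by `gap` of inactivity.

    Returns (window_start, window_end, event_count) tuples, where window_end
    is the timestamp of the last event in the session plus the gap.
    """
    sessions = []
    for ts in sorted(timestamps):
        # An event that falls before the current window's end extends the window.
        # Assumption: an event exactly at window_end starts a new session.
        if sessions and ts < sessions[-1][1]:
            start, _, count = sessions[-1]
            sessions[-1] = (start, ts + gap, count + 1)
        else:
            # The gap has elapsed (or this is the first event): open a new session.
            sessions.append((ts, ts + gap, 1))
    return sessions


gap = timedelta(minutes=2)
base = datetime(2023, 7, 9, 1, 0, 0)
clicks = [
    base,
    base + timedelta(seconds=30),
    base + timedelta(seconds=60),
    base + timedelta(minutes=10),  # arrives long after the gap has elapsed
]

for start, end, count in sessionize(clicks, gap):
    print(start.time(), end.time(), count)
```

The first three clicks fall within two minutes of each other and form one session, while the fourth click opens a new one.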
Let's assume the following DDL for tracking user click activity:
CREATE TABLE clicks (
ip STRING,
url STRING,
click_ts TIMESTAMP(3),
-- declare click_ts as the event time attribute and use a strictly ascending timestamp watermark strategy
WATERMARK FOR click_ts AS click_ts
);
The timestamp is an important attribute since we’ll be modeling user behavior and how many pages they view in a given session. Also, because we are going to aggregate over time windows, we must define a watermark strategy. In this case, we use strictly ascending timestamps, i.e., any row with a timestamp that is less than or equal to the latest observed event timestamp is considered late and ignored.
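The lateness rule described above can be sketched in a few lines of Python. This is a simplified model, not Flink's implementation: it assumes the watermark advances after every row, whereas an engine that advances its watermark less often may treat fewer rows as late. The function name `split_late_rows` and the integer timestamps are hypothetical, chosen only for illustration.

```python
def split_late_rows(timestamps):
    """Classify rows as on-time or late under a strictly ascending watermark.

    The watermark is modeled as the largest timestamp seen so far and is
    updated after every row (a simplifying assumption of this sketch).
    """
    watermark = None
    on_time, late = [], []
    for ts in timestamps:
        # A row is late if its timestamp is less than or equal to the watermark.
        if watermark is not None and ts <= watermark:
            late.append(ts)
        else:
            on_time.append(ts)
            watermark = ts
    return on_time, late


on_time, late = split_late_rows([10, 20, 20, 15, 30])
print(on_time)  # [10, 20, 30]
print(late)     # [20, 15]
```

Note that the second row with timestamp 20 is late because the rule uses "less than or equal to", which is what makes the strategy strictly ascending.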
Given the clicks table definition above, let’s figure out how many pages a user visits in a session. You'll define an inactivity gap of 2 minutes to mark the end of a session. As long as users continue to click within 2 minutes of the previous click, the existing session will continue to grow. Let's observe user click behavior using a session windowed table-valued function (TVF).
SELECT url,
COUNT(url) AS visited_count,
window_start,
window_end
FROM TABLE(SESSION(TABLE clicks PARTITION BY url, DESCRIPTOR(click_ts), INTERVAL '2' MINUTES))
GROUP BY url, window_start, window_end;
You can run the example backing this tutorial in one of three ways: a Flink Table API-based JUnit test, locally with the Flink SQL Client against Flink and Kafka running in Docker, or with Confluent Cloud.
Clone the confluentinc/tutorials GitHub repository (if you haven't already) and navigate to the tutorials directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Run the following command to execute FlinkSqlMergeTablesTest#testMerge:
./gradlew clean :session-windows:flinksql:test
The test starts Kafka and Schema Registry with Testcontainers, runs the Flink SQL commands above against a local Flink StreamExecutionEnvironment, and ensures that the session window results are what we expect.
Clone the confluentinc/tutorials GitHub repository (if you haven't already) and navigate to the tutorials directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Start Flink and Kafka:
docker compose -f ./docker/docker-compose-flinksql.yml up -d
Next, open the Flink SQL Client CLI:
docker exec -it flink-sql-client sql-client.sh
Finally, run the following SQL statements to create the clicks table backed by Kafka running in Docker, populate it with test data, and then run a statement displaying the session windows.
CREATE TABLE clicks (
ip STRING,
url STRING,
click_ts TIMESTAMP(3),
WATERMARK FOR click_ts AS click_ts
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = 'broker:9092',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'raw',
'key.fields' = 'ip',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
);
INSERT INTO clicks VALUES
('9.62.201.241','/acme/jeep-stuff/', TO_TIMESTAMP('2023-07-09 01:00:00')),
('122.65.213.141', '/farm-for-all/chickens/', TO_TIMESTAMP('2023-07-09 02:00:10')),
('122.65.213.141', '/farm-for-all/chickens/', TO_TIMESTAMP('2023-07-09 02:00:20')),
('122.65.213.141', '/farm-for-all/chickens/', TO_TIMESTAMP('2023-07-09 02:01:00')),
('9.62.201.241', '/acme/jeep-stuff/', TO_TIMESTAMP('2023-07-09 01:00:30')),
('9.62.201.241', '/acme/jeep-stuff/', TO_TIMESTAMP('2023-07-09 01:01:00')),
('21.229.87.11', '/amc-rio/movies/', TO_TIMESTAMP('2023-07-09 09:00:00')),
('234.112.107.50', '/trips/packages/', TO_TIMESTAMP('2023-07-09 12:00:00')),
('21.229.87.11', '/amc-rio/movies/', TO_TIMESTAMP('2023-07-09 09:00:30')),
('122.65.213.141', '/farm-for-all/tractors/', TO_TIMESTAMP('2023-07-09 02:30:00')),
('122.65.213.141', '/farm-for-all/tractors/', TO_TIMESTAMP('2023-07-10 02:31:00'));
SELECT url,
COUNT(url) AS visited_count,
window_start,
window_end
FROM TABLE(SESSION(TABLE clicks PARTITION BY url, DESCRIPTOR(click_ts), INTERVAL '2' MINUTES))
GROUP BY url, window_start, window_end;
The query output should look like this:
url                      visited_count  window_start             window_end
/acme/jeep-stuff/        3              2023-07-09 01:00:00.000  2023-07-09 01:03:00.000
/farm-for-all/chickens/  3              2023-07-09 02:00:10.000  2023-07-09 02:03:00.000
/farm-for-all/tractors/  1              2023-07-09 02:30:00.000  2023-07-09 02:32:00.000
/amc-rio/movies/         2              2023-07-09 09:00:00.000  2023-07-09 09:02:30.000
/trips/packages/         1              2023-07-09 12:00:00.000  2023-07-09 12:02:00.000
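As a sanity check on the output above, the following Python sketch recomputes the per-URL session windows from the sample data. It only mimics the grouping logic and is not a substitute for running the query in Flink. The names `CLICKS`, `GAP`, and `session_counts` are hypothetical. The final 2023-07-10 row is left out because its window does not appear in the output shown above, presumably because no later event advances the watermark past the end of that window.

```python
from collections import defaultdict
from datetime import datetime, timedelta

GAP = timedelta(minutes=2)

# (url, click_ts) pairs copied from the INSERT statement, minus the last row.
CLICKS = [
    ("/acme/jeep-stuff/", "2023-07-09 01:00:00"),
    ("/farm-for-all/chickens/", "2023-07-09 02:00:10"),
    ("/farm-for-all/chickens/", "2023-07-09 02:00:20"),
    ("/farm-for-all/chickens/", "2023-07-09 02:01:00"),
    ("/acme/jeep-stuff/", "2023-07-09 01:00:30"),
    ("/acme/jeep-stuff/", "2023-07-09 01:01:00"),
    ("/amc-rio/movies/", "2023-07-09 09:00:00"),
    ("/trips/packages/", "2023-07-09 12:00:00"),
    ("/amc-rio/movies/", "2023-07-09 09:00:30"),
    ("/farm-for-all/tractors/", "2023-07-09 02:30:00"),
]


def session_counts(clicks, gap):
    """Return (url, visited_count, window_start, window_end) rows per session."""
    by_url = defaultdict(list)
    for url, ts in clicks:
        by_url[url].append(datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"))

    rows = []
    for url, stamps in by_url.items():
        stamps.sort()
        start, last, count = stamps[0], stamps[0], 1
        for ts in stamps[1:]:
            if ts - last < gap:
                # Still within the inactivity gap: extend the current session.
                last, count = ts, count + 1
            else:
                # Gap exceeded: close the session and start a new one.
                rows.append((url, count, start, last + gap))
                start, last, count = ts, ts, 1
        rows.append((url, count, start, last + gap))
    return sorted(rows, key=lambda row: row[2])


for url, count, start, end in session_counts(CLICKS, GAP):
    print(f"{url:<26}{count:<4}{start}  {end}")
```

Each window ends 2 minutes after the last click in the session, which is why, for example, the `/amc-rio/movies/` window ends at 09:02:30.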
When you are finished, clean up the containers used for this tutorial by running:
docker compose -f ./docker/docker-compose-flinksql.yml down