Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.
This tutorial creates simulated data with the Datagen connector.
You can then process the data in a variety of ways: enrich the clickstream data with user information, analyze errors, aggregate data into time windows, and more.
Optional: To simulate a real-world scenario in which user sessions aren’t always open but close after some time, you can pause and resume the clickstream Datagen connector (created below as recipe_datagen_clicks).
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
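For example, if the clickstream topic had already been created ahead of this tutorial, a stream could be declared over it without PARTITIONS. The statement below is only an illustrative sketch (the stream name and reduced column list are hypothetical); the full stream definition used by this tutorial appears later.
-- Illustrative only: assumes the 'clickstream' topic already exists, so PARTITIONS is omitted
CREATE STREAM clickstream_existing_topic (
  userid INT,
  request VARCHAR,
  status INT
) WITH (
  KAFKA_TOPIC = 'clickstream',
  VALUE_FORMAT = 'JSON'
);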
-- Stream of users
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_datagen_users WITH (
'connector.class' = 'DatagenSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'kafka.topic' = 'clickstream_users',
'quickstart' = 'CLICKSTREAM_USERS',
'maxInterval' = '10',
'tasks.max' = '1',
'output.data.format' = 'JSON'
);
-- Stream of per-user session information
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_datagen_clicks WITH (
'connector.class' = 'DatagenSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'kafka.topic' = 'clickstream',
'quickstart' = 'CLICKSTREAM',
'maxInterval' = '30',
'tasks.max' = '1',
'output.data.format' = 'JSON'
);
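Before continuing, you can optionally confirm from the ksqlDB editor that both Datagen connectors are running. SHOW CONNECTORS lists them, and DESCRIBE CONNECTOR reports their state and tasks:
-- Optional sanity check on the Datagen connectors
SHOW CONNECTORS;
DESCRIBE CONNECTOR recipe_datagen_clicks;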
SET 'auto.offset.reset' = 'earliest';
-- stream of user clicks:
CREATE STREAM clickstream (
_time BIGINT,
time VARCHAR,
ip VARCHAR,
request VARCHAR,
status INT,
userid INT,
bytes BIGINT,
agent VARCHAR
) WITH (
KAFKA_TOPIC = 'clickstream',
VALUE_FORMAT = 'JSON',
PARTITIONS = 1
);
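To spot-check that click events are arriving before building on top of this stream, you can run a transient push query in the ksqlDB editor (the LIMIT simply stops the query after a few rows):
-- Quick look at incoming click events
SELECT userid, request, status, bytes
FROM clickstream
EMIT CHANGES
LIMIT 5;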
-- users lookup table:
CREATE TABLE web_users (
user_id VARCHAR PRIMARY KEY,
registered_At BIGINT,
username VARCHAR,
first_name VARCHAR,
last_name VARCHAR,
city VARCHAR,
level VARCHAR
) WITH (
KAFKA_TOPIC = 'clickstream_users',
VALUE_FORMAT = 'JSON',
PARTITIONS = 1
);
-- Build materialized stream views:
-- enrich click-stream with more user information:
CREATE STREAM user_clickstream AS
SELECT
u.user_id,
u.username,
ip,
u.city,
request,
status,
bytes
FROM clickstream c
LEFT JOIN web_users u ON cast(c.userid AS VARCHAR) = u.user_id;
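To verify the enrichment, you can run a similar push query against the joined stream; rows with NULL user fields correspond to clicks whose userid had no match in web_users:
-- Inspect the enriched click events
SELECT username, city, request, status
FROM user_clickstream
EMIT CHANGES
LIMIT 10;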
-- Build materialized table views:
-- Table of html pages per minute for each user:
CREATE TABLE pages_per_min AS
SELECT
userid AS k1,
AS_VALUE(userid) AS userid,
WINDOWSTART AS EVENT_TS,
COUNT(*) AS pages
FROM clickstream WINDOW HOPPING (SIZE 60 SECOND, ADVANCE BY 5 SECOND)
WHERE request LIKE '%html%'
GROUP BY userid;
-- User sessions table - 30 seconds of inactivity expires the session
-- Table counts number of events within the session
CREATE TABLE click_user_sessions AS
SELECT
username AS K,
AS_VALUE(username) AS username,
WINDOWEND AS EVENT_TS,
COUNT(*) AS events
FROM user_clickstream WINDOW SESSION (30 SECOND)
GROUP BY username;
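As sessions expire, you can watch the per-session event counts. In the sketch below, TIMESTAMPTOSTRING is used only to render the session-end timestamp (EVENT_TS, in epoch milliseconds) in a readable form:
-- Watch session counts as sessions close
SELECT username,
       TIMESTAMPTOSTRING(EVENT_TS, 'yyyy-MM-dd HH:mm:ss') AS session_end,
       events
FROM click_user_sessions
EMIT CHANGES;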
-- number of errors per min, using 'HAVING' Filter to show ERROR codes > 400
-- where count > 5
CREATE TABLE errors_per_min_alert WITH (KAFKA_TOPIC = 'errors_per_min_alert') AS
SELECT
status AS k1,
AS_VALUE(status) AS status,
WINDOWSTART AS EVENT_TS,
COUNT(*) AS errors
FROM clickstream WINDOW HOPPING (SIZE 60 SECOND, ADVANCE BY 20 SECOND)
WHERE status > 400
GROUP BY status
HAVING COUNT(*) > 5 AND COUNT(*) IS NOT NULL;
-- Enriched user details table:
-- Aggregate (count&groupBy) using a TABLE-Window
CREATE TABLE user_ip_activity WITH (KEY_FORMAT = 'JSON', KAFKA_TOPIC = 'user_ip_activity') AS
SELECT
username AS k1,
ip AS k2,
city AS k3,
AS_VALUE(username) AS username,
WINDOWSTART AS EVENT_TS,
AS_VALUE(ip) AS ip,
AS_VALUE(city) AS city,
COUNT(*) AS count
FROM user_clickstream WINDOW TUMBLING (SIZE 60 SECOND)
GROUP BY username, ip, city
HAVING COUNT(*) > 1;
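Both user_ip_activity and errors_per_min_alert are ordinary ksqlDB tables backed by Kafka topics, so you can watch them update before wiring up the sink connector, for example:
-- Watch windowed per-user activity as it updates
SELECT * FROM user_ip_activity EMIT CHANGES;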
After processing the data, send it to Elasticsearch.
-- Send data to Elasticsearch
CREATE SINK CONNECTOR IF NOT EXISTS recipe_elasticsearch_analyzed_clickstream WITH (
'connector.class' = 'ElasticsearchSink',
'input.data.format' = 'JSON',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'topics' = 'user_ip_activity, errors_per_min_alert',
'connection.url' = '<elasticsearch-URI>',
'connection.user' = '<elasticsearch-username>',
'connection.password' = '<elasticsearch-password>',
'type.name' = 'kafkaconnect',
'key.ignore' = 'true',
'schema.ignore' = 'true'
);
To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate).
By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
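For this tutorial specifically, drop the derived tables and streams before the objects they are built from; the sketch below follows that order. If you also want the clickstream and clickstream_users topics removed, drop the Datagen connectors first so they don’t recreate them.
-- Drop derived tables and streams first, then the base stream and table
DROP TABLE IF EXISTS user_ip_activity DELETE TOPIC;
DROP TABLE IF EXISTS errors_per_min_alert DELETE TOPIC;
DROP TABLE IF EXISTS click_user_sessions DELETE TOPIC;
DROP TABLE IF EXISTS pages_per_min DELETE TOPIC;
DROP STREAM IF EXISTS user_clickstream DELETE TOPIC;
DROP STREAM IF EXISTS clickstream DELETE TOPIC;
DROP TABLE IF EXISTS web_users DELETE TOPIC;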
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;
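In this tutorial, that means the two Datagen sources and the Elasticsearch sink:
DROP CONNECTOR IF EXISTS recipe_datagen_users;
DROP CONNECTOR IF EXISTS recipe_datagen_clicks;
DROP CONNECTOR IF EXISTS recipe_elasticsearch_analyzed_clickstream;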