Understand user behavior with clickstream data

Analyzing clickstream data helps businesses optimize their webpages and measure the effectiveness of their web presence by revealing users' click activity and navigation patterns. Because clickstream data arrives continuously and often in large volumes, stream processing is a natural fit: events are processed as soon as they are ingested. This tutorial shows how to measure key statistics on visitor activity over a given time frame, such as how many pages users view and how long they engage with the website.

To see this tutorial in action, launch it now in Confluent Cloud. The launcher pre-populates the ksqlDB code in the Confluent Cloud Console and provides mock data or stubbed-out code for connecting to a real data source. For more detailed instructions, follow the steps below.

Run it

1. Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB provides a SQL interface for extracting, transforming, and loading events within your Kafka cluster.
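
As a quick sanity check (not part of the recipe itself), you can run a couple of standard ksqlDB statements in the editor to confirm that it is connected to your cluster:

-- Confirm the editor can see your Kafka cluster's topics
SHOW TOPICS;

-- List any existing ksqlDB streams
SHOW STREAMS;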

2. Execute ksqlDB code

This tutorial creates simulated data with the Datagen connector. You can then process the data in a variety of ways: enrich the clickstream with user information, analyze error codes, aggregate events into time windows, and more.

Optional: To simulate a real-world scenario in which user sessions aren't always open but close after a period of inactivity, you can pause and resume the clickstream Datagen connector (created below as recipe_datagen_clicks).

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.

-- Stream of users
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_datagen_users WITH (
  'connector.class'          = 'DatagenSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'kafka.topic'              = 'clickstream_users',
  'quickstart'               = 'CLICKSTREAM_USERS',
  'maxInterval'              = '10',
  'tasks.max'                = '1',
  'output.data.format'       = 'JSON'
);

-- Stream of per-user session information
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_datagen_clicks WITH (
  'connector.class'          = 'DatagenSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'kafka.topic'              = 'clickstream',
  'quickstart'               = 'CLICKSTREAM',
  'maxInterval'              = '30',
  'tasks.max'                = '1',
  'output.data.format'       = 'JSON'
);

-- Process existing records in the source topics from the beginning
SET 'auto.offset.reset' = 'earliest';

-- Stream of user clicks:
CREATE STREAM clickstream (
  _time BIGINT,
  time VARCHAR,
  ip VARCHAR,
  request VARCHAR,
  status INT,
  userid INT,
  bytes BIGINT,
  agent VARCHAR
) WITH (
  KAFKA_TOPIC = 'clickstream',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);

-- Users lookup table:
CREATE TABLE web_users (
  user_id VARCHAR PRIMARY KEY,
  registered_At BIGINT,
  username VARCHAR,
  first_name VARCHAR,
  last_name VARCHAR,
  city VARCHAR,
  level VARCHAR
) WITH (
  KAFKA_TOPIC = 'clickstream_users',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);
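
-- Optional sanity check (not part of the original recipe): confirm that the
-- Datagen connectors are producing clickstream events before building the
-- views below; uncomment to run it interactively in the ksqlDB editor.
-- SELECT * FROM clickstream EMIT CHANGES LIMIT 3;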

-- Build materialized stream views:

-- Enrich the clickstream with user information:
CREATE STREAM user_clickstream AS
  SELECT
    u.user_id,
    u.username,
    ip,
    u.city,
    request,
    status,
    bytes
  FROM clickstream c
  LEFT JOIN web_users u ON CAST(c.userid AS VARCHAR) = u.user_id;

-- Build materialized table views:

-- Table of HTML pages viewed per minute for each user.
-- The GROUP BY column becomes the table's key; AS_VALUE() copies it into the value as well.
CREATE TABLE pages_per_min AS
  SELECT
    userid AS k1,
    AS_VALUE(userid) AS userid,
    WINDOWSTART AS EVENT_TS,
    COUNT(*) AS pages
  FROM clickstream WINDOW HOPPING (SIZE 60 SECOND, ADVANCE BY 5 SECOND)
  WHERE request LIKE '%html%'
  GROUP BY userid;

-- User sessions table - 30 seconds of inactivity expires the session
-- Table counts number of events within the session
CREATE TABLE click_user_sessions AS
  SELECT
    username AS K,
    AS_VALUE(username) AS username,
    WINDOWEND AS EVENT_TS,
    COUNT(*) AS events
  FROM user_clickstream WINDOW SESSION (30 SECOND)
  GROUP BY username;

-- Errors per minute: the WHERE clause keeps status codes above 400, and the
-- HAVING clause keeps only windows with more than 5 such errors
CREATE TABLE errors_per_min_alert WITH (KAFKA_TOPIC = 'errors_per_min_alert') AS
  SELECT
    status AS k1,
    AS_VALUE(status) AS status,
    WINDOWSTART AS EVENT_TS,
    COUNT(*) AS errors
  FROM clickstream WINDOW HOPPING (SIZE 60 SECOND, ADVANCE BY 20 SECOND)
  WHERE status > 400
  GROUP BY status
  HAVING COUNT(*) > 5;

-- Enriched user activity table:
-- aggregate (COUNT with GROUP BY) over a tumbling window
CREATE TABLE user_ip_activity WITH (KEY_FORMAT = 'JSON', KAFKA_TOPIC = 'user_ip_activity') AS
  SELECT
    username AS k1,
    ip AS k2,
    city AS k3,
    AS_VALUE(username) AS username,
    WINDOWSTART AS EVENT_TS,
    AS_VALUE(ip) AS ip,
    AS_VALUE(city) AS city,
    COUNT(*) AS count
  FROM user_clickstream WINDOW TUMBLING (SIZE 60 SECOND)
  GROUP BY username, ip, city
  HAVING COUNT(*) > 1;
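
Before wiring up a sink, you can inspect the results directly in the ksqlDB editor. The push query below is an optional check rather than part of the recipe; it streams rows from one of the materialized tables as windows are updated:

-- Watch per-user activity counts as they are computed
SELECT * FROM user_ip_activity EMIT CHANGES LIMIT 5;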

3. Write the data out

After processing the data, send it to Elasticsearch.

-- Send data to Elasticsearch
CREATE SINK CONNECTOR IF NOT EXISTS recipe_elasticsearch_analyzed_clickstream WITH (
  'connector.class'          = 'ElasticsearchSink',
  'input.data.format'        = 'JSON',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'topics'                   = 'user_ip_activity, errors_per_min_alert',
  'connection.url'           = '<elasticsearch-URI>',
  'connection.user'          = '<elasticsearch-username>',
  'connection.password'      = '<elasticsearch-password>',
  'type.name'                = 'kafkaconnect',
  'key.ignore'               = 'true',
  'schema.ignore'            = 'true'
);
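
You can confirm that the sink connector is running from ksqlDB itself; DESCRIBE CONNECTOR is a standard ksqlDB statement, and the connector name matches the one created above:

-- Check the status of the Elasticsearch sink connector
DESCRIBE CONNECTOR recipe_elasticsearch_analyzed_clickstream;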

4. Clean up

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute your stream or table name, as appropriate). Including the DELETE TOPIC clause also asynchronously deletes the Kafka topic backing the stream or table.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
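
For this tutorial specifically, the cleanup could look like the following sketch, assuming you kept the resource names used above. Drop derived tables and streams before the objects they are built from; depending on your ksqlDB version, you may also need to TERMINATE the persistent queries that write to them first.

-- Remove the connectors first so no new data is produced
DROP CONNECTOR IF EXISTS recipe_datagen_users;
DROP CONNECTOR IF EXISTS recipe_datagen_clicks;
DROP CONNECTOR IF EXISTS recipe_elasticsearch_analyzed_clickstream;

-- Drop derived tables and streams, then the base stream and table
DROP TABLE IF EXISTS user_ip_activity DELETE TOPIC;
DROP TABLE IF EXISTS errors_per_min_alert DELETE TOPIC;
DROP TABLE IF EXISTS click_user_sessions DELETE TOPIC;
DROP TABLE IF EXISTS pages_per_min DELETE TOPIC;
DROP STREAM IF EXISTS user_clickstream DELETE TOPIC;
DROP STREAM IF EXISTS clickstream DELETE TOPIC;
DROP TABLE IF EXISTS web_users DELETE TOPIC;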