Get a full view of a customer’s online journey

To improve conversion rates and personalize experiences, many companies want to know which web pages visitors have browsed and engaged with. Analyzing online behavior helps them understand customer needs, improve each web page’s relevance, and tailor customer support interactions. This recipe shows how to analyze the customer journey in real time, using ksqlDB to collect the web pages each customer visited and send the list out for downstream analytics.

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB provides a SQL language for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

2

ksqlDB processes data in real time, and it can also import data from popular data sources and export it to end systems in the cloud. This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

Given a stream of pageview events, collect the list of pages each customer has visited and count the distinct pages among them, grouped by customer. This uses the ksqlDB aggregate functions COLLECT_LIST and COUNT_DISTINCT.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

CREATE SOURCE CONNECTOR IF NOT EXISTS inventory WITH (
  'connector.class'          = 'PostgresSource',
  'name'                     = 'recipe-postgres-customer_journey',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '5432',
  'connection.user'          = 'postgres',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'pages',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);

SET 'auto.offset.reset' = 'earliest';

-- Create stream of pages
CREATE STREAM pages (
  customer INTEGER,
  time BIGINT,
  page_id VARCHAR,
  page VARCHAR
) WITH (
  VALUE_FORMAT = 'JSON',
  KAFKA_TOPIC = 'pages',
  PARTITIONS = 6
);

-- Create stateful table with Array of pages visited by each customer, using the `COLLECT_LIST` function
-- Get `COUNT_DISTINCT` page IDs
CREATE TABLE pages_per_customer WITH (KAFKA_TOPIC = 'pages_per_customer') AS
SELECT
  customer,
  COLLECT_LIST(page) AS page_list,
  COUNT_DISTINCT (page_id) AS count_distinct
FROM pages
GROUP BY customer
EMIT CHANGES;
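
To confirm that the statements above registered what you expect, you can run ksqlDB's metadata commands in the same editor. The following is a minimal sketch, assuming the stream and table were created under the names used in this recipe (pages and pages_per_customer):

```sql
-- List the streams and tables registered in this ksqlDB application
SHOW STREAMS;
SHOW TABLES;

-- Show the columns and key of the aggregate table
DESCRIBE pages_per_customer;

-- List running queries; the CREATE TABLE ... AS SELECT statement
-- should appear here as a persistent query
SHOW QUERIES;
```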

Test with mock data

3

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO pages (customer, time, page_id, page) VALUES (44, UNIX_TIMESTAMP(), '1001', '/market/page1.html');
INSERT INTO pages (customer, time, page_id, page) VALUES (89, UNIX_TIMESTAMP(), '1001', '/market/page1.html');
INSERT INTO pages (customer, time, page_id, page) VALUES (44, UNIX_TIMESTAMP(), '1020', '/market/page20.html');
INSERT INTO pages (customer, time, page_id, page) VALUES (89, UNIX_TIMESTAMP(), '1002', '/market/page2.html');
INSERT INTO pages (customer, time, page_id, page) VALUES (89, UNIX_TIMESTAMP(), '1037', '/market/page37.html');
INSERT INTO pages (customer, time, page_id, page) VALUES (44, UNIX_TIMESTAMP(), '1600', '/cart/checkout.html');
INSERT INTO pages (customer, time, page_id, page) VALUES (89, UNIX_TIMESTAMP(), '1600', '/cart/checkout.html');
INSERT INTO pages (customer, time, page_id, page) VALUES (44, UNIX_TIMESTAMP(), '1020', '/market/page20.html');
INSERT INTO pages (customer, time, page_id, page) VALUES (89, UNIX_TIMESTAMP(), '1002', '/market/page2.html');
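
Before looking at the aggregate, it can help to check that the mock events reached the source stream. This sketch assumes that all nine INSERT statements above ran and that 'auto.offset.reset' is still set to 'earliest' in the current session; the LIMIT value simply matches the number of inserted rows.

```sql
-- Read the raw events from the beginning of the topic and stop after nine rows
SELECT * FROM pages EMIT CHANGES LIMIT 9;
```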

To validate that this recipe is working, run the following query:

SELECT * FROM pages_per_customer EMIT CHANGES LIMIT 6;

Your output should resemble:

+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|CUSTOMER                                                         |PAGE_LIST                                                        |COUNT_DISTINCT                                                   |
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|44                                                               |[/market/page1.html]                                             |1                                                                |
|89                                                               |[/market/page1.html]                                             |1                                                                |
|44                                                               |[/market/page1.html, /market/page20.html]                        |2                                                                |
|89                                                               |[/market/page1.html, /market/page2.html]                         |2                                                                |
|89                                                               |[/market/page1.html, /market/page2.html, /market/page37.html]    |3                                                                |
|44                                                               |[/market/page1.html, /market/page20.html, /cart/checkout.html]   |3                                                                |
Limit Reached
Query terminated
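
The push query above streams every update to the table. To look up only the latest state for one customer, a pull query may be more convenient, and a COLLECT_SET variant shows the visited pages without repeats. Both statements below are sketches that assume the mock data from the previous step; customer 44 is just one of the mock keys, and the aliases distinct_pages and total_views are illustrative names, not part of the recipe.

```sql
-- Pull query: returns the current row for a single key and then terminates
SELECT customer, page_list, count_distinct
FROM pages_per_customer
WHERE customer = 44;

-- Transient push query: COLLECT_SET drops repeated pages,
-- while COUNT(*) counts every page view, including repeats
SELECT
  customer,
  COLLECT_SET(page) AS distinct_pages,
  COUNT(*) AS total_views
FROM pages
GROUP BY customer
EMIT CHANGES;
```

Because COLLECT_LIST keeps repeated visits, page_list for customer 44 should grow to four entries once all nine mock rows are processed, while the number of distinct page IDs stays at three. ksqlDB documents COUNT_DISTINCT as an approximate count, so treat it as an estimate when a customer has visited a very large number of pages.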

Write the data out

4

After processing the data, send it to Elasticsearch.

-- Send data to Elasticsearch
CREATE SINK CONNECTOR IF NOT EXISTS analyzed_clickstream WITH (
  'connector.class'          = 'ElasticsearchSink',
  'name'                     = 'recipe-elasticsearch-customer_journey',
  'input.data.format'        = 'JSON',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'topics'                   = 'pages_per_customer',
  'connection.url'           = '<elasticsearch-URI>',
  'connection.user'          = '<elasticsearch-username>',
  'connection.password'      = '<elasticsearch-password>',
  'type.name'                = 'kafkaconnect',
  'key.ignore'               = 'true',
  'schema.ignore'            = 'true'
);
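
After submitting the connector, you can check its state from the same ksqlDB editor. This is a small sketch; it assumes the connector is registered under the name given in the CREATE SINK CONNECTOR statement (analyzed_clickstream). If SHOW CONNECTORS reports a different name in your environment, substitute that name instead.

```sql
-- List the connectors known to this ksqlDB application, with their status
SHOW CONNECTORS;

-- Show the status, tasks, and topics of the sink connector
DESCRIBE CONNECTOR analyzed_clickstream;
```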

Cleanup

5

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
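
With the names used in this recipe filled in, the cleanup might look like the sketch below. It assumes you created every object shown earlier and that the connectors are registered as inventory and analyzed_clickstream; run SHOW CONNECTORS first if you are unsure of the names.

```sql
-- Drop the derived table first, since its persistent query reads from the pages stream
DROP TABLE IF EXISTS pages_per_customer DELETE TOPIC;
DROP STREAM IF EXISTS pages DELETE TOPIC;

-- Remove the connectors, if you created them
DROP CONNECTOR IF EXISTS analyzed_clickstream;
DROP CONNECTOR IF EXISTS inventory;
```

If ksqlDB reports that a query is still reading from or writing to one of these objects, terminate that query first; SHOW QUERIES lists the query IDs.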