Correlate customer behavior across in-store and online channels

Today's retail customers browse and purchase products across many channels, from brick-and-mortar stores to the web to mobile applications, and they expect a seamless shopping experience that bridges these channels. The omnichannel retail setting is often rife with technical challenges because the data resides in disparate sources: web clickstream, mobile application activity, and in-store purchase data typically live in different data stores or SaaS platforms. This tutorial shows you how to correlate customers' in-store purchase activity with their online behavior, which can then feed downstream omnichannel analytics or improve the customer experience (e.g., more relevant online product recommendations based on customers' in-store activity).

To see this tutorial in action, click here to launch it now. It pre-populates the ksqlDB code in the Confluent Cloud Console and provides mock data or stubbed-out code for connecting to a real data source. For more detailed instructions, follow the steps below.

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.
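Before running the full tutorial, you can confirm the editor is connected to your cluster with a couple of metadata statements; this is just a sanity check and not part of the recipe itself:

-- List the streams and topics visible to this ksqlDB cluster (both lists may be empty at first)
SHOW STREAMS;
SHOW TOPICS;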

Execute ksqlDB code

2

This tutorial creates simulated data with the Datagen connector. You can then process the data in a variety of ways: enrich the orders and clickstream data with product and customer attributes, analyze errors, aggregate data by time (as sketched after the join below), and so on.

When creating the initial STREAM or TABLE, you can omit the PARTITIONS property if the backing Kafka topic already exists.
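For example, here is a sketch of the shoes stream declaration from below without the PARTITIONS property, usable when the shoes topic already exists:

-- Declaring the shoes stream over a pre-existing topic, so PARTITIONS is omitted
CREATE STREAM shoes (
  id VARCHAR,
  brand VARCHAR,
  name VARCHAR,
  sale_price INT,
  rating DOUBLE,
  ts BIGINT
) WITH (
  KAFKA_TOPIC = 'shoes',
  VALUE_FORMAT = 'JSON'
);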

-- Stream of products (shoes)
CREATE SOURCE CONNECTOR IF NOT EXISTS DATAGEN_SHOES WITH (
  'connector.class'          = 'DatagenSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'kafka.topic'              = 'shoes',
  'quickstart'               = 'SHOES',
  'maxInterval'              = '10',
  'tasks.max'                = '1',
  'output.data.format'       = 'JSON'
);

-- Stream of customers
CREATE SOURCE CONNECTOR IF NOT EXISTS DATAGEN_SHOE_CUSTOMERS WITH (
  'connector.class'          = 'DatagenSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'kafka.topic'              = 'shoe_customers',
  'quickstart'               = 'SHOE_CUSTOMERS',
  'maxInterval'              = '10',
  'tasks.max'                = '1',
  'output.data.format'       = 'JSON'
);

-- Stream of orders
CREATE SOURCE CONNECTOR IF NOT EXISTS DATAGEN_SHOE_ORDERS WITH (
  'connector.class'          = 'DatagenSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'kafka.topic'              = 'shoe_orders',
  'quickstart'               = 'SHOE_ORDERS',
  'maxInterval'              = '10',
  'tasks.max'                = '1',
  'output.data.format'       = 'JSON'
);

-- Stream of ecommerce website clicks
CREATE SOURCE CONNECTOR IF NOT EXISTS DATAGEN_SHOE_CLICKSTREAM WITH (
  'connector.class'          = 'DatagenSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'kafka.topic'              = 'shoe_clickstream',
  'quickstart'               = 'SHOE_CLICKSTREAM',
  'maxInterval'              = '30',
  'tasks.max'                = '1',
  'output.data.format'       = 'JSON'
);

SET 'auto.offset.reset' = 'earliest';

-- Create a stream of products (shoes)
CREATE STREAM shoes (
  id VARCHAR,
  brand VARCHAR,
  name VARCHAR,
  sale_price INT,
  rating DOUBLE,
  ts BIGINT
) WITH (
  KAFKA_TOPIC = 'shoes',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);

-- Create a stream of customers
CREATE STREAM shoe_customers (
  id VARCHAR,
  first_name VARCHAR,
  last_name VARCHAR,
  email VARCHAR,
  phone VARCHAR,
  street_address VARCHAR,
  state VARCHAR,
  zip_code VARCHAR,
  country VARCHAR,
  country_code VARCHAR
) WITH (
  KAFKA_TOPIC = 'shoe_customers',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);

-- Create a stream of orders
CREATE STREAM shoe_orders (
  order_id INT,
  product_id VARCHAR,
  customer_id VARCHAR,
  ts BIGINT
) WITH (
  KAFKA_TOPIC = 'shoe_orders',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1,
  TIMESTAMP = 'ts'
);

-- Create a stream of product website clicks
CREATE STREAM shoe_clickstream (
  product_id VARCHAR,
  user_id VARCHAR,
  view_time INT,
  page_url VARCHAR,
  ip VARCHAR,
  ts BIGINT
) WITH (
  KAFKA_TOPIC = 'shoe_clickstream',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1,
  TIMESTAMP = 'ts'
);

-- Create a stream of enriched orders
CREATE STREAM shoe_orders_enriched WITH (
  KAFKA_TOPIC = 'shoe_orders_enriched',
  PARTITIONS = 1,
  VALUE_FORMAT = 'JSON'
) AS
SELECT * FROM shoe_orders
  INNER JOIN shoe_clickstream
    WITHIN 1 HOUR
    GRACE PERIOD 1 MINUTE
    ON shoe_orders.customer_id = shoe_clickstream.user_id
EMIT CHANGES;
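To spot-check the join and to illustrate the time-based aggregation mentioned earlier, the statements below are one possible follow-up. They are a sketch, not part of the recipe: the joined column names (e.g., SHOE_ORDERS_CUSTOMER_ID) assume ksqlDB's default prefixing of columns in a SELECT * join, so verify them with DESCRIBE first, and the table name and window size are illustrative.

-- Inspect the joined schema and spot-check a few rows (push query; stop it when done)
DESCRIBE shoe_orders_enriched;
SELECT * FROM shoe_orders_enriched EMIT CHANGES LIMIT 5;

-- Sketch: count enriched orders per customer in 1-hour tumbling windows
-- (adjust column names to match what DESCRIBE reports)
CREATE TABLE shoe_orders_per_customer WITH (
  KAFKA_TOPIC = 'shoe_orders_per_customer',
  PARTITIONS = 1,
  VALUE_FORMAT = 'JSON'
) AS
SELECT shoe_orders_customer_id,
       COUNT(*) AS order_count
FROM shoe_orders_enriched
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY shoe_orders_customer_id
EMIT CHANGES;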

Write the data out

3

After processing the data, send it to Elasticsearch.

-- Send data to Elasticsearch
CREATE SINK CONNECTOR IF NOT EXISTS analyzed_clickstream WITH (
  'connector.class'          = 'ElasticsearchSink',
  'name'                     = 'recipe-elasticsearch-analyzed_clickstream',
  'input.data.format'        = 'JSON',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'topics'                   = 'shoe_orders_enriched',
  'connection.url'           = '<elasticsearch-URI>',
  'connection.user'          = '<elasticsearch-username>',
  'connection.password'      = '<elasticsearch-password>',
  'type.name'                = 'kafkaconnect',
  'key.ignore'               = 'true',
  'schema.ignore'            = 'true'
);
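As a quick check that the sink is running, ksqlDB can also report connector status; if the name reported by SHOW CONNECTORS differs from the one used below, use that name instead:

-- List connectors and inspect the Elasticsearch sink's status and any task errors
SHOW CONNECTORS;
DESCRIBE CONNECTOR ANALYZED_CLICKSTREAM;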

Cleanup

4

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). Including the DELETE TOPIC clause also asynchronously deletes the topic backing the stream or table.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
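For this tutorial specifically, cleanup looks something like the following sketch. Note that any persistent queries writing to a stream must be terminated before that stream can be dropped, and dropping the Datagen connectors first stops new data from arriving:

-- Stop the connectors created above
DROP CONNECTOR IF EXISTS DATAGEN_SHOES;
DROP CONNECTOR IF EXISTS DATAGEN_SHOE_CUSTOMERS;
DROP CONNECTOR IF EXISTS DATAGEN_SHOE_ORDERS;
DROP CONNECTOR IF EXISTS DATAGEN_SHOE_CLICKSTREAM;
DROP CONNECTOR IF EXISTS ANALYZED_CLICKSTREAM;

-- Drop the derived stream first, then the source streams
DROP STREAM IF EXISTS shoe_orders_enriched DELETE TOPIC;
DROP STREAM IF EXISTS shoe_clickstream DELETE TOPIC;
DROP STREAM IF EXISTS shoe_orders DELETE TOPIC;
DROP STREAM IF EXISTS shoe_customers DELETE TOPIC;
DROP STREAM IF EXISTS shoes DELETE TOPIC;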