Build customer loyalty programs

Customer loyalty programs are everywhere in retail, even if it's as simple as 'Get 10 stamps for a free coffee.' To build a more sophisticated rewards program that engages customers at the right place and time, however, you need to aggregate multiple data streams and apply the right promotion to the right customer. This tutorial shows how a coffee shop can run three separate promotions at the same time:
  • A simple 'the more you buy, the bigger your discount' benefit
  • An online version of the 'Buy N, get 1 free' recurring reward
  • A customizable program that looks at individual customer behavior and offers tailored rewards

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports SQL for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

2

ksqlDB processes data in real time, and it can also import data from popular sources and export it to end systems in the cloud, straight from ksqlDB. This tutorial shows how to run the recipe in one of two ways: using connector(s) to a supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

In this solution, we’ll look at different ways to analyze user behavior and determine which rewards we want to issue to our customers.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

CREATE SOURCE CONNECTOR IF NOT EXISTS loyalty_rewards WITH (
  'connector.class'          = 'PostgresSource',
  'name'                     = 'recipe-postgres-loyalty-rewards',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '<database-port>',
  'connection.user'          = '<database-user>',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'users, products, purchases',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);
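Once the CREATE SOURCE CONNECTOR statement runs, you can confirm the connector is up with ksqlDB's connector introspection statements (a quick sanity check, not required by the recipe):

```sql
-- List all connectors and their state, then inspect this one in detail.
SHOW CONNECTORS;
DESCRIBE CONNECTOR loyalty_rewards;
```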

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM users (
  user_id VARCHAR KEY,
  name VARCHAR
) WITH (
  KAFKA_TOPIC = 'users',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

CREATE STREAM products (
  product_id VARCHAR KEY,
  category VARCHAR,
  price DECIMAL(10,2)
) WITH (
  KAFKA_TOPIC = 'products',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

CREATE STREAM purchases (
  user_id VARCHAR KEY,
  product_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'purchases',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Summarize products.
CREATE TABLE all_products AS
  SELECT
    product_id,
    LATEST_BY_OFFSET(category) AS category,
    LATEST_BY_OFFSET(CAST(price AS DOUBLE)) AS price
  FROM products
  GROUP BY product_id;

-- Enrich purchases.
CREATE STREAM enriched_purchases AS
  SELECT
    purchases.user_id,
    purchases.product_id AS product_id,
    all_products.category,
    all_products.price
  FROM purchases
    LEFT JOIN all_products ON purchases.product_id = all_products.product_id;

CREATE TABLE sales_totals AS
  SELECT
    user_id,
    SUM(price) AS total,
    CASE
      WHEN SUM(price) > 400 THEN 'GOLD'
      WHEN SUM(price) > 300 THEN 'SILVER'
      WHEN SUM(price) > 200 THEN 'BRONZE'
      ELSE 'CLIMBING'
    END AS reward_level
  FROM enriched_purchases
  GROUP BY user_id;

CREATE TABLE caffeine_index AS
  SELECT
    user_id,
    COUNT(*) AS total,
    (COUNT(*) % 6) AS sequence,
    (COUNT(*) % 6) = 5 AS next_one_free
  FROM purchases
  WHERE product_id = 'coffee'
  GROUP BY user_id;

CREATE TABLE promotion_french_poodle
  AS
  SELECT
      user_id,
      collect_set(product_id) AS products,
      'french_poodle' AS promotion_name
  FROM purchases
  WHERE product_id IN ('dog', 'beret')
  GROUP BY user_id
  HAVING ARRAY_CONTAINS( collect_set(product_id), 'dog' )
  AND ARRAY_CONTAINS( collect_set(product_id), 'beret' )
  EMIT CHANGES;

CREATE TABLE promotion_loose_leaf AS
  SELECT
      user_id,
      collect_set(product_id) AS products,
      'loose_leaf' AS promotion_name
  FROM enriched_purchases
  WHERE product_id IN ('coffee', 'tea')
  GROUP BY user_id
  HAVING ARRAY_CONTAINS( collect_set(product_id), 'coffee' )
  AND NOT ARRAY_CONTAINS( collect_set(product_id), 'tea' )
  AND sum(price) > 20;

Test with mock data

3

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

-- Some users.
INSERT INTO users ( user_id, name ) VALUES ( 'u2001', 'kris' );
INSERT INTO users ( user_id, name ) VALUES ( 'u2002', 'dave' );
INSERT INTO users ( user_id, name ) VALUES ( 'u2003', 'yeva' );
INSERT INTO users ( user_id, name ) VALUES ( 'u2004', 'rick' );

-- Some products.
INSERT INTO products ( product_id, category, price ) VALUES ( 'tea', 'beverages', 2.55 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'coffee', 'beverages', 2.99 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'dog', 'pets', 249.99 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'cat', 'pets', 195.00 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'beret', 'fashion', 34.99 );
INSERT INTO products ( product_id, category, price ) VALUES ( 'handbag', 'fashion', 126.00 );

-- Some purchases.
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'beret' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'cat' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'tea' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'dave', 'dog' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'dave', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'beret' );

-- A price increase!
INSERT INTO products ( product_id, category, price ) VALUES ( 'coffee', 'beverages', 3.05 );

-- Some more purchases.
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'dog' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'rick', 'cat' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'coffee' );
INSERT INTO purchases ( user_id, product_id ) VALUES ( 'yeva', 'handbag' );

To validate that this recipe is working, run the following query:

SET 'ksql.query.pull.table.scan.enabled' = 'true';
SELECT * FROM promotion_loose_leaf;

Your output should resemble:

+----------------------------------------------+----------------------------------------------+----------------------------------------------+
|USER_ID                                       |PRODUCTS                                      |PROMOTION_NAME                                |
+----------------------------------------------+----------------------------------------------+----------------------------------------------+
|kris                                          |[coffee]                                      |loose_leaf                                    |
Query terminated

Explanation

4

The more you buy, the bigger your discount

To start with the simplest reward scheme, let’s group our customers by how much they spend. We’ll say anyone who spends over $400 is a Gold customer, over $300 is Silver, and over $200 is Bronze. Anyone else is still climbing the reward ladder.

This query creates a simple "total by user" summary table, adding an extra column that groups each user’s total into price bands:

CREATE TABLE sales_totals AS
  SELECT
    user_id,
    SUM(price) AS total,
    CASE
      WHEN SUM(price) > 400 THEN 'GOLD'
      WHEN SUM(price) > 300 THEN 'SILVER'
      WHEN SUM(price) > 200 THEN 'BRONZE'
      ELSE 'CLIMBING'
    END AS reward_level
  FROM enriched_purchases
  GROUP BY user_id;

Querying from that table, we get the following:

SET 'ksql.query.pull.table.scan.enabled' = 'true';
SELECT * FROM sales_totals;

Here is the result:

+--------+-------+-------------+
|USER_ID |TOTAL  |REWARD_LEVEL |
+--------+-------+-------------+
|dave    |252.99 |BRONZE       |
|rick    |453.64 |GOLD         |
|kris    |74.22  |CLIMBING     |
|yeva    |368.07 |SILVER       |

Kris will never get any rewards at that rate! Let’s buy him a dog:

INSERT INTO purchases ( user_id, product_id ) VALUES ( 'kris', 'dog' );

Repeating the same query, we see that Fido has pushed Kris into the Silver rewards scheme:

SELECT * FROM sales_totals;

Here is the result:

+--------+-------+-------------+
|USER_ID |TOTAL  |REWARD_LEVEL |
+--------+-------+-------------+
|dave    |252.99 |BRONZE       |
|rick    |453.64 |GOLD         |
|kris    |324.21 |SILVER       |
|yeva    |368.07 |SILVER       |

So we have campaign one—a table of user reward levels, which updates automatically every time a user makes a purchase. That data will probably stream off to the user’s account settings page so they can see their reward levels in an app, and it will probably be read by the billing system to calculate a fixed discount. We could also turn that table back into a stream so that every time the reward level changes, the user gets an email. But for now, let’s move on to a more complex use case.
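As a sketch of that last idea (not part of the recipe), you can register a stream over the table’s changelog topic so that every reward-level change becomes an event. This assumes sales_totals kept its default backing topic name, SALES_TOTALS; adjust KAFKA_TOPIC if yours differs:

```sql
-- Sketch: read the sales_totals changelog as a stream of reward-level changes,
-- which could then feed an email or notification service.
CREATE STREAM sales_totals_changes (
  user_id VARCHAR KEY,
  total DOUBLE,
  reward_level VARCHAR
) WITH (
  KAFKA_TOPIC = 'SALES_TOTALS',
  VALUE_FORMAT = 'JSON'
);
```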

Buy five and the next one’s on us

The chances are high that you have a coffee stamp card in your wallet (or several dozen of them). To keep our test data small, we’ll say our customers only need to buy five coffees to get a free one. Whatever the number, the implementation is straightforward: we count the coffees each customer has purchased. When that count reaches five, the next one is free, and as it hits that sixth (free) coffee, the sequence resets to zero.

CREATE TABLE caffeine_index AS
  SELECT
    user_id,
    COUNT(*) as total,
    (COUNT(*) % 6) AS sequence,
    (COUNT(*) % 6) = 5 AS next_one_free
  FROM purchases
  WHERE product_id = 'coffee'
  GROUP BY user_id;

NOTE: If you’re a programmer, that modulo operator % is going to be familiar. If not, you can read the % 6 bit as the "remainder after dividing by 6."

Selecting from that table:

SELECT *
FROM caffeine_index;
+--------+------+---------+--------------+
|USER_ID |TOTAL |SEQUENCE |NEXT_ONE_FREE |
+--------+------+---------+--------------+
|dave    |1     |1        |false         |
|rick    |2     |2        |false         |
|kris    |13    |1        |false         |
|yeva    |5     |5        |true          |

NOTE: The total and sequence columns aren’t strictly needed, but they help to show what’s going on.

Again, this updates in real time, so as customers complete a transaction to get their free coffee, the flag will flip back to false automatically.
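Because caffeine_index is a table keyed by user_id, a point-of-sale system can also look up a single customer with a key-based pull query, with no table-scan setting required:

```sql
-- Key lookup: is this customer's next coffee free?
SELECT * FROM caffeine_index WHERE user_id = 'yeva';
```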

Custom campaigns, tailored treats

To finish up, let’s think about some bespoke marketing campaigns—one to reward certain purchasing habits among our customers and another to encourage them to try new things.

As Acting Vice President In Charge Of Marketing, I have decided that anyone who has bought a dog and a beret is going to get a discount on Poodles. To figure out who this applies to, let’s scan through the purchases stream, narrow it down to the products that we’re interested in, and collect those products in a set:

SELECT
    user_id,
    collect_set(product_id) AS products
FROM purchases
WHERE product_id IN ('dog', 'beret')
GROUP BY user_id
EMIT CHANGES;

Here is the result:

+-------------------------------------+-------------------------------------+
|USER_ID                              |PRODUCTS                             |
+-------------------------------------+-------------------------------------+
|yeva                                 |[beret]                              |
|kris                                 |[beret]                              |
|dave                                 |[dog]                                |
|rick                                 |[dog]                                |
|kris                                 |[beret, dog]                         |

That looks about right. Now we just turn that into a table that keeps only the rows HAVING both products in their purchase set:

CREATE TABLE promotion_french_poodle
  AS
  SELECT
      user_id,
      collect_set(product_id) AS products,
      'french_poodle' AS promotion_name
  FROM purchases
  WHERE product_id IN ('dog', 'beret')
  GROUP BY user_id
  HAVING ARRAY_CONTAINS( collect_set(product_id), 'dog' )
  AND ARRAY_CONTAINS( collect_set(product_id), 'beret' )
  EMIT CHANGES;

Querying that looks like this:

SELECT * FROM promotion_french_poodle;
+------------------------+------------------------+------------------------+
|USER_ID                 |PRODUCTS                |PROMOTION_NAME          |
+------------------------+------------------------+------------------------+
|kris                    |[beret, dog]            |french_poodle           |

NOTE: It doesn’t matter which order they bought the items in or if they bought more than one. We’ll get the same result.

Lastly, let’s try something with a similar query but a very different business angle. We’d like to find all the customers who drink coffee but have never tried tea. Maybe a discount voucher would encourage them to give it a taste?

Let’s create a table that scans the purchase stream, picks out coffee and tea, and finds the users HAVING bought coffee AND NOT tea. To keep it meaningful, we’ll also limit it to customers who’ve spent at least $20 with us.

CREATE TABLE promotion_loose_leaf AS
  SELECT
      user_id,
      collect_set(product_id) AS products,
      'loose_leaf' AS promotion_name
  FROM enriched_purchases
  WHERE product_id IN ('coffee', 'tea')
  GROUP BY user_id
  HAVING ARRAY_CONTAINS( collect_set(product_id), 'coffee' )
  AND NOT ARRAY_CONTAINS( collect_set(product_id), 'tea' )
  AND sum(price) > 20;

Querying that table looks like this:

SELECT * FROM promotion_loose_leaf;
+------------------------+------------------------+------------------------+
|USER_ID                 |PRODUCTS                |PROMOTION_NAME          |
+------------------------+------------------------+------------------------+
|kris                    |[coffee]                |loose_leaf              |

That’s enough campaigning for one day. I think it’s time for a tea break…

Cleanup

5

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). Including the DELETE TOPIC clause asynchronously deletes the topic backing the stream or table as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute the connector name).

DROP CONNECTOR IF EXISTS <connector_name>;