Assess marketing promotional campaign efficacy

Retailers often run promotions to drive sales and move inventory. However, when a promotional strategy involves multiple discounts under different coupon codes, tracking the overall effectiveness of the marketing program becomes difficult. This recipe shows how ksqlDB can track the performance of a multi-discount-code promotion by computing the average order value and the average number of items purchased, organized by discount percentage.

To see this recipe in action, launch it now in Confluent Cloud. It will pre-populate the ksqlDB code in the Confluent Cloud Console and provide mock data or stubbed-out code to connect to a real data source. For more detailed instructions, follow the steps below.

Run it

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

ksqlDB processes data in real time, and you can also import and export data to and from popular data sources and end systems in the cloud directly from ksqlDB. This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

In this tutorial, we’re interested in capturing data that reflects incoming orders as well as details on unique discount codes. Kafka Connect can easily stream this data in from a database containing that information; you can use the following template as a guide when setting up a connector. Through a series of ksqlDB statements, we’ll enrich our order data and compute simple statistics over the enriched stream. By the end, we’ll know how well our discount code promotion is doing: specifically, the average order value and the average number of items purchased per discount percentage.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

CREATE SOURCE CONNECTOR IF NOT EXISTS discount_codes WITH (
  'connector.class'          = 'PostgresSource',
  'name'                     = 'recipe-postgres-discounting-codes',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '5432',
  'connection.user'          = 'postgres',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'discount_codes',
  'timestamp.column.name'    = 'timestamp',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);

CREATE SOURCE CONNECTOR IF NOT EXISTS orders WITH (
  'connector.class'          = 'PostgresSource',
  'name'                     = 'recipe-postgres-discounting-orders',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '5432',
  'connection.user'          = 'postgres',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'order_data',
  'timestamp.column.name'    = 'order_time',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);

SET 'auto.offset.reset' = 'earliest';

-- Create Discount Code Table
CREATE TABLE DISCOUNT_CODES (
    CODE VARCHAR PRIMARY KEY,
    PERCENTAGE DOUBLE
) WITH (
    KAFKA_TOPIC = 'discount_codes',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
);

-- Create Order Stream
CREATE STREAM ORDER_STREAM (
    ID BIGINT,
    ORDER_TIME TIMESTAMP,
    CUSTOMER_ID BIGINT,
    ITEM_CT INT,
    ORDER_SUBTOTAL DOUBLE,
    DISCOUNT_CODE VARCHAR
) WITH (
    KAFKA_TOPIC = 'order_data',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
);


-- Application logic
CREATE TABLE DISCOUNT_ANALYSIS WITH (
    KAFKA_TOPIC = 'discount_analysis',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
) AS
SELECT
    DC.PERCENTAGE,
    ROUND(AVG(OS.ITEM_CT), 0) AS AVG_ITEM_CT,
    ROUND(AVG(OS.ORDER_SUBTOTAL), 2) AS AVG_ORDER_SUBTOTAL,
    ROUND(AVG(OS.ORDER_SUBTOTAL * (100 - DC.PERCENTAGE) / 100.0), 2) AS AVG_ORDER_TOTAL,
    ROUND(AVG(OS.ORDER_SUBTOTAL * (DC.PERCENTAGE / 100.0)), 2) AS AVG_DISCOUNT_TOTAL
FROM ORDER_STREAM AS OS
INNER JOIN DISCOUNT_CODES AS DC
ON OS.DISCOUNT_CODE = DC.CODE
GROUP BY DC.PERCENTAGE
EMIT CHANGES;
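Once events are flowing, you can watch the aggregates update with a push query against the new table. This optional check uses only the names created above:

```sql
-- Watch the per-percentage aggregates update as new orders arrive
SELECT
    PERCENTAGE,
    AVG_ITEM_CT,
    AVG_ORDER_SUBTOTAL,
    AVG_ORDER_TOTAL,
    AVG_DISCOUNT_TOTAL
FROM DISCOUNT_ANALYSIS
EMIT CHANGES;
```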

Test with mock data

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

-- Discount Codes
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('ALDNE15', 15.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('LWNDB15', 15.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('QQNSH15', 15.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('ENDJW15', 15.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('MWJDS15', 15.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('WVESJ20', 20.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('VNNSW20', 20.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('XSNPT20', 20.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('JDNTY25', 25.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('KIDNT25', 25.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('PEHTN25', 25.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('BSLWU30', 30.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('URITN40', 40.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('LSEMJ50', 50.0);
INSERT INTO discount_codes (CODE, PERCENTAGE) VALUES ('FKENT50', 50.0);

-- Orders
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1000, FROM_UNIXTIME(UNIX_TIMESTAMP()), 101,  3, 201.17, 'LSEMJ50');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1001, FROM_UNIXTIME(UNIX_TIMESTAMP()), 107,  8, 121.82, 'KIDNT25');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1002, FROM_UNIXTIME(UNIX_TIMESTAMP()), 111,  2,  65.25, 'LWNDB15');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1003, FROM_UNIXTIME(UNIX_TIMESTAMP()), 104, 11, 163.99, 'URITN40');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1004, FROM_UNIXTIME(UNIX_TIMESTAMP()), 102,  1,  47.16, 'ALDNE15');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1005, FROM_UNIXTIME(UNIX_TIMESTAMP()), 106,  5,  75.78, 'WVESJ20');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1006, FROM_UNIXTIME(UNIX_TIMESTAMP()), 105,  9, 118.27, 'PEHTN25');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1007, FROM_UNIXTIME(UNIX_TIMESTAMP()), 110,  5,  70.19, 'QQNSH15');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1008, FROM_UNIXTIME(UNIX_TIMESTAMP()), 108,  2,  83.45, 'MWJDS15');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1009, FROM_UNIXTIME(UNIX_TIMESTAMP()), 109,  8, 130.64, 'BSLWU30');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1010, FROM_UNIXTIME(UNIX_TIMESTAMP()), 103,  6, 105.71, 'VNNSW20');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1011, FROM_UNIXTIME(UNIX_TIMESTAMP()), 113, 10, 199.85, 'FKENT50');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1012, FROM_UNIXTIME(UNIX_TIMESTAMP()), 112,  8,  99.13, 'ENDJW15');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1013, FROM_UNIXTIME(UNIX_TIMESTAMP()), 117,  4,  85.80, 'XSNPT20');
INSERT INTO order_stream (ID, ORDER_TIME, CUSTOMER_ID, ITEM_CT, ORDER_SUBTOTAL, DISCOUNT_CODE) VALUES (1014, FROM_UNIXTIME(UNIX_TIMESTAMP()), 120,  9, 120.91, 'JDNTY25');
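After inserting the mock data, you can spot-check a single discount tier with a pull query against the table’s key. This is a sketch, not part of the recipe proper; the values returned will depend on which mock rows you inserted:

```sql
-- Look up the current aggregates for the 15% discount tier
SELECT * FROM DISCOUNT_ANALYSIS WHERE PERCENTAGE = 15.0;
```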

Explanation

You may have noticed that the application logic query consists of two stages wrapped into one: an enrichment stage and an aggregation stage. Combining them is efficient and convenient but, as an added exercise, let’s break them into separate stages and see what’s happening.

First: Enrich the orders stream with discount details, computing the discount amount as well as the order total.

CREATE STREAM ORDERS_ENRICHED WITH (
    KAFKA_TOPIC = 'orders_enriched',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
) AS
SELECT
    OS.ID AS ORDER_ID,
    OS.DISCOUNT_CODE,
    DISCOUNT_CODES.PERCENTAGE,
    OS.ITEM_CT,
    OS.ORDER_SUBTOTAL,
    OS.ORDER_SUBTOTAL * (100 - DISCOUNT_CODES.PERCENTAGE) / 100.0 AS ORDER_TOTAL,
    OS.ORDER_SUBTOTAL * (DISCOUNT_CODES.PERCENTAGE / 100.0) AS DISCOUNT_TOTAL
FROM ORDER_STREAM AS OS
INNER JOIN DISCOUNT_CODES
ON OS.DISCOUNT_CODE = DISCOUNT_CODES.CODE
EMIT CHANGES;
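To inspect the intermediate enrichment before aggregating, you can run a push query against ORDERS_ENRICHED. This optional query uses only the columns defined above:

```sql
-- Inspect enriched orders: each order joined with its discount percentage
SELECT ORDER_ID, DISCOUNT_CODE, PERCENTAGE, ORDER_SUBTOTAL, ORDER_TOTAL, DISCOUNT_TOTAL
FROM ORDERS_ENRICHED
EMIT CHANGES;
```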

Second: Compute aggregates based on discount percentage.

CREATE TABLE DISCOUNT_ANALYSIS WITH (
    KAFKA_TOPIC = 'discount_analysis',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
) AS
SELECT
    PERCENTAGE,
    ROUND(AVG(ITEM_CT), 0) AS AVG_ITEM_CT,
    ROUND(AVG(ORDER_SUBTOTAL), 2) AS AVG_ORDER_SUBTOTAL,
    ROUND(AVG(ORDER_TOTAL), 2) AS AVG_ORDER_TOTAL,
    ROUND(AVG(DISCOUNT_TOTAL), 2) AS AVG_DISCOUNT_TOTAL
FROM ORDERS_ENRICHED
GROUP BY PERCENTAGE
EMIT CHANGES;

Cleanup

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). Including the DELETE TOPIC clause asynchronously deletes the topic backing the stream or table as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
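For this recipe specifically, the cleanup might look like the following sketch, dropping dependent objects before their sources. The connector identifiers are assumed to match those used in the CREATE SOURCE CONNECTOR statements above; verify them with SHOW CONNECTORS before dropping:

```sql
-- Drop derived objects first, then their sources
DROP TABLE IF EXISTS DISCOUNT_ANALYSIS DELETE TOPIC;
DROP STREAM IF EXISTS ORDERS_ENRICHED DELETE TOPIC;
DROP STREAM IF EXISTS ORDER_STREAM DELETE TOPIC;
DROP TABLE IF EXISTS DISCOUNT_CODES DELETE TOPIC;

-- Connector identifiers assumed from the CREATE SOURCE CONNECTOR statements
DROP CONNECTOR IF EXISTS discount_codes;
DROP CONNECTOR IF EXISTS orders;
```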