Create personalized banking promotions

Consumers often face a never-ending stream of generic marketing messages that are not tailored to their needs, resulting in poor customer conversion rates. A better approach is known as 'Next Best Offer,' which leverages predictive analytics to analyze a customer’s spending habits and activities to create more targeted promotions. This recipe demonstrates how ksqlDB can take customer banking information to create a predictive analytics model and improve customer conversions through personalized marketing efforts.

To see this tutorial in action, click here to launch it now. It will pre-populate the ksqlDB code in the Confluent Cloud Console and provide mock data or stubbed-out code to connect to a real data source. For more detailed instructions, follow the steps below.

Run it

1. Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports a SQL-based language for extracting, transforming, and loading events within your Kafka cluster.

2. Execute ksqlDB code

ksqlDB processes data in real time, and you can also use it to import data from popular data sources and export it to end systems in the cloud. This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

This application will perform a series of joins between event streams and tables to calculate the next best offer for a banking customer based on their activity, which should yield higher customer engagement and satisfaction.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
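For example, if the OFFERS_STREAM topic already exists in your cluster, the offers table used later in this recipe could be declared without it:

CREATE TABLE offers (
    OFFER_ID INTEGER PRIMARY KEY,
    OFFER_NAME VARCHAR,
    OFFER_URL VARCHAR
) WITH (
    KAFKA_TOPIC = 'OFFERS_STREAM',
    VALUE_FORMAT = 'JSON'
);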

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_next_best_offer WITH (
  'connector.class'          = 'PostgresSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '5432',
  'connection.user'          = 'postgres',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'customer-activity',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);
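
-- Optional: if you created the connector above, you can verify that it is running with:
-- DESCRIBE CONNECTOR recipe_postgres_next_best_offer;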

SET 'auto.offset.reset'='earliest';

CREATE TABLE customers (
    CUSTOMER_ID INTEGER PRIMARY KEY,
    FIRST_NAME VARCHAR,
    LAST_NAME VARCHAR,
    EMAIL VARCHAR,
    GENDER VARCHAR,
    INCOME INTEGER,
    FICO INTEGER
) WITH (
    KAFKA_TOPIC = 'CUSTOMERS_TABLE',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
);

CREATE TABLE offers (
    OFFER_ID INTEGER PRIMARY KEY,
    OFFER_NAME VARCHAR,
    OFFER_URL VARCHAR
) WITH (
    KAFKA_TOPIC = 'OFFERS_STREAM',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
);

CREATE STREAM customer_activity_stream (
    CUSTOMER_ID INTEGER KEY,
    ACTIVITY_ID INTEGER,
    IP_ADDRESS VARCHAR,
    ACTIVITY_TYPE VARCHAR,
    PROPENSITY_TO_BUY DOUBLE
   ) WITH (
    KAFKA_TOPIC = 'CUSTOMER_ACTIVITY_STREAM',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
);

-- Application logic
CREATE STREAM next_best_offer
WITH (
    KAFKA_TOPIC = 'NEXT_BEST_OFFER',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
) AS
SELECT
    cask.CUSTOMER_ID as CUSTOMER_ID,
    cask.ACTIVITY_ID,
    cask.PROPENSITY_TO_BUY,
    cask.ACTIVITY_TYPE,
    ct.INCOME,
    ct.FICO,
    CASE
        WHEN ct.INCOME > 100000 AND ct.FICO < 700 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 1
        WHEN ct.INCOME < 50000 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 2
        WHEN ct.INCOME >= 50000 AND ct.FICO >= 600 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 3
        WHEN ct.INCOME > 100000 AND ct.FICO >= 700 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 4
        ELSE 5
    END AS OFFER_ID
FROM customer_activity_stream cask
INNER JOIN customers ct ON cask.CUSTOMER_ID = ct.CUSTOMER_ID;

CREATE STREAM next_best_offer_lookup
WITH (
    KAFKA_TOPIC = 'NEXT_BEST_OFFER_LOOKUP',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
) AS
SELECT
    nbo.CUSTOMER_ID,
    nbo.ACTIVITY_ID,
    nbo.OFFER_ID,
    nbo.PROPENSITY_TO_BUY,
    nbo.ACTIVITY_TYPE,
    nbo.INCOME,
    nbo.FICO,
    ot.OFFER_NAME,
    ot.OFFER_URL
FROM next_best_offer nbo
INNER JOIN offers ot
ON nbo.OFFER_ID = ot.OFFER_ID;

3. Test with mock data

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES  (1,'Waylen','Tubble','wtubble0@hc360.com','Male',403646, 465);
INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES  (2,'Joell','Wilshin','jwilshin1@yellowpages.com','Female',109825, 705);
INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES  (3,'Ilaire','Latus','ilatus2@baidu.com','Male',407964, 750);

INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (1,'new_savings','http://google.com.br/magnis/dis/parturient.json');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (2,'new_checking','https://earthlink.net/in/ante.js');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (3,'new_home_loan','https://webs.com/in/ante.jpg');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (4,'new_auto_loan','http://squidoo.com/venenatis/non/sodales/sed/tincidunt/eu.js');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (5,'no_offer','https://ezinearticles.com/ipsum/primis/in/faucibus/orci/luctus.html');

INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 1,'121.219.110.170','branch_visit',0.4);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (2, 2,'210.232.55.188','deposit',0.56);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (3, 3,'84.197.123.173','web_open',0.33);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 4,'70.149.233.32','deposit',0.41);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (2, 5,'221.234.209.67','deposit',0.44);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (3, 6,'102.187.28.148','web_open',0.33);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 7,'135.37.250.250','mobile_open',0.97);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (2, 8,'122.157.243.25','deposit',0.83);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (3, 9,'114.215.212.181','deposit',0.86);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 10,'248.248.0.78','new_account',0.14);

To validate that this recipe is working, run the following query:

SELECT * FROM next_best_offer_lookup EMIT CHANGES LIMIT 10;

Your output should resemble:

+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|NBO_OFFER_ID      |CUSTOMER_ID       |ACTIVITY_ID       |PROPENSITY_TO_BUY |ACTIVITY_TYPE     |INCOME            |FICO              |OFFER_NAME        |OFFER_URL         |
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|1                 |1                 |1                 |0.4               |branch_visit      |403646            |465               |new_savings       |http://google.com.|
|                  |                  |                  |                  |                  |                  |                  |                  |br/magnis/dis/part|
|                  |                  |                  |                  |                  |                  |                  |                  |urient.json       |
|3                 |3                 |3                 |0.33              |web_open          |407964            |750               |new_home_loan     |https://webs.com/i|
|                  |                  |                  |                  |                  |                  |                  |                  |n/ante.jpg        |
|5                 |1                 |7                 |0.97              |mobile_open       |403646            |465               |no_offer          |https://ezineartic|
|                  |                  |                  |                  |                  |                  |                  |                  |les.com/ipsum/prim|
|                  |                  |                  |                  |                  |                  |                  |                  |is/in/faucibus/orc|
|                  |                  |                  |                  |                  |                  |                  |                  |i/luctus.html     |
|1                 |1                 |4                 |0.41              |deposit           |403646            |465               |new_savings       |http://google.com.|
|                  |                  |                  |                  |                  |                  |                  |                  |br/magnis/dis/part|
|                  |                  |                  |                  |                  |                  |                  |                  |urient.json       |
|1                 |1                 |10                |0.14              |new_account       |403646            |465               |new_savings       |http://google.com.|
|                  |                  |                  |                  |                  |                  |                  |                  |br/magnis/dis/part|
|                  |                  |                  |                  |                  |                  |                  |                  |urient.json       |
|3                 |2                 |2                 |0.56              |deposit           |109825            |705               |new_home_loan     |https://webs.com/i|
|                  |                  |                  |                  |                  |                  |                  |                  |n/ante.jpg        |
|3                 |3                 |6                 |0.33              |web_open          |407964            |750               |new_home_loan     |https://webs.com/i|
|                  |                  |                  |                  |                  |                  |                  |                  |n/ante.jpg        |
|3                 |2                 |5                 |0.44              |deposit           |109825            |705               |new_home_loan     |https://webs.com/i|
|                  |                  |                  |                  |                  |                  |                  |                  |n/ante.jpg        |
|3                 |3                 |9                 |0.86              |deposit           |407964            |750               |new_home_loan     |https://webs.com/i|
|                  |                  |                  |                  |                  |                  |                  |                  |n/ante.jpg        |
|3                 |2                 |8                 |0.83              |deposit           |109825            |705               |new_home_loan     |https://webs.com/i|
|                  |                  |                  |                  |                  |                  |                  |                  |n/ante.jpg        |
Query terminated

4. Explanation

Adding the lookup tables

In the event stream you’ll use for this tutorial, each activity entry contains only the id for the customer. This is expected, as it’s a common practice to have normalized event streams. But when it’s time to analyze the data, it’s important to have additional customer information to provide context for any analysts reviewing the results. You’ll also need a table to use for the calculated offer.

Creating the customer table and inserting records

Create the table for customer information:

CREATE TABLE customers (
    CUSTOMER_ID INTEGER PRIMARY KEY,
    FIRST_NAME VARCHAR,
    LAST_NAME VARCHAR,
    EMAIL VARCHAR,
    GENDER VARCHAR,
    INCOME INTEGER,
    FICO INTEGER
) WITH (
    KAFKA_TOPIC = 'CUSTOMERS_TABLE',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
);

Typically, customer information would be sourced from an existing database. As customer details change, the tables in the database are updated, and the changes can be streamed into Kafka using Kafka Connect with change data capture (CDC). The primary key of the customers table is the customer id, which corresponds to the key of the customer_activity_stream and makes it possible to join the two and enrich activity events with customer information. In this example, we’ll populate the customers table by executing the following INSERT statements:

INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES  (1,'Waylen','Tubble','wtubble0@hc360.com','Male',403646, 465);
INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES  (2,'Joell','Wilshin','jwilshin1@yellowpages.com','Female',109825, 705);
INSERT INTO customers (customer_id, first_name, last_name, email, gender, income, fico) VALUES  (3,'Ilaire','Latus','ilatus2@baidu.com','Male',407964, 750);
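
To confirm the rows landed in the table, you can run a transient push query against it as a quick sanity check (this assumes auto.offset.reset is still set to earliest, as in the script above; the LIMIT terminates the query after three rows):

SELECT * FROM customers EMIT CHANGES LIMIT 3;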

Creating the offer table

You’ll also need a lookup table for the OFFERS data:

CREATE TABLE offers (
    OFFER_ID INTEGER PRIMARY KEY,
    OFFER_NAME VARCHAR,
    OFFER_URL VARCHAR
) WITH (
    KAFKA_TOPIC = 'OFFERS_STREAM',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
);

This table provides the enrichment information that will be needed when the application calculates the next best offer.

To populate the OFFERS table, execute the following INSERT statements:

INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (1,'new_savings','http://google.com.br/magnis/dis/parturient.json');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (2,'new_checking','https://earthlink.net/in/ante.js');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (3,'new_home_loan','https://webs.com/in/ante.jpg');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (4,'new_auto_loan','http://squidoo.com/venenatis/non/sodales/sed/tincidunt/eu.js');
INSERT INTO offers (offer_id, offer_name, offer_url) VALUES (5,'no_offer','https://ezinearticles.com/ipsum/primis/in/faucibus/orci/luctus.html');

Creating the event stream

Now you’ll create the stream that contains the customer activity:

CREATE STREAM customer_activity_stream (
    CUSTOMER_ID INTEGER KEY,
    ACTIVITY_ID INTEGER,
    IP_ADDRESS VARCHAR,
    ACTIVITY_TYPE VARCHAR,
    PROPENSITY_TO_BUY DOUBLE
   ) WITH (
    KAFKA_TOPIC = 'CUSTOMER_ACTIVITY_STREAM',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
);

In a production setting, you’d populate the stream’s underlying topic either with a KafkaProducer application or from an external system using a managed connector on Confluent Cloud. If you don’t have a connector set up, you can manually insert records into the stream with INSERT INTO ... VALUES statements:

INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 1,'121.219.110.170','branch_visit',0.4);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (2, 2,'210.232.55.188','deposit',0.56);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (3, 3,'84.197.123.173','web_open',0.33);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 4,'70.149.233.32','deposit',0.41);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (2, 5,'221.234.209.67','deposit',0.44);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (3, 6,'102.187.28.148','web_open',0.33);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 7,'135.37.250.250','mobile_open',0.97);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (2, 8,'122.157.243.25','deposit',0.83);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (3, 9,'114.215.212.181','deposit',0.86);
INSERT INTO customer_activity_stream (customer_id, activity_id, ip_address, activity_type, propensity_to_buy) VALUES (1, 10,'248.248.0.78','new_account',0.14);
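
As another quick check, you can watch the stream for high-propensity events; with the mock data above, three records have a PROPENSITY_TO_BUY greater than 0.8 (again assuming auto.offset.reset is set to earliest):

SELECT CUSTOMER_ID, ACTIVITY_TYPE, PROPENSITY_TO_BUY
FROM customer_activity_stream
WHERE PROPENSITY_TO_BUY > 0.8
EMIT CHANGES
LIMIT 3;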

Determining the next best offer

Now you’ll create the event stream that calculates the next best offer for your customers based on their activity.

Calculating the offer

To perform the next offer calculation, create a stream that performs a join between the CUSTOMER_ACTIVITY_STREAM and the CUSTOMERS table:

CREATE STREAM next_best_offer
WITH (
    KAFKA_TOPIC = 'NEXT_BEST_OFFER',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
) AS
SELECT
    cask.ACTIVITY_ID,
    cask.CUSTOMER_ID as CUSTOMER_ID,
    cask.PROPENSITY_TO_BUY,
    cask.ACTIVITY_TYPE,
    ct.INCOME,
    ct.FICO,
    CASE
        WHEN ct.INCOME > 100000 AND ct.FICO < 700 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 1
        WHEN ct.INCOME < 50000 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 2
        WHEN ct.INCOME >= 50000 AND ct.FICO >= 600 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 3
        WHEN ct.INCOME > 100000 AND ct.FICO >= 700 AND cask.PROPENSITY_TO_BUY < 0.9 THEN 4
        ELSE 5
    END AS OFFER_ID
FROM customer_activity_stream cask
INNER JOIN customers ct ON cask.CUSTOMER_ID = ct.CUSTOMER_ID;

The CASE statement is the workhorse for the query. It provides the next offer for the customer based on information resulting from the join. Note that you’re using an INNER JOIN here because if the customer id isn’t found in the CUSTOMERS table, there’s no calculation to make. You’ll notice that the result of the CASE statement is a single integer—the code for the offer to be made—so there’s one final step left.
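For example, the first activity record for customer 1 joins against an income of 403646 and a FICO score of 465: the income is above 100000, the FICO score is below 700, and the propensity to buy (0.4) is below 0.9, so the first WHEN branch matches and OFFER_ID 1 (new_savings) is assigned, as shown in the sample output above. Conversely, activity 7 for the same customer has a propensity to buy of 0.97, so none of the WHEN branches match and the ELSE branch assigns OFFER_ID 5 (no_offer).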

Final results

For the last step, create a stream containing the final results by joining the NEXT_BEST_OFFER stream with the OFFERS table:

CREATE STREAM next_best_offer_lookup
WITH (
    KAFKA_TOPIC = 'NEXT_BEST_OFFER_LOOKUP',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
) AS
SELECT
    nbo.OFFER_ID,
    nbo.ACTIVITY_ID,
    nbo.CUSTOMER_ID,
    nbo.PROPENSITY_TO_BUY,
    nbo.ACTIVITY_TYPE,
    nbo.INCOME,
    nbo.FICO,
    ot.OFFER_NAME,
    ot.OFFER_URL
FROM next_best_offer nbo
INNER JOIN OFFERS ot
ON nbo.OFFER_ID = ot.OFFER_ID;
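
As in the previous step, an INNER JOIN is appropriate here because the CASE expression always produces an OFFER_ID between 1 and 5, each of which exists in the OFFERS table.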

5. Cleanup

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute the connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
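
For this recipe specifically, the full set of cleanup commands would look roughly like the following, dropping the derived streams before the streams and tables they are built from (depending on your ksqlDB version, you may first need to TERMINATE the persistent queries listed by SHOW QUERIES):

DROP STREAM IF EXISTS next_best_offer_lookup DELETE TOPIC;
DROP STREAM IF EXISTS next_best_offer DELETE TOPIC;
DROP STREAM IF EXISTS customer_activity_stream DELETE TOPIC;
DROP TABLE IF EXISTS customers DELETE TOPIC;
DROP TABLE IF EXISTS offers DELETE TOPIC;
DROP CONNECTOR IF EXISTS recipe_postgres_next_best_offer;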