Create geolocation-based customer alerts and promotions

To build more personalized offers and promotions, organizations have turned to geolocation alerts, which compare a merchant’s location with a mobile user’s location in order to notify customers of the latest promotions when they are near one of the merchant’s stores. This can be difficult, however, because it requires determining the distance between static store location data and continuous streams of geolocation data from hundreds of thousands of mobile devices. This recipe showcases how to do this easily with ksqlDB by aggregating and comparing merchant and mobile user geolocation data to produce user proximity alerts and notifications.

To see this tutorial in action, click here to launch it now. It will pre-populate the ksqlDB code in the Confluent Cloud Console and provide mock data or stubbed-out code to connect to a real data source. For more detailed instructions, follow the steps below.

Run it

1. Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB provides a SQL syntax for extracting, transforming, and loading events within your Kafka cluster.
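Once you are in the editor, an optional quick check that it is connected to your cluster is to list what ksqlDB can currently see; at this point none of the recipe’s streams or tables exist yet:

-- Optional: confirm the ksqlDB editor is connected to your cluster
SHOW TOPICS;
SHOW STREAMS;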

2. Execute ksqlDB code

ksqlDB processes data in real time, and you can also import data from and export data to popular data sources and end systems in the cloud directly from ksqlDB. This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

This application compares merchant and mobile user geolocation data to produce user proximity alerts. Initially, merchant data is sourced from a database and contains a geohash value per merchant. This data is streamed from the source database into a ksqlDB table, keyed by the geohash truncated to a defined precision (the length of the hash prefix). User location data is streamed from mobile devices and joined to the merchant table on that geohash prefix. Matching location events are published to a "raw" alerts stream, which is further refined using the ksqlDB scalar function GEO_DISTANCE. This produces the final result, promo_alerts, which contains user and merchant data with geolocation information.

This tutorial assumes that you have merchant data stored in an SQL database. The merchant data includes geolocation information, which will be matched with the stream of location data from a user’s device. First, deploy a source connector that will read the merchant data into a Kafka topic for stream processing in ksqlDB.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
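For example, here is a sketch (not part of the recipe script itself) of the user_locations stream declared over a user-locations topic that already exists; the PARTITIONS property is simply omitted and ksqlDB infers the partition count from the topic:

-- Variant sketch: the user-locations topic already exists, so PARTITIONS is
-- omitted and ksqlDB infers the partition count from the existing topic
CREATE STREAM user_locations (
  id INT,
  latitude DECIMAL(10,7),
  longitude DECIMAL(10,7),
  geohash VARCHAR
) WITH (
  KAFKA_TOPIC = 'user-locations',
  VALUE_FORMAT = 'JSON'
);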

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_merchant_data WITH (
  'connector.class'       = 'PostgresSource',
  'kafka.api.key'         = '<my-kafka-api-key>',
  'kafka.api.secret'      = '<my-kafka-api-secret>',
  'connection.host'       = '<database-host>',
  'connection.port'       = '5432',
  'connection.user'       = 'postgres',
  'connection.password'   = '<database-password>',
  'db.name'               = '<db-name>',
  'table.whitelist'       = 'merchant-locations',
  'timestamp.column.name' = 'created_at',
  'output.data.format'    = 'JSON',
  'db.timezone'           = 'UTC',
  'tasks.max'             = '1'
);
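
-- Optional: if you created the source connector above, you can verify that it
-- is provisioned and running before continuing (skip this check if you are
-- inserting mock data instead).
SHOW CONNECTORS;
DESCRIBE CONNECTOR recipe_postgres_merchant_data;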

SET 'auto.offset.reset' = 'earliest';

-- Creates a table of merchant data including the calculated geohash
CREATE TABLE merchant_locations (
  id INT PRIMARY KEY,
  description VARCHAR,
  latitude DECIMAL(10,7),
  longitude DECIMAL(10,7),
  geohash VARCHAR
) WITH (
  KAFKA_TOPIC = 'merchant-locations',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Creates a table to lookup merchants based on a
--    substring (precision) of the geohash
CREATE TABLE merchants_by_geohash
WITH (
  KAFKA_TOPIC = 'merchant-geohash',
  FORMAT = 'JSON',
  PARTITIONS = 6
) AS
SELECT
  SUBSTRING(geohash, 1, 6) AS geohash,
  COLLECT_LIST(id) as id_list
FROM merchant_locations
GROUP BY SUBSTRING(geohash, 1, 6)
EMIT CHANGES;

-- Creates a stream of user location data including the calculated geohash
CREATE STREAM user_locations (
  id INT,
  latitude DECIMAL(10,7),
  longitude DECIMAL(10,7),
  geohash VARCHAR
) WITH (
  KAFKA_TOPIC = 'user-locations',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Creates a stream of alerts when a user's geohash based location roughly
--    intersects a collection of merchants locations from the
--    merchants_by_geohash table.
CREATE STREAM alerts_raw
WITH (
  KAFKA_TOPIC = 'alerts-raw',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
) AS
SELECT
  user_locations.id as user_id,
  user_locations.latitude AS user_latitude,
  user_locations.longitude AS user_longitude,
  SUBSTRING(user_locations.geohash, 1, 6) AS user_geohash,
  EXPLODE(merchants_by_geohash.id_list) AS merchant_id
FROM user_locations
LEFT JOIN merchants_by_geohash ON SUBSTRING(user_locations.geohash, 1, 6) =
  merchants_by_geohash.geohash
PARTITION BY null
EMIT CHANGES;

-- Creates a stream of promotion alerts to send a user when their location
--    intersects with a merchant within a specified distance (0.2 KM)
CREATE STREAM promo_alerts
WITH (
  KAFKA_TOPIC = 'promo-alerts',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
) AS
SELECT
  alerts_raw.user_id,
  alerts_raw.user_geohash,
  merchant_locations.description AS merchant_description,
  CAST(
    GEO_DISTANCE(alerts_raw.user_latitude, alerts_raw.user_longitude,
                 merchant_locations.latitude, merchant_locations.longitude,
        'KM') * 1000 AS INT) as distance_meters,
  STRUCT(lat := CAST(alerts_raw.user_latitude AS DOUBLE), lon := CAST(alerts_raw.user_longitude AS DOUBLE)) AS geopoint
FROM alerts_raw
LEFT JOIN merchant_locations on alerts_raw.merchant_id = merchant_locations.id
WHERE GEO_DISTANCE(
        alerts_raw.user_latitude, alerts_raw.user_longitude,
        merchant_locations.latitude, merchant_locations.longitude, 'KM') < 0.2
PARTITION BY null
EMIT CHANGES;

3. Test with mock data

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

Note: The manual insert commands below should be executed in two discrete steps; see the code comments for details.

-- For the purposes of this recipe when testing by inserting records manually,
--  a short pause between these insert groups is required. This allows
--  the merchant location data to be processed by the merchants_by_geohash
--  table before the user location data is joined in the alerts_raw stream.
INSERT INTO MERCHANT_LOCATIONS (id, latitude, longitude, description, geohash) VALUES (1, 14.5486606, 121.0477211, '7-Eleven RCBC Center', 'wdw4f88206fx');
INSERT INTO MERCHANT_LOCATIONS (id, latitude, longitude, description, geohash) VALUES (2, 14.5473328, 121.0516176, 'Jordan Manila', 'wdw4f87075kt');
INSERT INTO MERCHANT_LOCATIONS (id, latitude, longitude, description, geohash) VALUES (3, 14.5529666, 121.0516716, 'Lawson Eco Tower', 'wdw4f971hmsv');

-- Wait 10 seconds before inserting the records below

INSERT INTO USER_LOCATIONS (id, latitude, longitude, geohash) VALUES (1, 14.5472791, 121.0475401, 'wdw4f820h17g');
INSERT INTO USER_LOCATIONS (id, latitude, longitude, geohash) VALUES (2, 14.5486952, 121.0521851, 'wdw4f8e82376');
INSERT INTO USER_LOCATIONS (id, latitude, longitude, geohash) VALUES (2, 14.5517401, 121.0518652, 'wdw4f9560buw');
INSERT INTO USER_LOCATIONS (id, latitude, longitude, geohash) VALUES (2, 14.5500341, 121.0555802, 'wdw4f8vbp6yv');
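
Optionally, before checking the final output, you can confirm that the intermediate objects picked up the mock data. The exact rows will vary, but queries along these lines (a sketch) show the merchant ids grouped per geohash prefix and the raw geohash matches prior to the distance filter:

-- Optional intermediate checks; each query terminates once its LIMIT is reached
SELECT geohash, id_list FROM merchants_by_geohash EMIT CHANGES LIMIT 3;
SELECT user_id, user_geohash, merchant_id FROM alerts_raw EMIT CHANGES LIMIT 3;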

To validate that this recipe is working, run the following query:

SELECT * FROM promo_alerts EMIT CHANGES LIMIT 3;

Your output should resemble:

+-----------------------------------+-----------------------------------+-----------------------------------+-----------------------------------+-----------------------------------+
|USER_ID                            |USER_GEOHASH                       |MERCHANT_DESCRIPTION               |DISTANCE_METERS                    |GEOPOINT                           |
+-----------------------------------+-----------------------------------+-----------------------------------+-----------------------------------+-----------------------------------+
|2                                  |wdw4f9                             |Lawson Eco Tower                   |137                                |{LAT=14.5517401, LON=121.0518652}  |
|1                                  |wdw4f8                             |7-Eleven RCBC Center               |154                                |{LAT=14.5472791, LON=121.0475401}  |
|2                                  |wdw4f8                             |Jordan Manila                      |163                                |{LAT=14.5486952, LON=121.0521851}  |
Limit Reached
Query terminated

4. Write the data out

Sinking the promotion alerts to Elasticsearch facilitates further search processing:

-- Send data to Elasticsearch
CREATE SINK CONNECTOR IF NOT EXISTS recipe_elasticsearch_promo_alerts WITH (
  'connector.class'          = 'ElasticsearchSink',
  'input.data.format'        = 'JSON',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'topics'                   = 'promo-alerts',
  'connection.url'           = '<elasticsearch-URI>',
  'connection.user'          = '<elasticsearch-username>',
  'connection.password'      = '<elasticsearch-password>',
  'type.name'                = 'kafkaconnect',
  'key.ignore'               = 'true',
  'schema.ignore'            = 'true'
);
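
Optionally, you can confirm that alert records are actually landing on the promo-alerts topic that the sink connector reads:

-- Optional: inspect the records on the topic consumed by the sink connector
PRINT 'promo-alerts' FROM BEGINNING LIMIT 3;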

5. Cleanup

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
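
For this recipe, that means dropping the derived streams and tables first and then the base objects, for example as shown below (depending on your ksqlDB version, you may first need to terminate the persistent queries that write to them):

DROP STREAM IF EXISTS promo_alerts DELETE TOPIC;
DROP STREAM IF EXISTS alerts_raw DELETE TOPIC;
DROP STREAM IF EXISTS user_locations DELETE TOPIC;
DROP TABLE IF EXISTS merchants_by_geohash DELETE TOPIC;
DROP TABLE IF EXISTS merchant_locations DELETE TOPIC;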

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
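
For this recipe, those are the source and sink connectors created above:

DROP CONNECTOR IF EXISTS recipe_postgres_merchant_data;
DROP CONNECTOR IF EXISTS recipe_elasticsearch_promo_alerts;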