Detect unusual credit card activity

Many financial institutions detect fraud by checking for unusual activity within a short period of time, raising a red flag so they can promptly alert customers and confirm any unexpected recent purchases. Fraud can involve using stolen credit cards, forging checks and account numbers, submitting multiple duplicate transactions, and more. This tutorial compares a customer’s typical credit card spend with their recent spending and flags the account as a possible case of credit card theft when the spending is excessive.

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to run this tutorial. ksqlDB provides a SQL dialect for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

2

ksqlDB processes data in real time, and it can also import and export data directly to and from popular data sources and end systems in the cloud. This tutorial shows you how to run the recipe in one of two ways: using connectors to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
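As an illustration of that note, here is a minimal sketch of the customer stream declared without PARTITIONS. It assumes the FD_customers topic was already created (for example, by a connector). It is an alternative to the first CREATE STREAM in the listing that follows, not an extra statement to run alongside it.

```sql
-- Sketch only: assumes the 'FD_customers' topic already exists.
-- Without PARTITIONS, ksqlDB registers the stream over the existing topic
-- and does not try to create it.
CREATE STREAM fd_cust_raw_stream (
  ID BIGINT,
  FIRST_NAME VARCHAR,
  LAST_NAME VARCHAR,
  EMAIL VARCHAR,
  AVG_CREDIT_SPEND DOUBLE
) WITH (
  KAFKA_TOPIC = 'FD_customers',
  VALUE_FORMAT = 'JSON'
);
```

If the topic does not exist and PARTITIONS is omitted, the statement is expected to fail, which is why the listing that follows sets it explicitly.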

SET 'auto.offset.reset' = 'earliest';

-- Create the stream of customer data
CREATE STREAM fd_cust_raw_stream (
  ID BIGINT,
  FIRST_NAME VARCHAR,
  LAST_NAME VARCHAR,
  EMAIL VARCHAR,
  AVG_CREDIT_SPEND DOUBLE
) WITH (
  KAFKA_TOPIC = 'FD_customers',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Repartition the customer data stream by ID (the account ID) to prepare for the join later
CREATE STREAM fd_customer_rekeyed WITH (KAFKA_TOPIC = 'fd_customer_rekeyed') AS
  SELECT *
  FROM fd_cust_raw_stream
  PARTITION BY ID;

-- Register the partitioned customer data topic as a table
CREATE TABLE fd_customers (
  ID BIGINT PRIMARY KEY,
  FIRST_NAME VARCHAR,
  LAST_NAME VARCHAR,
  EMAIL VARCHAR,
  AVG_CREDIT_SPEND DOUBLE
) WITH (
  KAFKA_TOPIC = 'fd_customer_rekeyed',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Create the stream of transactions
CREATE STREAM fd_transactions (
  ACCOUNT_ID BIGINT,
  TIMESTAMP VARCHAR,
  CARD_TYPE VARCHAR,
  AMOUNT DOUBLE,
  IP_ADDRESS VARCHAR,
  TRANSACTION_ID VARCHAR
) WITH (
  KAFKA_TOPIC = 'FD_transactions',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Join the transactions to customer information
CREATE STREAM fd_transactions_enriched WITH (KAFKA_TOPIC = 'transactions_enriched') AS
  SELECT
    T.ACCOUNT_ID,
    T.CARD_TYPE,
    T.AMOUNT,
    C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
    C.AVG_CREDIT_SPEND
  FROM fd_transactions T
  INNER JOIN fd_customers C
  ON T.ACCOUNT_ID = C.ID;

-- Aggregate the stream of transactions for each account ID using a two-hour
-- tumbling window, and filter for accounts in which the total spend in a
-- two-hour period is greater than the customer’s average:
CREATE TABLE fd_possible_stolen_card WITH (KAFKA_TOPIC = 'FD_possible_stolen_card', KEY_FORMAT = 'JSON') AS
  SELECT
    TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START,
    T.ACCOUNT_ID,
    T.CARD_TYPE,
    SUM(T.AMOUNT) AS TOTAL_CREDIT_SPEND,
    T.FULL_NAME,
    MAX(T.AVG_CREDIT_SPEND) AS AVG_CREDIT_SPEND
  FROM fd_transactions_enriched T
  WINDOW TUMBLING (SIZE 2 HOURS)
  GROUP BY T.ACCOUNT_ID, T.CARD_TYPE, T.FULL_NAME
  HAVING SUM(T.AMOUNT) > MAX(T.AVG_CREDIT_SPEND);

-- Insert mock customer and transaction data

INSERT INTO fd_cust_raw_stream (id, first_name, last_name, email, avg_credit_spend) VALUES (6011000990139424, 'Janice', 'Smith', 'jsmith@mycompany.com', 500.00);
INSERT INTO fd_cust_raw_stream (id, first_name, last_name, email, avg_credit_spend) VALUES (3530111333300000, 'George', 'Mall', 'gmall@mycompany.com', 20.00);

INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (6011000990139424, '2021-09-23T10:50:00.000Z', 'visa', 542.99, '192.168.44.1', '3985757');
INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (6011000990139424, '2021-09-23T10:50:01.000Z', 'visa', 611.48, '192.168.44.1', '8028435');
INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (3530111333300000, '2021-09-23T10:50:00.000Z', 'mastercard', 10.31, '192.168.101.3', '1695780');
INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (3530111333300000, '2021-09-23T10:50:00.000Z', 'mastercard', 5.37, '192.168.101.3', '1695780');
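The statements above never use the TIMESTAMP column for windowing. Because no timestamp property is set, each record's ROWTIME (for INSERT INTO, the time of insertion) determines its two-hour window. If you would rather window on the transaction time carried in the payload, one possible variant of the fd_transactions definition is sketched below. The TIMESTAMP and TIMESTAMP_FORMAT properties are standard ksqlDB stream properties, but the format string is an assumption based on the mock values shown above, so verify it against your own data.

```sql
-- Sketch only: an alternative to the fd_transactions definition above,
-- not an additional statement. The format string is assumed from mock
-- values such as '2021-09-23T10:50:00.000Z'.
CREATE STREAM fd_transactions (
  ACCOUNT_ID BIGINT,
  TIMESTAMP VARCHAR,
  CARD_TYPE VARCHAR,
  AMOUNT DOUBLE,
  IP_ADDRESS VARCHAR,
  TRANSACTION_ID VARCHAR
) WITH (
  KAFKA_TOPIC = 'FD_transactions',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6,
  -- Use the payload column as the record timestamp (event time)
  TIMESTAMP = 'TIMESTAMP',
  TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss.SSSX'
);
```

With this variant, the window boundaries would follow the transaction times rather than the time at which the rows were inserted.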

To validate that this recipe is working, run the following query:

SELECT * FROM fd_possible_stolen_card EMIT CHANGES LIMIT 1;

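Your output should resemble the following. The window timestamps are based on when the mock records were inserted, so yours will differ: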

+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|ACCOUNT_ID      |CARD_TYPE       |FULL_NAME       |WINDOWSTART     |WINDOWEND       |WINDOW_START    |TOTAL_CREDIT_SPE|AVG_CREDIT_SPEND|
|                |                |                |                |                |                |ND              |                |
+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|6011000990139424|visa            |Janice Smith    |1646424000000   |1646431200000   |2022-03-04 20:00|1154.47         |500.0           |
|                |                |                |                |                |:00 +0000       |                |                |
Query terminated
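Only Janice Smith's account is flagged: her two transactions total 542.99 + 611.48 = 1154.47, which exceeds her average of 500.00, while George Mall's total of 10.31 + 5.37 = 15.68 stays below his average of 20.00. To see both totals side by side, a transient query without the HAVING filter can help. The following is a sketch that assumes all four mock transactions fall into the same two-hour window.

```sql
-- Sketch only: same aggregation as fd_possible_stolen_card, minus the
-- HAVING filter, so that accounts below their average are shown too.
SELECT
  ACCOUNT_ID,
  CARD_TYPE,
  FULL_NAME,
  SUM(AMOUNT) AS TOTAL_CREDIT_SPEND,
  MAX(AVG_CREDIT_SPEND) AS AVG_CREDIT_SPEND
FROM fd_transactions_enriched
WINDOW TUMBLING (SIZE 2 HOURS)
GROUP BY ACCOUNT_ID, CARD_TYPE, FULL_NAME
EMIT CHANGES;
```

If this query returns no rows for an account, check fd_transactions_enriched first. A stream-table join only emits a row when the customer record is already present in fd_customers at the time the transaction arrives.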

Test with real data

3

If you have real end systems to connect to, adapt the sample connector configuration(s) below and run them from ksqlDB with CREATE SOURCE CONNECTOR.

-- Stream of transactions
CREATE SOURCE CONNECTOR IF NOT EXISTS FD_transactions WITH (
  'connector.class'          = 'OracleDatabaseSource',
  'name'                     = 'recipe-oracle-transactions-cc',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '1521',
  'connection.user'          = '<database-username>',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'TRANSACTIONS',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);

-- Stream of customers
CREATE SOURCE CONNECTOR IF NOT EXISTS FD_customers WITH (
  'connector.class'          = 'OracleDatabaseSource',
  'name'                     = 'recipe-oracle-customers-cc',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '1521',
  'connection.user'          = '<database-username>',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'CUSTOMERS',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);
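After submitting the connector statements, it can be useful to confirm that the connectors are running and to find the topics they write to. The following sketch uses standard ksqlDB statements. The placeholder stands for whichever name SHOW CONNECTORS reports, since the exact name depends on your environment.

```sql
-- Sketch only: list the connectors known to this ksqlDB application
SHOW CONNECTORS;

-- Inspect the status and tasks of one connector (substitute the name
-- reported by SHOW CONNECTORS)
DESCRIBE CONNECTOR <connector_name>;

-- List topics to locate the ones the connectors populate
SHOW TOPICS;
```

The KAFKA_TOPIC values in the CREATE STREAM statements may need to be adjusted to match the topic names the connectors actually produce.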

Cleanup

4

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). Including the DELETE TOPIC clause also asynchronously deletes the topic backing the stream or table.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
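Applied to the objects created in this tutorial, the cleanup might look like the sketch below. The order is an assumption based on the dependencies: downstream objects are dropped before the sources they read from. The fd_customers table is dropped without DELETE TOPIC because it shares the fd_customer_rekeyed topic with the stream of the same name, and ksqlDB may refuse to delete a topic that another registered source still uses.

```sql
-- Sketch only: drop downstream objects first, then their sources
DROP TABLE IF EXISTS fd_possible_stolen_card DELETE TOPIC;
DROP STREAM IF EXISTS fd_transactions_enriched DELETE TOPIC;

-- Shares its topic with fd_customer_rekeyed, so keep the topic for now
DROP TABLE IF EXISTS fd_customers;
DROP STREAM IF EXISTS fd_customer_rekeyed DELETE TOPIC;

DROP STREAM IF EXISTS fd_transactions DELETE TOPIC;
DROP STREAM IF EXISTS fd_cust_raw_stream DELETE TOPIC;
```

On ksqlDB versions where DROP does not stop the persistent query that created a stream or table, you may need to TERMINATE that query before the drop succeeds.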

If you also created connectors, remove those as well (substitute the connector name).

DROP CONNECTOR IF EXISTS <connector_name>;