Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.
ksqlDB processes data in real time, and you can also import and export data directly between ksqlDB and popular data sources and end systems in the cloud.
This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source, or using ksqlDB's INSERT INTO functionality to mock the data.
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
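For example, if the FD_customers topic already exists in your cluster, the first stream in this recipe could be declared without the PARTITIONS property. This is only a minimal sketch of that variation, not part of the recipe itself; the full recipe below creates the topics it needs, so it keeps PARTITIONS = 6 throughout.

CREATE STREAM fd_cust_raw_stream (
  ID BIGINT,
  FIRST_NAME VARCHAR,
  LAST_NAME VARCHAR,
  EMAIL VARCHAR,
  AVG_CREDIT_SPEND DOUBLE
) WITH (
  KAFKA_TOPIC = 'FD_customers',
  VALUE_FORMAT = 'JSON'
);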
SET 'auto.offset.reset' = 'earliest';
-- Create the stream of customer data
CREATE STREAM fd_cust_raw_stream (
ID BIGINT,
FIRST_NAME VARCHAR,
LAST_NAME VARCHAR,
EMAIL VARCHAR,
AVG_CREDIT_SPEND DOUBLE
) WITH (
KAFKA_TOPIC = 'FD_customers',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
-- Repartition the customer data stream by account_id to prepare for the join later
CREATE STREAM fd_customer_rekeyed WITH (KAFKA_TOPIC = 'fd_customer_rekeyed') AS
SELECT *
FROM fd_cust_raw_stream
PARTITION BY ID;
-- Register the partitioned customer data topic as a table
CREATE TABLE fd_customers (
ID BIGINT PRIMARY KEY,
FIRST_NAME VARCHAR,
LAST_NAME VARCHAR,
EMAIL VARCHAR,
AVG_CREDIT_SPEND DOUBLE
) WITH (
KAFKA_TOPIC = 'fd_customer_rekeyed',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
-- Create the stream of transactions
CREATE STREAM fd_transactions (
ACCOUNT_ID BIGINT,
TIMESTAMP VARCHAR,
CARD_TYPE VARCHAR,
AMOUNT DOUBLE,
IP_ADDRESS VARCHAR,
TRANSACTION_ID VARCHAR
) WITH (
KAFKA_TOPIC = 'FD_transactions',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
-- Join the transactions to customer information
CREATE STREAM fd_transactions_enriched WITH (KAFKA_TOPIC = 'transactions_enriched') AS
SELECT
T.ACCOUNT_ID,
T.CARD_TYPE,
T.AMOUNT,
C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
C.AVG_CREDIT_SPEND
FROM fd_transactions T
INNER JOIN fd_customers C
ON T.ACCOUNT_ID = C.ID;
-- Aggregate the stream of transactions for each account ID using a two-hour
-- tumbling window, and filter for accounts in which the total spend in a
-- two-hour period is greater than the customer’s average:
CREATE TABLE fd_possible_stolen_card WITH (KAFKA_TOPIC = 'FD_possible_stolen_card', KEY_FORMAT = 'JSON') AS
SELECT
TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START,
ACCOUNT_ID,
CARD_TYPE,
SUM(AMOUNT) AS TOTAL_CREDIT_SPEND,
FULL_NAME,
MAX(AVG_CREDIT_SPEND) AS AVG_CREDIT_SPEND
FROM fd_transactions_enriched
WINDOW TUMBLING (SIZE 2 HOURS)
GROUP BY ACCOUNT_ID, CARD_TYPE, FULL_NAME
HAVING SUM(AMOUNT) > MAX(AVG_CREDIT_SPEND);
-- Add the INSERT INTO commands to insert mock data
INSERT INTO fd_cust_raw_stream (id, first_name, last_name, email, avg_credit_spend) VALUES (6011000990139424, 'Janice', 'Smith', 'jsmith@mycompany.com', 500.00);
INSERT INTO fd_cust_raw_stream (id, first_name, last_name, email, avg_credit_spend) VALUES (3530111333300000, 'George', 'Mall', 'gmall@mycompany.com', 20.00);
INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (6011000990139424, '2021-09-23T10:50:00.000Z', 'visa', 542.99, '192.168.44.1', '3985757');
INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (6011000990139424, '2021-09-23T10:50:01.000Z', 'visa', 611.48, '192.168.44.1', '8028435');
INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (3530111333300000, '2021-09-23T10:50:00.000Z', 'mastercard', 10.31, '192.168.101.3', '1695780');
INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (3530111333300000, '2021-09-23T10:50:00.000Z', 'mastercard', 5.37, '192.168.101.3', '1695780');
To validate that this recipe is working, run the following query:
SELECT * FROM fd_possible_stolen_card EMIT CHANGES LIMIT 1;
Your output should resemble:
+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|ACCOUNT_ID |CARD_TYPE |FULL_NAME |WINDOWSTART |WINDOWEND |WINDOW_START |TOTAL_CREDIT_SPE|AVG_CREDIT_SPEND|
| | | | | | |ND | |
+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|6011000990139424|visa |Janice Smith |1646424000000 |1646431200000 |2022-03-04 20:00|1154.47 |500.0 |
| | | | | |:00 +0000 | | |
Query terminated
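As an additional, optional sanity check, you can also inspect the Kafka topic that backs the fd_possible_stolen_card table. This is a minimal sketch using ksqlDB's PRINT statement with the topic name from the CREATE TABLE statement above:

PRINT 'FD_possible_stolen_card' FROM BEGINNING LIMIT 1;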
If you have real end systems to connect to, adapt the sample connector configuration(s) below and run them from ksqlDB with CREATE SOURCE CONNECTOR.
-- Stream of transactions
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_oracle_transactions_cc WITH (
'connector.class' = 'OracleDatabaseSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'connection.host' = '<database-host>',
'connection.port' = '1521',
'connection.user' = '<database-username>',
'connection.password' = '<database-password>',
'db.name' = '<db-name>',
'table.whitelist' = 'TRANSACTIONS',
'timestamp.column.name' = 'created_at',
'output.data.format' = 'JSON',
'db.timezone' = 'UCT',
'tasks.max' = '1'
);
-- Stream of customers
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_oracle_customers_cc WITH (
'connector.class' = 'OracleDatabaseSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'connection.host' = '<database-host>',
'connection.port' = '1521',
'connection.user' = '<database-username>',
'connection.password' = '<database-password>',
'db.name' = '<db-name>',
'table.whitelist' = 'CUSTOMERS',
'timestamp.column.name' = 'created_at',
'output.data.format' = 'JSON',
'db.timezone' = 'UCT',
'tasks.max' = '1'
);
To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate).
By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
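For this recipe specifically, the cleanup might look like the following, dropping dependent objects before the streams and tables they read from (you may first need to TERMINATE any persistent queries that still reference them):

DROP TABLE IF EXISTS fd_possible_stolen_card DELETE TOPIC;
DROP STREAM IF EXISTS fd_transactions_enriched DELETE TOPIC;
-- fd_customers shares its backing topic with fd_customer_rekeyed, so drop it without DELETE TOPIC
DROP TABLE IF EXISTS fd_customers;
DROP STREAM IF EXISTS fd_customer_rekeyed DELETE TOPIC;
DROP STREAM IF EXISTS fd_transactions DELETE TOPIC;
DROP STREAM IF EXISTS fd_cust_raw_stream DELETE TOPIC;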
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;
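For the sample Oracle connectors defined above, for example:

DROP CONNECTOR IF EXISTS recipe_oracle_transactions_cc;
DROP CONNECTOR IF EXISTS recipe_oracle_customers_cc;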