Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.
ksqlDB processes data in real time, and you can also import and export data directly from ksqlDB, to and from popular data sources and end systems in the cloud.
This tutorial shows you how to run the recipe in one of two ways: using connectors to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.
If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.
Now you can process the data in a variety of ways.
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
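As a minimal sketch of that last point, assuming the `payments` topic already exists (for example, because a connector created it), the stream declaration could leave out PARTITIONS. This is the recipe's own PAYMENTS statement with that one property removed; it would be run in place of the recipe's version, not in addition to it.

```sql
-- Variant of the recipe's PAYMENTS stream for a topic that already exists.
-- PARTITIONS is omitted, so ksqlDB uses the existing topic as it is.
CREATE STREAM PAYMENTS (
  PAYMENT_ID INTEGER KEY,
  CUSTID INTEGER,
  ACCOUNTID INTEGER,
  AMOUNT INTEGER,
  BANK VARCHAR
) WITH (
  KAFKA_TOPIC = 'payments',
  VALUE_FORMAT = 'JSON'
);
```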
-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams
-- Stream of customers
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_customers WITH (
'connector.class' = 'PostgresSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'connection.host' = '<database-host>',
'connection.port' = '5432',
'connection.user' = 'postgres',
'connection.password' = '<database-password>',
'db.name' = '<db-name>',
'table.whitelist' = 'customer_info',
'timestamp.column.name' = 'created_at',
'output.data.format' = 'JSON',
'db.timezone' = 'UTC',
'tasks.max' = '1'
);
-- Stream of payments
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_payments WITH (
'connector.class' = 'PostgresSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'connection.host' = '<database-host>',
'connection.port' = '5432',
'connection.user' = 'postgres',
'connection.password' = '<database-password>',
'db.name' = '<db-name>',
'table.whitelist' = 'payments',
'timestamp.column.name' = 'created_at',
'output.data.format' = 'JSON',
'db.timezone' = 'UTC',
'tasks.max' = '1'
);
-- Stream of aml_status
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_aml_status WITH (
'connector.class' = 'PostgresSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'connection.host' = '<database-host>',
'connection.port' = '5432',
'connection.user' = 'postgres',
'connection.password' = '<database-password>',
'db.name' = '<db-name>',
'table.whitelist' = 'aml_status',
'timestamp.column.name' = 'created_at',
'output.data.format' = 'JSON',
'db.timezone' = 'UTC',
'tasks.max' = '1'
);
-- Stream of funds_status
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_funds_status WITH (
'connector.class' = 'PostgresSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'connection.host' = '<database-host>',
'connection.port' = '5432',
'connection.user' = 'postgres',
'connection.password' = '<database-password>',
'db.name' = '<db-name>',
'table.whitelist' = 'funds_status',
'timestamp.column.name' = 'created_at',
'output.data.format' = 'JSON',
'db.timezone' = 'UTC',
'tasks.max' = '1'
);
SET 'auto.offset.reset' = 'earliest';
-- Register the initial streams and tables from the Kafka topics
CREATE STREAM PAYMENTS (
PAYMENT_ID INTEGER KEY,
CUSTID INTEGER,
ACCOUNTID INTEGER,
AMOUNT INTEGER,
BANK VARCHAR
) WITH (
KAFKA_TOPIC = 'payments',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
CREATE STREAM aml_status (
PAYMENT_ID INTEGER,
BANK VARCHAR,
STATUS VARCHAR
) WITH (
KAFKA_TOPIC = 'aml_status',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
CREATE STREAM funds_status (
PAYMENT_ID INTEGER,
REASON_CODE VARCHAR,
STATUS VARCHAR
) WITH (
KAFKA_TOPIC = 'funds_status',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
CREATE TABLE customers (
ID INTEGER PRIMARY KEY,
FIRST_NAME VARCHAR,
LAST_NAME VARCHAR,
EMAIL VARCHAR,
GENDER VARCHAR,
STATUS360 VARCHAR
) WITH (
KAFKA_TOPIC = 'customer_info',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
-- Enrich Payments stream with Customers table
CREATE STREAM enriched_payments AS SELECT
p.payment_id AS payment_id,
p.custid AS customer_id,
p.accountid,
p.amount,
p.bank,
c.first_name,
c.last_name,
c.email,
c.status360
FROM payments p LEFT JOIN customers c ON p.custid = c.id;
-- Combine the status streams
CREATE STREAM payment_statuses AS SELECT
payment_id,
status,
'AML' AS source_system
FROM aml_status;
INSERT INTO payment_statuses SELECT payment_id, status, 'FUNDS' AS source_system FROM funds_status;
-- Combine payment and status events within a 1-hour window.
CREATE STREAM payments_with_status AS SELECT
ep.payment_id AS payment_id,
ep.accountid,
ep.amount,
ep.bank,
ep.first_name,
ep.last_name,
ep.email,
ep.status360,
ps.status,
ps.source_system
FROM enriched_payments ep LEFT JOIN payment_statuses ps WITHIN 1 HOUR ON ep.payment_id = ps.payment_id ;
-- Aggregate data to the final table
CREATE TABLE payments_final AS SELECT
payment_id,
HISTOGRAM(status) AS status_counts,
COLLECT_LIST('{ "system" : "' + source_system + '", "status" : "' + STATUS + '"}') AS service_status_list
FROM payments_with_status
WHERE status IS NOT NULL
GROUP BY payment_id;
If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:
-- Customer Data
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (10,'Brena','Tollerton','btollerton9@furl.net','Female','silver');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (9,'Even','Tinham','etinham8@facebook.com','Male','silver');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (8,'Patti','Rosten','prosten7@ihg.com','Female','silver');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (7,'Fay','Huc','fhuc6@quantcast.com','Female','bronze');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (6,'Robinet','Leheude','rleheude5@reddit.com','Female','platinum');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (5,'Hansiain','Coda','hcoda4@senate.gov','Male','platinum');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (4,'Hashim','Rumke','hrumke3@sohu.com','Male','platinum');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (3,'Mariejeanne','Cocci','mcocci2@techcrunch.com','Female','bronze');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (2,'Ruthie','Brockherst','rbrockherst1@ow.ly','Female','platinum');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (1,'Rica','Blaisdell','rblaisdell0@rambler.ru','Female','bronze');
-- Payment Instruction Data
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (1,1,1234000,100,'DBS');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (3,2,1234100,200,'Barclays Bank');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (5,3,1234200,300,'BNP Paribas');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (7,4,1234300,400,'Wells Fargo');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (9,5,1234400,500,'DBS');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (11,6,1234500,600,'Royal Bank of Canada');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (13,7,1234600,700,'DBS');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (15,8,1234700,800,'DBS');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (17,9,1234800,900,'DBS');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (19,10,1234900,1000,'United Overseas Bank');
-- AML Status Data
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (1,'Wells Fargo','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (3,'Commonwealth Bank of Australia','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (5,'Deutsche Bank','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (7,'DBS','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (9,'United Overseas Bank','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (11,'Citi','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (13,'Commonwealth Bank of Australia','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (15,'Barclays Bank','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (17,'United Overseas Bank','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (19,'Royal Bank of Canada','OK');
-- Funds Status Data
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (1,'00','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (3,'99','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (5,'30','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (7,'00','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (9,'00','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (11,'00','NOT OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (13,'30','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (15,'00','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (17,'10','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (19,'10','OK');
To validate that this recipe is working, run the following query:
SELECT * FROM payments_final;
Your output should resemble:
+-----------------------------------------------------+-----------------------------------------------------+-----------------------------------------------------+
|PAYMENT_ID |STATUS_COUNTS |SERVICE_STATUS_LIST |
+-----------------------------------------------------+-----------------------------------------------------+-----------------------------------------------------+
|1 |{OK=2} |[{ "system" : "AML", "status" : "OK"}, { "system" : "|
| | |FUNDS", "status" : "OK"}] |
|15 |{OK=2} |[{ "system" : "AML", "status" : "OK"}, { "system" : "|
| | |FUNDS", "status" : "OK"}] |
|7 |{OK=2} |[{ "system" : "AML", "status" : "OK"}, { "system" : "|
| | |FUNDS", "status" : "OK"}] |
|17 |{OK=2} |[{ "system" : "AML", "status" : "OK"}, { "system" : "|
| | |FUNDS", "status" : "OK"}] |
|5 |{OK=2} |[{ "system" : "AML", "status" : "OK"}, { "system" : "|
| | |FUNDS", "status" : "OK"}] |
|19 |{OK=2} |[{ "system" : "AML", "status" : "OK"}, { "system" : "|
| | |FUNDS", "status" : "OK"}] |
|9 |{OK=2} |[{ "system" : "AML", "status" : "OK"}, { "system" : "|
| | |FUNDS", "status" : "OK"}] |
|3 |{OK=2} |[{ "system" : "AML", "status" : "OK"}, { "system" : "|
| | |FUNDS", "status" : "OK"}] |
|11 |{NOT OK=1, OK=1} |[{ "system" : "AML", "status" : "OK"}, { "system" : "|
| | |FUNDS", "status" : "NOT OK"}] |
|13 |{OK=2} |[{ "system" : "AML", "status" : "OK"}, { "system" : "|
| | |FUNDS", "status" : "OK"}] |
Query terminated
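To spot-check a single payment rather than scan the whole table, a keyed lookup on `payments_final` is one option. The sketch below assumes the mock data from this tutorial, in which payment 11 is the only one with a NOT OK funds status; the result should resemble the row for payment 11 shown above. The second statement is an optional push query that keeps running and emits updates as new status events arrive.

```sql
-- Pull query: look up one payment by its key in the aggregated table.
-- Payment 11 is chosen because the mock data gives it a NOT OK funds status.
SELECT * FROM payments_final WHERE payment_id = 11;

-- Push query: stream every change to the table until you stop the query.
SELECT * FROM payments_final EMIT CHANGES;
```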
To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate).
By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;
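Applied to this recipe, the cleanup might look like the following sketch. It uses the stream, table, and connector names created earlier in this tutorial. The ordering (derived objects first, then the sources they read from) is an assumption based on ksqlDB refusing to drop a stream or table that a running query still uses. Depending on your ksqlDB version, you may also need to terminate the INSERT INTO query that writes to `payment_statuses` before that stream can be dropped; `<query_id>` is a placeholder for the ID reported by SHOW QUERIES.

```sql
-- List running persistent queries to find any that must be stopped first.
SHOW QUERIES;
-- If needed, stop the INSERT INTO query (placeholder ID, taken from SHOW QUERIES).
-- TERMINATE <query_id>;

-- Drop derived objects before the sources they depend on.
DROP TABLE IF EXISTS payments_final DELETE TOPIC;
DROP STREAM IF EXISTS payments_with_status DELETE TOPIC;
DROP STREAM IF EXISTS payment_statuses DELETE TOPIC;
DROP STREAM IF EXISTS enriched_payments DELETE TOPIC;

-- Drop the source streams and table registered at the start of the recipe.
DROP STREAM IF EXISTS funds_status DELETE TOPIC;
DROP STREAM IF EXISTS aml_status DELETE TOPIC;
DROP STREAM IF EXISTS payments DELETE TOPIC;
DROP TABLE IF EXISTS customers DELETE TOPIC;

-- Remove the connectors, if you created them.
DROP CONNECTOR IF EXISTS recipe_postgres_customers;
DROP CONNECTOR IF EXISTS recipe_postgres_payments;
DROP CONNECTOR IF EXISTS recipe_postgres_aml_status;
DROP CONNECTOR IF EXISTS recipe_postgres_funds_status;
```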