First, you will need to create a ksqlDB table and Kafka topic to represent the suspicious names data. You can create a table from a Kafka topic or derive one from an existing stream or table. In both cases, a table’s underlying data is durably stored in a topic on the Kafka brokers. In this tutorial, we create a new Kafka topic for our table. If kafka_topic were not specified in the statement, a new Kafka topic would still be created for us.
CREATE TABLE suspicious_names (CREATED_TS VARCHAR,
COMPANY_NAME VARCHAR PRIMARY KEY,
COMPANY_ID INT)
WITH (kafka_topic='suspicious_names',
partitions=1,
value_format='JSON',
timestamp='CREATED_TS',
timestamp_format = 'yyyy-MM-dd HH:mm:ss');
A table is more fitting than a stream for the suspicious names data because the list of names is a mutable collection that changes over time. We may want to add company names to this table or remove them in the future.
Likewise, you’ll need a ksqlDB stream and Kafka topic to represent transaction events. The transaction information includes the identifier, the user sending the money, the name of the recipient, the amount of money sent, and the time of the transaction. Since this data represents a historical sequence of events, a stream is more appropriate than a table.
CREATE STREAM transactions (TXN_ID BIGINT, USERNAME VARCHAR, RECIPIENT VARCHAR, AMOUNT DOUBLE, TS VARCHAR)
WITH (kafka_topic='transactions',
partitions=1,
value_format='JSON',
timestamp='TS',
timestamp_format = 'yyyy-MM-dd HH:mm:ss');
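As an optional check that is not part of the original steps, you can confirm that both objects and their backing topics now exist. This is a minimal sketch using standard ksqlDB inspection statements; the exact output layout depends on your ksqlDB version.

```sql
-- List the Kafka topics visible to ksqlDB; 'suspicious_names' and
-- 'transactions' should both appear.
SHOW TOPICS;

-- Show the columns and types registered for each object.
DESCRIBE suspicious_names;
DESCRIBE transactions;
```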
Let’s add some suspicious names data into our reference table. Note that the timestamps for these records are between 3 and 5 days ago.
INSERT INTO suspicious_names (CREATED_TS, COMPANY_NAME, COMPANY_ID) VALUES (FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (5 * 24 * 60 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'), 'Verizon', 1);
INSERT INTO suspicious_names (CREATED_TS, COMPANY_NAME, COMPANY_ID) VALUES (FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (4 * 24 * 60 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'), 'Spirit Halloween', 2);
INSERT INTO suspicious_names (CREATED_TS, COMPANY_NAME, COMPANY_ID) VALUES (FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (3 * 24 * 60 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'), 'Best Buy', 3);
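The timestamp expression in these statements is easier to follow when read from the inside out. The block below is the first INSERT above, only reformatted and annotated; it is shown for reading and does not need to be run again. The millisecond totals in the comments are worked out by hand from the factors in the statements.

```sql
-- 1. UNIX_TIMESTAMP() returns the current time in epoch milliseconds.
-- 2. Subtracting (5 * 24 * 60 * 60 * 1000) = 432,000,000 ms moves it back 5 days.
--    The other two rows use 4 days (345,600,000 ms) and 3 days (259,200,000 ms).
-- 3. FROM_UNIXTIME converts the epoch milliseconds to a TIMESTAMP.
-- 4. FORMAT_TIMESTAMP renders it as a VARCHAR that matches the table's
--    timestamp_format, 'yyyy-MM-dd HH:mm:ss'.
INSERT INTO suspicious_names (CREATED_TS, COMPANY_NAME, COMPANY_ID) VALUES (
  FORMAT_TIMESTAMP(
    FROM_UNIXTIME(UNIX_TIMESTAMP() - (5 * 24 * 60 * 60 * 1000)),
    'yyyy-MM-dd HH:mm:ss'),
  'Verizon',
  1);
```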
Let’s add some transaction data into our event stream. Note that the timestamps for these transactions are all just over one day ago (one day plus 2 to 12 minutes), i.e., after the timestamps of the suspicious name records.
INSERT INTO transactions (TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS) VALUES (9900, 'Abby Normal', 'Verizon', 22.0, FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (1 * 24 * 60 * 60 * 1000 + 2 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'));
INSERT INTO transactions (TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS) VALUES (12, 'Victor von Frankenstein', 'Tattered Cover', 7.0, FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (1 * 24 * 60 * 60 * 1000 + 3 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'));
INSERT INTO transactions (TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS) VALUES (13, 'Frau Blücher', 'Peebles', 70.0, FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (1 * 24 * 60 * 60 * 1000 + 4 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'));
INSERT INTO transactions (TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS) VALUES (9903, 'Abby Normal', 'Verizon', 61.0, FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (1 * 24 * 60 * 60 * 1000 + 5 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'));
INSERT INTO transactions (TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS) VALUES (9901, 'Abby Normal', 'Spirit Halloween', 83.0, FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (1 * 24 * 60 * 60 * 1000 + 6 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'));
INSERT INTO transactions (TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS) VALUES (9902, 'Abby Normal', 'Spirit Halloween', 46.0, FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (1 * 24 * 60 * 60 * 1000 + 7 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'));
INSERT INTO transactions (TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS) VALUES (9904, 'Abby Normal', 'Spirit Halloween', 59.0, FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (1 * 24 * 60 * 60 * 1000 + 8 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'));
INSERT INTO transactions (TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS) VALUES (6, 'Victor von Frankenstein', 'Confluent Cloud', 21.0, FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (1 * 24 * 60 * 60 * 1000 + 9 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'));
INSERT INTO transactions (TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS) VALUES (18, 'Frau Blücher', 'Target', 70.0, FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (1 * 24 * 60 * 60 * 1000 + 10 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'));
INSERT INTO transactions (TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS) VALUES (7, 'Victor von Frankenstein', 'Verizon', 100.0, FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (1 * 24 * 60 * 60 * 1000 + 11 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'));
INSERT INTO transactions (TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS) VALUES (19, 'Frau Blücher', 'Goodwill', 7.0, FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (1 * 24 * 60 * 60 * 1000 + 12 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'));
Set ksqlDB to process data from the beginning of each Kafka topic:
SET 'auto.offset.reset' = 'earliest';
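With the offset reset to earliest, a quick way to sanity-check the inserted data is to read it back with push queries. This is a sketch rather than a required step; the LIMIT values are chosen to match the three names and eleven transactions inserted above, so each query should end once those rows have been returned.

```sql
-- Read back the reference table of suspicious names.
SELECT COMPANY_NAME, COMPANY_ID, CREATED_TS
FROM suspicious_names
EMIT CHANGES
LIMIT 3;

-- Read back the transaction events.
SELECT TXN_ID, USERNAME, RECIPIENT, AMOUNT, TS
FROM transactions
EMIT CHANGES
LIMIT 11;
```

If either query appears to hang, fewer rows than the limit are available, which usually means an INSERT statement failed or the offset setting was not applied in the current session.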