Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports SQL
language for extracting, transforming, and loading events within your Kafka cluster.
ksqlDB processes data in realtime, and you can also import and export data straight from ksqlDB from popular data sources and end systems in the cloud.
This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source or using ksqlDB’s INSERT INTO
functionality to mock the data.
If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR
commands and insert mock data into the streams.
For this tutorial, we are interested in knowing each marketplace event for an item, specifically its pricing. This creates a stream of events, upon which real-time stream processing can keep state and calculate pricing statistics.
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
|
-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_pricing WITH (
'connector.class' = 'PostgresSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'connection.host' = '<database-host>',
'connection.port' = '5432',
'connection.user' = 'postgres',
'connection.password' = '<database-password>',
'db.name' = '<db-name>',
'table.whitelist' = 'sales',
'timestamp.column.name' = 'created_at',
'output.data.format' = 'JSON',
'db.timezone' = 'UTC',
'tasks.max' = '1'
);
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_items WITH (
'connector.class' = 'PostgresSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'connection.host' = '<database-host>',
'connection.port' = '5432',
'connection.user' = 'postgres',
'connection.password' = '<database-password>',
'db.name' = '<db-name>',
'table.whitelist' = 'items',
'timestamp.column.name' = 'created_at',
'output.data.format' = 'JSON',
'db.timezone' = 'UTC',
'tasks.max' = '1'
);
SET 'auto.offset.reset' = 'earliest';
-- Create stream of sales
CREATE STREAM sales (
item_id INT KEY,
seller_id VARCHAR,
price DOUBLE
) WITH (
VALUE_FORMAT = 'JSON',
KAFKA_TOPIC = 'sales',
PARTITIONS = 6
);
-- Create table of items
CREATE TABLE items (
item_id INT PRIMARY KEY,
item_name VARCHAR
) WITH (
VALUE_FORMAT = 'JSON',
KAFKA_TOPIC = 'items',
PARTITIONS = 6
);
-- Calculate minimum, maximum, and average price, per item, and join with item name
CREATE TABLE sales_stats WITH (KEY_FORMAT = 'JSON') AS
SELECT S.item_id,
I.item_name,
MIN(price) AS price_min,
MAX(price) AS price_max,
AVG(price) AS price_avg
FROM sales S
INNER JOIN items I ON S.item_id = I.item_id
GROUP BY S.item_id, I.item_name
EMIT CHANGES;
If you are you not running source connectors to produce events, you can use ksqlDB INSERT INTO
statements to insert mock data into the source topics:
INSERT INTO items (item_id, item_name) VALUES (1, 'Pikachu card');
INSERT INTO items (item_id, item_name) VALUES (2, 'Charizard card');
INSERT INTO items (item_id, item_name) VALUES (3, 'Mew card');
INSERT INTO sales (item_id, price) VALUES (1, 10.00);
INSERT INTO sales (item_id, price) VALUES (2, 20.00);
INSERT INTO sales (item_id, price) VALUES (3, 30.00);
INSERT INTO sales (item_id, price) VALUES (1, 12.00);
INSERT INTO sales (item_id, price) VALUES (1, 17.00);
INSERT INTO sales (item_id, price) VALUES (3, 26.00);
To validate that this recipe is working, run the following query:
SELECT * FROM sales_stats;
Your output should resemble:
+------------------------+------------------------+------------------------+------------------------+------------------------+
|S_ITEM_ID |ITEM_NAME |PRICE_MIN |PRICE_MAX |PRICE_AVG |
+------------------------+------------------------+------------------------+------------------------+------------------------+
|1 |Pikachu card |10.0 |17.0 |13.0 |
|3 |Mew card |26.0 |30.0 |28.0 |
|2 |Charizard card |20.0 |20.0 |20.0 |
Query terminated
To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate).
By including the DELETE TOPIC
clause, the topic backing the stream or table is asynchronously deleted as well.
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;