Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.
This tutorial creates simulated data with the Datagen connector.
You can then process the data in a variety of ways: enrich the orders and clickstream data with product and customer attributes, analyze errors, aggregate data by time, and more.
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
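For example, a stream over a pre-existing topic can be declared without it. This is a minimal sketch with hypothetical names (my_stream, my_topic) used only for illustration:
-- Sketch: declare a stream over a topic that already exists
CREATE STREAM my_stream (
  id VARCHAR,
  ts BIGINT
) WITH (
  KAFKA_TOPIC = 'my_topic',   -- topic already exists, so PARTITIONS is omitted
  VALUE_FORMAT = 'JSON'
);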
-- Stream of products (shoes)
CREATE SOURCE CONNECTOR IF NOT EXISTS DATAGEN_SHOES WITH (
  'connector.class' = 'DatagenSource',
  'kafka.api.key' = '<my-kafka-api-key>',
  'kafka.api.secret' = '<my-kafka-api-secret>',
  'kafka.topic' = 'shoes',
  'quickstart' = 'SHOES',
  'maxInterval' = '10',
  'tasks.max' = '1',
  'output.data.format' = 'JSON'
);
-- Stream of customers
CREATE SOURCE CONNECTOR IF NOT EXISTS DATAGEN_SHOE_CUSTOMERS WITH (
  'connector.class' = 'DatagenSource',
  'kafka.api.key' = '<my-kafka-api-key>',
  'kafka.api.secret' = '<my-kafka-api-secret>',
  'kafka.topic' = 'shoe_customers',
  'quickstart' = 'SHOE_CUSTOMERS',
  'maxInterval' = '10',
  'tasks.max' = '1',
  'output.data.format' = 'JSON'
);
-- Stream of orders
CREATE SOURCE CONNECTOR IF NOT EXISTS DATAGEN_SHOE_ORDERS WITH (
  'connector.class' = 'DatagenSource',
  'kafka.api.key' = '<my-kafka-api-key>',
  'kafka.api.secret' = '<my-kafka-api-secret>',
  'kafka.topic' = 'shoe_orders',
  'quickstart' = 'SHOE_ORDERS',
  'maxInterval' = '10',
  'tasks.max' = '1',
  'output.data.format' = 'JSON'
);
-- Stream of ecommerce website clicks
CREATE SOURCE CONNECTOR IF NOT EXISTS DATAGEN_SHOE_CLICKSTREAM WITH (
  'connector.class' = 'DatagenSource',
  'kafka.api.key' = '<my-kafka-api-key>',
  'kafka.api.secret' = '<my-kafka-api-secret>',
  'kafka.topic' = 'shoe_clickstream',
  'quickstart' = 'SHOE_CLICKSTREAM',
  'maxInterval' = '30',
  'tasks.max' = '1',
  'output.data.format' = 'JSON'
);
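Before defining the streams, you can optionally confirm that the connectors are producing records by printing a few messages from one of the topics created above; this quick check is not required for the rest of the tutorial:
-- Optional: inspect a few records from the shoes topic
PRINT 'shoes' FROM BEGINNING LIMIT 3;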
-- Process all of the existing data in the topics from the beginning
SET 'auto.offset.reset' = 'earliest';
-- Create a stream of products (shoes)
CREATE STREAM shoes (
  id VARCHAR,
  brand VARCHAR,
  name VARCHAR,
  sale_price INT,
  rating DOUBLE,
  ts BIGINT
) WITH (
  KAFKA_TOPIC = 'shoes',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);
-- Create a stream of customers
CREATE STREAM shoe_customers (
  id VARCHAR,
  first_name VARCHAR,
  last_name VARCHAR,
  email VARCHAR,
  phone VARCHAR,
  street_address VARCHAR,
  state VARCHAR,
  zip_code VARCHAR,
  country VARCHAR,
  country_code VARCHAR
) WITH (
  KAFKA_TOPIC = 'shoe_customers',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);
-- Create a stream of orders
CREATE STREAM shoe_orders (
  order_id INT,
  product_id VARCHAR,
  customer_id VARCHAR,
  ts BIGINT
) WITH (
  KAFKA_TOPIC = 'shoe_orders',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1,
  TIMESTAMP = 'ts'
);
-- Create a stream of product website clicks
CREATE STREAM shoe_clickstream (
  product_id VARCHAR,
  user_id VARCHAR,
  view_time INT,
  page_url VARCHAR,
  ip VARCHAR,
  ts BIGINT
) WITH (
  KAFKA_TOPIC = 'shoe_clickstream',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1,
  TIMESTAMP = 'ts'
);
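With the clickstream stream in place, the time-based aggregation mentioned in the introduction can be sketched as a transient push query. This hypothetical example, which is not part of the pipeline below, counts clicks per product in hourly tumbling windows:
-- Sketch: clicks per product per hour
SELECT product_id,
  COUNT(*) AS click_count
FROM shoe_clickstream
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY product_id
EMIT CHANGES;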
-- Create a stream of enriched orders
CREATE STREAM shoe_orders_enriched WITH (
  KAFKA_TOPIC = 'shoe_orders_enriched',
  PARTITIONS = 1,
  VALUE_FORMAT = 'JSON'
) AS
SELECT * FROM shoe_orders
  INNER JOIN shoe_clickstream
  WITHIN 1 HOUR
  GRACE PERIOD 1 MINUTE
  ON shoe_orders.customer_id = shoe_clickstream.user_id
EMIT CHANGES;
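The enrichment with customer attributes mentioned in the introduction is not part of this pipeline, but one possible approach is sketched below. It assumes you want the latest customer record per id; the table name shoe_customers_latest and the column selection are illustrative only:
-- Sketch: materialize the latest customer attributes into a table
CREATE TABLE shoe_customers_latest AS
  SELECT id,
    LATEST_BY_OFFSET(first_name) AS first_name,
    LATEST_BY_OFFSET(last_name) AS last_name,
    LATEST_BY_OFFSET(email) AS email
  FROM shoe_customers
  GROUP BY id
  EMIT CHANGES;
-- Sketch: join orders against that table to add customer attributes
SELECT o.order_id, o.product_id, c.first_name, c.last_name, c.email
FROM shoe_orders o
JOIN shoe_customers_latest c ON o.customer_id = c.id
EMIT CHANGES;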
After processing the data, send it to Elasticsearch.
-- Send data to Elasticsearch
CREATE SINK CONNECTOR IF NOT EXISTS analyzed_clickstream WITH (
  'connector.class' = 'ElasticsearchSink',
  'name' = 'recipe-elasticsearch-analyzed_clickstream',
  'input.data.format' = 'JSON',
  'kafka.api.key' = '<my-kafka-api-key>',
  'kafka.api.secret' = '<my-kafka-api-secret>',
  'topics' = 'shoe_orders_enriched',
  'connection.url' = '<elasticsearch-URI>',
  'connection.user' = '<elasticsearch-username>',
  'connection.password' = '<elasticsearch-password>',
  'type.name' = 'kafkaconnect',
  'key.ignore' = 'true',
  'schema.ignore' = 'true'
);
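You can optionally check the sink connector's status from the ksqlDB editor, using the connector name created above:
DESCRIBE CONNECTOR analyzed_clickstream;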
To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate).
By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;
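If you are unsure which resources this tutorial created, the SHOW statements list them so you can substitute the right names into the DROP commands above:
SHOW STREAMS;
SHOW TABLES;
SHOW CONNECTORS;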