Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.
This tutorial creates simulated data with the Datagen connector.
You can then process the data in a variety of ways, for example by aggregating it into windows of time.
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
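As a sketch of that last point, assume the campaign_finance topic already exists (for example, because you created it beforehand). The stream declaration used in this tutorial could then drop PARTITIONS and rely on the existing topic's partition count. Use this variant in place of the statement that sets PARTITIONS, not in addition to it.

```sql
-- Variant for a backing topic that already exists: PARTITIONS is omitted,
-- so ksqlDB uses the topic as it is rather than creating it.
CREATE STREAM campaign_finance (
  time BIGINT,
  candidate_id VARCHAR,
  party_affiliation VARCHAR,
  contribution BIGINT
) WITH (
  KAFKA_TOPIC = 'campaign_finance',
  VALUE_FORMAT = 'JSON'
);
```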
-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands
-- and use INSERT INTO commands to insert mock data into the streams instead.
-- Stream of campaign contributions
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_datagen_campaign_finance WITH (
  'connector.class' = 'DatagenSource',
  'kafka.api.key' = '<my-kafka-api-key>',
  'kafka.api.secret' = '<my-kafka-api-secret>',
  'kafka.topic' = 'campaign_finance',
  'quickstart' = 'CAMPAIGN_FINANCE',
  'maxInterval' = '10',
  'tasks.max' = '1',
  'output.data.format' = 'JSON'
);
SET 'auto.offset.reset' = 'earliest';
-- Create a stream of campaign contributions
CREATE STREAM campaign_finance (
  time BIGINT,
  candidate_id VARCHAR,
  party_affiliation VARCHAR,
  contribution BIGINT
) WITH (
  KAFKA_TOPIC = 'campaign_finance',
  PARTITIONS = 1,
  VALUE_FORMAT = 'JSON'
);
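-- Optional mock data (a sketch, not part of the connector setup): run these statements
-- only if you removed the CREATE SOURCE CONNECTOR command above.
-- All values are hypothetical and chosen to produce one 'small', one 'medium', and one
-- 'large' contribution; time is assumed to be in epoch milliseconds.
INSERT INTO campaign_finance (time, candidate_id, party_affiliation, contribution) VALUES (1656633600000, 'candidate_a', 'party_x', 250);
INSERT INTO campaign_finance (time, candidate_id, party_affiliation, contribution) VALUES (1656633660000, 'candidate_b', 'party_y', 1200);
INSERT INTO campaign_finance (time, candidate_id, party_affiliation, contribution) VALUES (1656633720000, 'candidate_a', 'party_x', 5000);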
-- Categorize contributions by amount
CREATE STREAM categorization_donations WITH (KAFKA_TOPIC = 'categorization_donations') AS
  SELECT
    FORMAT_TIMESTAMP(FROM_UNIXTIME(time), 'yyyy-MM-dd HH:mm:ss') AS ts,
    party_affiliation,
    candidate_id,
    CASE
      WHEN contribution < 500 THEN 'small'
      WHEN contribution < 2900 THEN 'medium'
      ELSE 'large'
    END AS category
  FROM campaign_finance
  EMIT CHANGES;
-- Get count of "small" contributions
CREATE TABLE contributions_small_count WITH (KAFKA_TOPIC = 'contributions_small_count', KEY_FORMAT = 'JSON') AS
  SELECT
    category,
    party_affiliation,
    COUNT(category) AS count_contributions
  FROM categorization_donations
  WHERE category = 'small'
  GROUP BY category, party_affiliation;
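The statements above aggregate over the whole stream. To illustrate the windowed processing mentioned in the introduction, the following sketch counts contributions per party in one-hour tumbling windows. The table name contributions_per_hour and the one-hour window size are assumptions made for this example and are not part of the original tutorial. Unless a TIMESTAMP property is set on the source, ksqlDB assigns records to windows by their record timestamp (ROWTIME), not by the time column.

```sql
-- Hypothetical extension: count contributions per party in one-hour tumbling windows.
CREATE TABLE contributions_per_hour WITH (KAFKA_TOPIC = 'contributions_per_hour') AS
  SELECT
    party_affiliation,
    COUNT(*) AS count_contributions
  FROM categorization_donations
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY party_affiliation
  EMIT CHANGES;
```

If you create this table, remember to drop it along with the other resources when you clean up.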
To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate).
Including the DELETE TOPIC clause also deletes the topic backing the stream or table, asynchronously.
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;
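Applied to the resources this tutorial creates, the cleanup might look like the following sketch. It assumes you kept the names used above. Objects are dropped in reverse order of creation because ksqlDB refuses to drop a stream or table that another persistent query still reads from. On older ksqlDB versions you may also need to TERMINATE the persistent queries first.

```sql
-- Drop the derived table first, then the stream it reads from, then the base stream.
DROP TABLE IF EXISTS contributions_small_count DELETE TOPIC;
DROP STREAM IF EXISTS categorization_donations DELETE TOPIC;
DROP STREAM IF EXISTS campaign_finance DELETE TOPIC;

-- Remove the Datagen connector if you created it.
DROP CONNECTOR IF EXISTS recipe_datagen_campaign_finance;
```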