Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster. It processes data in real time, and it can also import data from and export data to popular data sources and end systems in the cloud.
This tutorial shows you how to run the recipe in one of two ways: using connectors to a supported data source, or using ksqlDB's INSERT INTO functionality to mock the data. If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Simply remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams instead.
For this tutorial, we'll be using a respondent dataset that reflects the user base taking the surveys, as well as a stream of survey responses in which respondents answer single-question surveys. Kafka Connect is a great tool for streaming in datasets from external sources. If we suppose that we're issuing an internal survey, both the respondent and response datasets might exist in a ServiceNow table; use the following template as a guide for setting up your connectors to extract this information and move it into Kafka.
-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_servicenow_survey_analysis_respondents WITH (
    'connector.class'     = 'ServiceNowSource',
    'kafka.auth.mode'     = 'KAFKA_API_KEY',
    'kafka.api.key'       = '<my-kafka-api-key>',
    'kafka.api.secret'    = '<my-kafka-api-secret>',
    'kafka.topic'         = 'survey-respondents',
    'output.data.format'  = 'JSON',
    'servicenow.url'      = 'my.servicenow.instance',
    'servicenow.table'    = 'respondents',
    'servicenow.user'     = 'servicenowuser',
    'servicenow.password' = '********',
    'tasks.max'           = '1'
);
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_servicenow_survey_analysis_responses WITH (
    'connector.class'     = 'ServiceNowSource',
    'kafka.auth.mode'     = 'KAFKA_API_KEY',
    'kafka.api.key'       = '<my-kafka-api-key>',
    'kafka.api.secret'    = '<my-kafka-api-secret>',
    'kafka.topic'         = 'survey-responses',
    'output.data.format'  = 'JSON',
    'servicenow.url'      = 'my.servicenow.instance',
    'servicenow.table'    = 'surveys-responses',
    'servicenow.user'     = 'servicenowuser',
    'servicenow.password' = '********',
    'tasks.max'           = '1'
);
To begin, we first choose to represent each input dataset as either a TABLE or a STREAM. A STREAM represents unbounded, ongoing events, while a TABLE holds the latest value for a given key. Survey responses will continue to flow into our system; every response is valuable in and of itself, and we want to account for each one in our analysis. Respondent data, on the other hand, can change over time: for example, a user could update their profile, which would trigger a new event on the Kafka topic. Because of this, we'll use a TABLE for the respondents and a STREAM for the survey responses.
SET 'auto.offset.reset' = 'earliest';
-- Create Table of Survey Respondents
CREATE TABLE SURVEY_RESPONDENTS (
    RESPONDENT_ID VARCHAR PRIMARY KEY,
    NAME VARCHAR,
    TEAM VARCHAR,
    EMAIL VARCHAR,
    ADDRESS VARCHAR
) WITH (
    KAFKA_TOPIC = 'survey-respondents',
    VALUE_FORMAT = 'JSON',
    KEY_FORMAT = 'KAFKA',
    PARTITIONS = 6
);
-- Create Survey Responses Stream
CREATE STREAM SURVEY_RESPONSES (
    SURVEY_ID VARCHAR KEY,
    RESPONDENT_ID VARCHAR,
    SURVEY_QUESTION VARCHAR,
    SURVEY_RESPONSE VARCHAR
) WITH (
    KAFKA_TOPIC = 'survey-responses',
    VALUE_FORMAT = 'JSON',
    KEY_FORMAT = 'KAFKA',
    PARTITIONS = 6
);
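Before moving on, you can confirm that both objects registered, and inspect their schemas, with ksqlDB's standard metadata commands:
-- Confirm that the table and stream exist, and inspect a schema
SHOW TABLES;
SHOW STREAMS;
DESCRIBE SURVEY_RESPONSES;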
As a quick aside: one of the benefits of building a stream processing application is that you can make use of intermediate data streams for different downstream applications. For example, we may want to mask some data from our SURVEY_RESPONDENTS table. Every time a new event flows into SURVEY_RESPONDENTS, we can mask the appropriate fields and send the masked data downstream for other applications to tap into.
-- Mask Respondent Data
CREATE TABLE SURVEY_RESPONDENTS_MASKED WITH (
    KAFKA_TOPIC = 'survey-respondents-masked',
    VALUE_FORMAT = 'JSON',
    KEY_FORMAT = 'KAFKA',
    PARTITIONS = 6
) AS
SELECT
    RESPONDENT_ID,
    NAME,
    TEAM,
    MASK(EMAIL) AS EMAIL,
    MASK(ADDRESS) AS ADDRESS
FROM SURVEY_RESPONDENTS
EMIT CHANGES;
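To spot-check the masking (an optional step, not part of the recipe itself), you can run a transient push query against the new table and watch the masked fields as events arrive:
-- Optional: verify that EMAIL and ADDRESS come through masked
SELECT RESPONDENT_ID, EMAIL, ADDRESS
FROM SURVEY_RESPONDENTS_MASKED
EMIT CHANGES
LIMIT 3;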
With our inputs in ksqlDB, we can start to build up a real-time stream processing application. First, we enrich the survey responses with the respondent data through an inner join; this lets us leverage more of the respondent data in our analysis.
-- Enrich Survey Responses with Respondent Data
CREATE STREAM SURVEY_RESPONSES_ENRICHED WITH (
    KAFKA_TOPIC = 'survey-responses-enriched',
    VALUE_FORMAT = 'JSON',
    KEY_FORMAT = 'KAFKA',
    PARTITIONS = 6
) AS
SELECT
    RESPONSES.RESPONDENT_ID AS RESPONDENT_ID,
    RESPONSES.SURVEY_ID,
    RESPONSES.SURVEY_QUESTION,
    RESPONSES.SURVEY_RESPONSE,
    RESPONDENTS.NAME,
    RESPONDENTS.TEAM
FROM SURVEY_RESPONSES RESPONSES
INNER JOIN SURVEY_RESPONDENTS_MASKED RESPONDENTS
    ON RESPONSES.RESPONDENT_ID = RESPONDENTS.RESPONDENT_ID
EMIT CHANGES;
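If you're curious what the enriched events look like (also optional), a similar transient push query surfaces a few as they flow through:
-- Optional: peek at enriched survey responses
SELECT * FROM SURVEY_RESPONSES_ENRICHED EMIT CHANGES LIMIT 5;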
This application processes survey responses as they're captured in real time. It aggregates the responses to a given question and outputs the latest results. We've provided a number of queries to use for analysis. Since we have access to the respondent data, we can get an idea of the distribution of respondents across teams, or we can focus on the survey results themselves.
Analyzing the events in real time, as opposed to in batch, gives you the flexibility to see outcomes as they occur or in a windowed fashion, depending on the consuming application. A second query shows how to window over this output result set and emit only the final count of survey results after a given window has closed.
-- Fetch Results Live
CREATE TABLE SURVEY_RESULTS_LIVE WITH (
    KAFKA_TOPIC = 'survey-results-live',
    VALUE_FORMAT = 'JSON',
    KEY_FORMAT = 'KAFKA',
    PARTITIONS = 6
) AS
SELECT
    SURVEY_QUESTION,
    HISTOGRAM(SURVEY_RESPONSE) AS RESULTS
FROM SURVEY_RESPONSES_ENRICHED
GROUP BY SURVEY_QUESTION
EMIT CHANGES;
-- Fetch Results Windowed Per Team
CREATE TABLE SURVEY_RESULTS_WINDOWED WITH (
    KAFKA_TOPIC = 'survey-results-windowed',
    VALUE_FORMAT = 'JSON',
    KEY_FORMAT = 'KAFKA',
    PARTITIONS = 6
) AS
SELECT
    SURVEY_QUESTION,
    HISTOGRAM(TEAM) AS TEAMS
FROM SURVEY_RESPONSES_ENRICHED
WINDOW TUMBLING (SIZE 24 HOURS)
GROUP BY SURVEY_QUESTION
EMIT FINAL;
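When you query the windowed table, ksqlDB exposes the window boundaries through the WINDOWSTART and WINDOWEND pseudo columns. As a sketch, you could render the window start in human-readable form like this (the format pattern and UTC timezone are arbitrary choices):
-- Optional: view windowed results with a readable window start
SELECT
    SURVEY_QUESTION,
    TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS WINDOW_START,
    TEAMS
FROM SURVEY_RESULTS_WINDOWED
EMIT CHANGES;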
If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:
-- Survey Respondents
INSERT INTO SURVEY_RESPONDENTS (RESPONDENT_ID, NAME, TEAM, EMAIL, ADDRESS) VALUES ('159', 'Penelope Coin', 'DevX', 'pennycoin@email.com', '183 Maple Drive');
INSERT INTO SURVEY_RESPONDENTS (RESPONDENT_ID, NAME, TEAM, EMAIL, ADDRESS) VALUES ( '93', 'Theodore Bear', 'Marketing', 'teddyb@email.com', '68 El Camino Real');
INSERT INTO SURVEY_RESPONDENTS (RESPONDENT_ID, NAME, TEAM, EMAIL, ADDRESS) VALUES ('184', 'Jack Pepper', 'DevX', 'pepper.jack@email.com', '8299 Skyline Drive');
INSERT INTO SURVEY_RESPONDENTS (RESPONDENT_ID, NAME, TEAM, EMAIL, ADDRESS) VALUES ( '15', 'John Deer', 'Engineering', 'jdeer15@email.com', '928 Maple Street');
INSERT INTO SURVEY_RESPONDENTS (RESPONDENT_ID, NAME, TEAM, EMAIL, ADDRESS) VALUES ('282', 'Jane Doe', 'Engineering', 'jane.doe@email.com', '110 Rocky Road');
INSERT INTO SURVEY_RESPONDENTS (RESPONDENT_ID, NAME, TEAM, EMAIL, ADDRESS) VALUES ('739', 'Monte Wisoky', 'Engineering', 'wisoky.m@email.com', '1 First Street');
INSERT INTO SURVEY_RESPONDENTS (RESPONDENT_ID, NAME, TEAM, EMAIL, ADDRESS) VALUES ('250', 'Tessie Cremin', 'DevX', 'tcremin@email.com', '8 B Street Apt A');
INSERT INTO SURVEY_RESPONDENTS (RESPONDENT_ID, NAME, TEAM, EMAIL, ADDRESS) VALUES ('301', 'Wilfrid Howe', 'Marketing', 'whowe301@email.com', '617 Hamilton Road');
-- Survey Responses
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('0', '184', 'What is your Favorite Thanksgiving Pie?', 'Apple Pie');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('0', '159', 'What is your Favorite Thanksgiving Pie?', 'Pumpkin Pie');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('0', '93', 'What is your Favorite Thanksgiving Pie?', 'Pecan Pie');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('0', '301', 'What is your Favorite Thanksgiving Pie?', 'Apple Pie');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('0', '15', 'What is your Favorite Thanksgiving Pie?', 'Pumpkin Pie');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('0', '282', 'What is your Favorite Thanksgiving Pie?', 'Pumpkin Pie');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('0', '250', 'What is your Favorite Thanksgiving Pie?', 'Apple Pie');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('0', '739', 'What is your Favorite Thanksgiving Pie?', 'Apple Pie');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('1', '15', 'What is your Favorite Thanksgiving Dish?', 'Turkey');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('1', '282', 'What is your Favorite Thanksgiving Dish?', 'Stuffing');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('1', '250', 'What is your Favorite Thanksgiving Dish?', 'Turkey');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('1', '739', 'What is your Favorite Thanksgiving Dish?', 'Stuffing');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('1', '159', 'What is your Favorite Thanksgiving Dish?', 'Cranberry Jelly');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('1', '184', 'What is your Favorite Thanksgiving Dish?', 'Turkey');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('1', '93', 'What is your Favorite Thanksgiving Dish?', 'Stuffing');
INSERT INTO SURVEY_RESPONSES (SURVEY_ID, RESPONDENT_ID, SURVEY_QUESTION, SURVEY_RESPONSE) VALUES ('1', '301', 'What is your Favorite Thanksgiving Dish?', 'Green Beans');
To validate that this recipe is working, run the following query:
SELECT * FROM SURVEY_RESULTS_LIVE;
Your output should resemble:
+-----------------------------------------------------------------------+---------------------------------------------------------------------------+
|SURVEY_QUESTION |RESULTS |
+-----------------------------------------------------------------------+---------------------------------------------------------------------------+
|What is your Favorite Thanksgiving Dish? |{Cranberry Jelly=1, Turkey=3, Green Beans=1, Stuffing=3} |
|What is your Favorite Thanksgiving Pie? |{Pumpkin Pie=3, Pecan Pie=1, Apple Pie=4} |
Query terminated
To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;