Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster. ksqlDB processes data in real time, and you can also import and export data straight from ksqlDB to popular data sources and end systems in the cloud.
This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.
The application breaks up the stream of messages into individual conversations and puts each conversation in time order, keeping track of the sender as it goes. Then it builds up the function (old_state, element) ⇒ …, which considers each possible conversation state.
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
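For example, if a messages topic already existed in your cluster, the stream declaration could drop PARTITIONS entirely. This is a hypothetical variant for illustration; the recipe below keeps PARTITIONS = 6 so that the topic is created for you:

-- Hypothetical variant: valid only when the 'messages' topic already exists
CREATE STREAM messages (
  send_id BIGINT,
  recv_id BIGINT,
  message VARCHAR
) WITH (
  KAFKA_TOPIC = 'messages',
  VALUE_FORMAT = 'JSON'
);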
SET 'auto.offset.reset' = 'earliest';

CREATE STREAM messages (
  send_id BIGINT,
  recv_id BIGINT,
  message VARCHAR
) WITH (
  KAFKA_TOPIC = 'messages',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

CREATE TABLE conversations AS
  SELECT
    ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>') AS conversation_id,
    AS_VALUE(ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>')) AS conversation_value
  FROM messages
  GROUP BY ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>')
  HAVING
    REDUCE(
      ENTRIES(
        AS_MAP(
          COLLECT_LIST(CAST(rowtime AS VARCHAR)),
          COLLECT_LIST(send_id)
        ),
        true
      ),
      STRUCT(step := 'start', last_sender := CAST(-1 AS BIGINT)),
      (old_state, element) => CASE
        WHEN old_state->step = 'start'
          THEN STRUCT(step := 'opened', last_sender := element->v)
        WHEN old_state->step = 'opened' AND old_state->last_sender != element->v
          THEN STRUCT(step := 'replied', last_sender := element->v)
        WHEN old_state->step = 'replied' AND old_state->last_sender != element->v
          THEN STRUCT(step := 'connected', last_sender := element->v)
        ELSE old_state
      END
    )->step = 'connected'
  EMIT CHANGES;
-- Add the INSERT INTO commands to insert mock data
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 1, 2, 'Hello' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 1, 3, 'Hello' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 2, 1, 'Hey there' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 1, 2, 'What''s going on?' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 3, 4, 'Hi' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 3, 4, 'Hello' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 5, 4, 'Hi' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 4, 5, 'Well hi to you too.' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 3, 4, 'I''d like to chat.' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 5, 4, 'Would you like to discuss event streaming with me?' );
To validate that this recipe is working, run the following query:
SELECT * FROM conversations EMIT CHANGES LIMIT 2;
Your output should resemble:
+------------------------------------------------------------------------+------------------------------------------------------------------------+
|CONVERSATION_ID |CONVERSATION_VALUE |
+------------------------------------------------------------------------+------------------------------------------------------------------------+
|4<>5 |4<>5 |
|1<>2 |1<>2 |
Limit Reached
Query terminated
If you have real end systems to connect to, adapt the sample connector configuration(s) below and run them from ksqlDB with CREATE SOURCE CONNECTOR.
-- Example
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_dating WITH (
  'connector.class'       = 'PostgresSource',
  'kafka.api.key'         = '<my-kafka-api-key>',
  'kafka.api.secret'      = '<my-kafka-api-secret>',
  'connection.host'       = '<database-host>',
  'connection.port'       = '<database-port>',
  'connection.user'       = '<database-user>',
  'connection.password'   = '<database-password>',
  'database'              = '<database-name>',
  'table.whitelist'       = 'messages',
  'timestamp.column.name' = 'created_at',
  'output.data.format'    = 'JSON',
  'db.timezone'           = 'UTC',
  'tasks.max'             = '1'
);
This solution builds up a query that can take a stream of messages and assemble them into a thread of conversations for analysis.
If you look at those messages, it’s clear that there are a lot of hellos bouncing around, but beyond that it’s hard to see any patterns. Let’s use some queries to make sense of it. We’ll build up our answer to "Who’s connected to who?" gradually. Before we begin, here are some session settings to make sure that we all get the same results. The first makes queries read each topic from the beginning; the second allows pull queries like SELECT * FROM <table> to scan the whole table:
SET 'auto.offset.reset' = 'earliest';
SET 'ksql.query.pull.table.scan.enabled' = 'true';
The first step is to break the stream up into individual conversations. If we sort the sender and receiver of each message, we can create a unique ID for every pair that chats (or tries to start chatting), and use that to group all the events:
CREATE TABLE conversations_split AS
  SELECT
    ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>') AS conversation_id,
    COLLECT_LIST(rowtime) AS message_times
  FROM messages
  GROUP BY ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>');
Querying that looks like this:
SELECT * FROM conversations_split;
+----------------+----------------------------------------------+
|CONVERSATION_ID |MESSAGE_TIMES |
+----------------+----------------------------------------------+
|3<>4 |[1637318173288, 1637317383974, 1637318170245] |
|4<>5 |[1637317384066, 1637317384210, 1637317384126] |
|1<>2 |[1637317383692, 1637317383832, 1637317383887] |
|1<>3 |[1637317383778] |
NOTE: Because we sorted the [send_id, recv_id] array, it doesn’t matter if 1 was sending to 2 or 2 was sending to 1; we get the same conversation ID for both directions.
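To see why, trace the key expression by hand (results shown as comments; the expressions are exactly the ones used in the GROUP BY):

-- Message from 1 to 2:  ARRAY_SORT(ARRAY [1, 2])  => [1, 2]
-- Message from 2 to 1:  ARRAY_SORT(ARRAY [2, 1])  => [1, 2]
-- Either way:           ARRAY_JOIN([1, 2], '<>')  => '1<>2'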
That’s a big help; we can analyze conversations individually. Let’s put each of these conversations in time order, and keep track of the sender as we go. We do this in two steps. First, we’ll enhance our message_times column to build up a map with the rowtime as the key and the send_id as the value:
CREATE TABLE conversations_mapped AS
  SELECT
    ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>') AS conversation_id,
    AS_MAP(
      COLLECT_LIST(CAST(rowtime AS VARCHAR)),
      COLLECT_LIST(send_id)
    ) AS message_times
  FROM messages
  GROUP BY ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>');
Querying that looks like this:
SELECT * FROM conversations_mapped;
+----------------+----------------------------------------------------+
|CONVERSATION_ID |MESSAGE_TIMES |
+----------------+----------------------------------------------------+
|3<>4 |{1637317383974=3, 1637318170245=3, 1637318173288=3} |
|4<>5 |{1637317384210=5, 1637317384066=5, 1637317384126=4} |
|1<>2 |{1637317383832=2, 1637317383887=1, 1637317383692=1} |
|1<>3 |{1637317383778=1} |
It’s almost right, but we want to be able to see those messages in order. Let’s turn the message_times map back into a sorted list with ENTRIES(<map>, true):
CREATE TABLE conversations_sequenced AS
  SELECT
    ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>') AS conversation_id,
    ENTRIES(
      AS_MAP(
        COLLECT_LIST(CAST(rowtime AS VARCHAR)),
        COLLECT_LIST(send_id)
      ),
      true
    ) AS message_times
  FROM messages
  GROUP BY ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>');
Querying that looks like this:
SELECT * FROM conversations_sequenced;
+----------------+-------------------------------------------------------------------------+
|CONVERSATION_ID |MESSAGE_TIMES |
+----------------+-------------------------------------------------------------------------+
|3<>4 |[{K=1637317383974, V=3}, {K=1637318170245, V=3}, {K=1637318173288, V=3}] |
|4<>5 |[{K=1637317384066, V=5}, {K=1637317384126, V=4}, {K=1637317384210, V=5}] |
|1<>2 |[{K=1637317383692, V=1}, {K=1637317383832, V=2}, {K=1637317383887, V=1}] |
|1<>3 |[{K=1637317383778, V=1}] |
Perfect. If you pause and take a look at the 4<>5 row, you’ll see we nearly have our answer. First 5 sends a message, then 4 replies, then 5 follows up. That’s a match! 1<>2 also matches, and it looks like 3 is getting nowhere with 4.
If our data sets were tiny, we’d be done; we can see by eye which conversations match. To scale this up, let’s teach ksqlDB to step through that sorted array of message_times and track the steps of the conversation flowing back and forth. We can do this with the REDUCE function.

REDUCE is a way of stepping through an array, entry by entry, and boiling it down to a final result. We give it the array (in our case, message_times), a starting state, and a function that takes our state and one element of the array and gives us the next state.
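For intuition, here’s a minimal REDUCE sketch that just sums an array (results shown as comments, since ksqlDB only evaluates expressions like this inside a query):

-- Starting state 0; each step folds one element into the running total:
-- REDUCE(ARRAY [2, 3, 4], 0, (total, x) => total + x)
-- steps: 0 -> 2 -> 5 -> 9, so the final result is 9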
Our state will track the step in the flow and who sent the most recent message. We’ll start with these placeholder values:

STRUCT(step := 'start', last_sender := CAST(-1 AS BIGINT))
And then build up the function (old_state, element) ⇒ …, which considers each possible case:

- If we’re at the start step, the next message is always an opener. Move to opened.
- If we’re at opened, and the message has a new send_id, then the sender has changed and that’s a reply. Move to replied.
- If we’re at replied, and the message has changed send_id again, that’s a connection! Move to connected.
- In any other case, there’s no change.
In code, that looks like this:
CREATE OR REPLACE TABLE conversation_states AS
  SELECT
    conversation_id,
    REDUCE(
      message_times,
      STRUCT(step := 'start', last_sender := CAST(-1 AS BIGINT)),
      (old_state, element) => CASE
        WHEN old_state->step = 'start'
          THEN STRUCT(step := 'opened', last_sender := element->v)
        WHEN old_state->step = 'opened' AND old_state->last_sender != element->v
          THEN STRUCT(step := 'replied', last_sender := element->v)
        WHEN old_state->step = 'replied' AND old_state->last_sender != element->v
          THEN STRUCT(step := 'connected', last_sender := element->v)
        ELSE old_state
      END
    ) AS state
  FROM conversations_sequenced;
Querying that looks like this:
SELECT * FROM conversation_states;
+----------------+--------------------------------+
|CONVERSATION_ID |STATE |
+----------------+--------------------------------+
|3<>4 |{STEP=opened, LAST_SENDER=3} |
|4<>5 |{STEP=connected, LAST_SENDER=5} |
|1<>2 |{STEP=connected, LAST_SENDER=1} |
|1<>3 |{STEP=opened, LAST_SENDER=1} |
To wrap up, let’s just trim that down to the final answers:
SELECT conversation_id
FROM conversation_states
WHERE state->step = 'connected';
+----------------+
|CONVERSATION_ID |
+----------------+
|4<>5 |
|1<>2 |
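If you’d rather watch new connections appear as they happen, the same filter also works as a push query; adding EMIT CHANGES makes ksqlDB stream each new match to you as it occurs:

SELECT conversation_id
FROM conversation_states
WHERE state->step = 'connected'
EMIT CHANGES;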
To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute your stream or table name, as appropriate). Including the DELETE TOPIC clause also asynchronously deletes the topic backing the stream or table.
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;
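Applied to this tutorial, the cleanup might look like the following (a sketch assuming you created every intermediate table along the way; skip the connector line if you used mock data only):

-- Drop derived tables first, then the source stream, then the connector
DROP TABLE IF EXISTS conversation_states DELETE TOPIC;
DROP TABLE IF EXISTS conversations_sequenced DELETE TOPIC;
DROP TABLE IF EXISTS conversations_mapped DELETE TOPIC;
DROP TABLE IF EXISTS conversations_split DELETE TOPIC;
DROP TABLE IF EXISTS conversations DELETE TOPIC;
DROP STREAM IF EXISTS messages DELETE TOPIC;
DROP CONNECTOR IF EXISTS recipe_postgres_dating;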