Match users for online dating

When it comes to online dating, matching users on mutual interests and personal preferences, and enabling real-time communication between them, are key to finding the right counterpart. This tutorial shows developers how to determine dynamically which pairs of people have connected and are ready to get the ball rolling.

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports SQL for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

2

ksqlDB processes data in real time, and you can also import data from and export data to popular data sources and end systems in the cloud directly from ksqlDB. This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

The application splits the stream of messages into individual conversations and puts each conversation in time order, keeping track of who sent each message. It then steps through each conversation with a function of the form (old_state, element) => … that tracks how far the conversation has progressed, and keeps only the conversations that reach the connected state.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM messages (
  send_id BIGINT,
  recv_id BIGINT,
  message VARCHAR
) WITH (
  KAFKA_TOPIC = 'messages',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

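CREATE TABLE conversations AS
SELECT
  -- Sorting the two ids gives both directions of a chat the same key, e.g. 1<>2.
  ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>') AS conversation_id,
  -- AS_VALUE copies the key into the value columns as well.
  AS_VALUE(ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>')) AS conversation_value
FROM messages
GROUP BY ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>')
HAVING
  REDUCE(
    -- Map each rowtime (as a string) to its sender, then sort the entries by key
    -- so the conversation's senders appear in time order.
    ENTRIES(
        AS_MAP(
          COLLECT_LIST(CAST(rowtime AS VARCHAR)),
          COLLECT_LIST(send_id)
        ),
        true
    ),
    -- Initial state: no messages seen yet.
    STRUCT(step := 'start', last_sender := CAST(-1 AS BIGINT)),
    -- The first message opens the conversation; each later change of sender
    -- advances it to replied, then to connected.
    (old_state, element) => CASE
      WHEN old_state->step = 'start'
        THEN struct(step := 'opened', last_sender := element->v)
      WHEN old_state->step = 'opened' AND old_state->last_sender != element->v
        THEN struct(step := 'replied', last_sender := element->v)
      WHEN old_state->step = 'replied' AND old_state->last_sender != element->v
        THEN struct(step := 'connected', last_sender := element->v)
      ELSE old_state
    END
  )->step = 'connected'  -- keep only conversations that reached 'connected'
EMIT CHANGES;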

-- Add the INSERT INTO commands to insert mock data

INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 1, 2, 'Hello' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 1, 3, 'Hello' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 2, 1, 'Hey there' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 1, 2, 'What''s going on?' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 3, 4, 'Hi' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 3, 4, 'Hello' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 5, 4, 'Hi' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 4, 5, 'Well hi to you too.' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 3, 4, 'I''d like to chat.' );
INSERT INTO messages ( send_id, recv_id, message ) VALUES ( 5, 4, 'Would you like to discuss event streaming with me?' );

To validate that this recipe is working, run the following query:

SELECT * FROM conversations EMIT CHANGES LIMIT 2;

Your output should resemble:

+------------------------------------------------------------------------+------------------------------------------------------------------------+
|CONVERSATION_ID                                                         |CONVERSATION_VALUE                                                      |
+------------------------------------------------------------------------+------------------------------------------------------------------------+
|4<>5                                                                    |4<>5                                                                    |
|1<>2                                                                    |1<>2                                                                    |
Limit Reached
Query terminated

Test with real data

3

If you have real end systems to connect to, adapt the sample connector configuration(s) below and run them from ksqlDB with CREATE SOURCE CONNECTOR.

-- Example
CREATE SOURCE CONNECTOR IF NOT EXISTS online_dating WITH (
  'connector.class'          = 'PostgresSource',
  'name'                     = 'recipe-postgres-dating',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '<database-port>',
  'connection.user'          = '<database-user>',
  'connection.password'      = '<database-password>',
  'database'                 = '<database-name>',
  'table.whitelist'          = 'messages',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);

Explanation

4

This solution builds up a query that takes a stream of messages and assembles them into conversation threads for analysis.

Tracking connections

If you look at those messages, it’s clear that there are a lot of hellos bouncing around, but beyond that it’s hard to see any patterns. Let’s use some queries to make sense of it, building up our answer to "Who’s connected to whom?" gradually. Before we begin, set these session properties so that we all get the same results. The first makes queries read the messages topic from the beginning; the second allows the pull queries below, which don’t look up a specific key, to scan the whole table:

SET 'auto.offset.reset' = 'earliest';
SET 'ksql.query.pull.table.scan.enabled' = 'true';

Split by conversation

The first step is to break the stream up into individual conversations. If we sort the sender and receiver of each message, we can create a unique ID for every pair that chats (or tries to start chatting), and use that to group all the events:

CREATE TABLE conversations_split AS
  SELECT
    ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>') AS conversation_id,
    COLLECT_LIST(rowtime) AS message_times
  FROM messages
  GROUP BY ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>');

Querying that looks like this:

SELECT * FROM conversations_split;
+----------------+----------------------------------------------+
|CONVERSATION_ID |MESSAGE_TIMES                                 |
+----------------+----------------------------------------------+
|3<>4            |[1637318173288, 1637317383974, 1637318170245] |
|4<>5            |[1637317384066, 1637317384210, 1637317384126] |
|1<>2            |[1637317383692, 1637317383832, 1637317383887] |
|1<>3            |[1637317383778]                               |

NOTE: Because we sorted the [send_id, recv_id] array, it doesn’t matter if 1 was sending to 2 or 2 was sending to 1—we get the same conversation ID for both directions.
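To check the conversation-ID logic outside ksqlDB, here is a minimal Python sketch. It assumes that ARRAY_SORT orders BIGINT values numerically, and the helper names `conversation_id` and `split_by_conversation` are hypothetical. The mock INSERT statements carry no explicit timestamps, so the sketch collects each message’s sender instead of its rowtime.

```python
from collections import defaultdict


def conversation_id(send_id: int, recv_id: int) -> str:
    # Sort the two ids numerically, then join them with '<>', mirroring
    # ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>').
    return "<>".join(str(i) for i in sorted((send_id, recv_id)))


# (send_id, recv_id) pairs from the mock INSERT statements, in insert order.
MESSAGES = [(1, 2), (1, 3), (2, 1), (1, 2), (3, 4),
            (3, 4), (5, 4), (4, 5), (3, 4), (5, 4)]


def split_by_conversation(messages):
    # Group senders by conversation id, like GROUP BY with COLLECT_LIST(send_id).
    groups = defaultdict(list)
    for send_id, recv_id in messages:
        groups[conversation_id(send_id, recv_id)].append(send_id)
    return dict(groups)


if __name__ == "__main__":
    for conv_id, senders in split_by_conversation(MESSAGES).items():
        print(conv_id, senders)
```

The four groups match the four rows of conversations_split, and conversation_id(2, 1) and conversation_id(1, 2) both return "1<>2".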

Chat by chat

That’s a big help—we can analyze conversations individually.

Let’s put each of these conversations in time order, and keep track of the sender as we go.

We do this in two steps. First, we’ll enhance our message_times column to build up a map with the rowtime (cast to a string) as the key and the send_id as the value:

CREATE TABLE conversations_mapped AS
  SELECT
    ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>') AS conversation_id,
    AS_MAP(
      COLLECT_LIST(CAST(rowtime AS VARCHAR)),
      COLLECT_LIST(send_id)
    ) AS message_times
  FROM messages
  GROUP BY ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>');

Querying that looks like this:

SELECT * FROM conversations_mapped;
+----------------+----------------------------------------------------+
|CONVERSATION_ID |MESSAGE_TIMES                                       |
+----------------+----------------------------------------------------+
|3<>4            |{1637317383974=3, 1637318170245=3, 1637318173288=3} |
|4<>5            |{1637317384210=5, 1637317384066=5, 1637317384126=4} |
|1<>2            |{1637317383832=2, 1637317383887=1, 1637317383692=1} |
|1<>3            |{1637317383778=1}                                   |
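AS_MAP pairs the i-th element of the key list with the i-th element of the value list. Here is a minimal Python analogue that uses the rowtimes and senders of conversation 4<>5 from the output above. The helper `as_map` is hypothetical. The keys are strings because the query casts rowtime to VARCHAR.

```python
def as_map(keys, values):
    # Pair the i-th key with the i-th value, like AS_MAP(keys, values).
    # Assumes both lists have the same length; zip() would silently truncate otherwise.
    return dict(zip(keys, values))


# Rowtimes for conversation 4<>5 as strings (like CAST(rowtime AS VARCHAR)),
# with the matching senders taken from the conversations_mapped output.
rowtimes = [str(t) for t in (1637317384066, 1637317384210, 1637317384126)]
senders = [5, 5, 4]

message_times = as_map(rowtimes, senders)
print(message_times)
```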

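It’s almost right, but we want to see those messages in order. Let’s turn the message_times map back into a list with ENTRIES(<map>, true). The second argument tells ENTRIES to sort the entries by key. Because the keys are strings, this is a lexicographic sort. It matches time order here because all of these timestamps have the same number of digits: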

CREATE TABLE conversations_sequenced AS
  SELECT
    ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>') AS conversation_id,
    ENTRIES(
        AS_MAP(
          COLLECT_LIST(CAST(rowtime AS VARCHAR)),
          COLLECT_LIST(send_id)
        ),
        true
    ) AS message_times
  FROM messages
  GROUP BY ARRAY_JOIN(ARRAY_SORT(ARRAY [send_id, recv_id]), '<>');

Querying that looks like this:

SELECT * FROM conversations_sequenced;
+----------------+-------------------------------------------------------------------------+
|CONVERSATION_ID |MESSAGE_TIMES                                                            |
+----------------+-------------------------------------------------------------------------+
|3<>4            |[{K=1637317383974, V=3}, {K=1637318170245, V=3}, {K=1637318173288, V=3}] |
|4<>5            |[{K=1637317384066, V=5}, {K=1637317384126, V=4}, {K=1637317384210, V=5}] |
|1<>2            |[{K=1637317383692, V=1}, {K=1637317383832, V=2}, {K=1637317383887, V=1}] |
|1<>3            |[{K=1637317383778, V=1}]                                                 |

Perfect. If you pause and take a look at the 4<>5 row, you’ll see we nearly have our answer. First 5 sends a message, then 4 replies, then 5 follows up. That’s a match! 1<>2 also matches, and it looks like 3 is getting nowhere with 4.
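Sorting by key is what puts each conversation in time order. The Python sketch below mimics ENTRIES(<map>, true) on the 4<>5 map. It also shows how a string sort orders numeric keys that have different lengths. The helper `entries` and its `sort_by_key` parameter are hypothetical.

```python
def entries(m, sort_by_key=False):
    # Turn a map into a list of {"K": key, "V": value} records, like ENTRIES.
    items = sorted(m.items()) if sort_by_key else list(m.items())
    return [{"K": k, "V": v} for k, v in items]


# Map for conversation 4<>5, as in the conversations_mapped output.
message_times = {"1637317384210": 5, "1637317384066": 5, "1637317384126": 4}
sequenced = entries(message_times, sort_by_key=True)
print([e["V"] for e in sequenced])  # prints [5, 4, 5]: senders in time order

# Pitfall: string keys of different lengths do not sort numerically.
print(sorted(["999", "1000"]))  # prints ['1000', '999']
```

The output [5, 4, 5] is the open, reply, follow-up pattern described above.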

Stepping through conversations automatically

If our data sets were tiny, we’d be done: we can see by eye which conversations match. To scale this up, let’s teach ksqlDB to step through the sorted message_times array and track the conversation as it flows back and forth. We can do this with the REDUCE function.

REDUCE steps through an array, entry by entry, and boils it down to a final result. We give it the array (in our case, message_times), a starting state, and a function that takes our state and one element of the array and returns the next state.
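If fold-style functions are new to you, Python’s functools.reduce follows the same pattern. This sketch wraps it so the arguments appear in the same order as REDUCE’s. It then counts how often the sender changes in conversation 4<>5. The helper names `ksql_reduce` and `count_switch` are hypothetical, and the sketch only illustrates the idea; it is not how ksqlDB implements REDUCE.

```python
from functools import reduce


def ksql_reduce(array, initial_state, fn):
    # Same argument order as REDUCE(array, start, (state, element) => ...):
    # fold fn over the array from left to right, starting from initial_state.
    return reduce(fn, array, initial_state)


def count_switch(state, sender):
    # State: (number of sender switches so far, last sender); -1 means "nobody yet".
    switches, last = state
    if last != -1 and sender != last:
        switches += 1
    return (switches, sender)


# Senders of conversation 4<>5 in time order.
senders = [5, 4, 5]
print(ksql_reduce(senders, (0, -1), count_switch))  # prints (2, 5)
```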

Our state will track the steps in the flow and who sent the most recent message. We’ll start with these placeholder values:

STRUCT(step := 'start', last_sender := CAST(-1 AS BIGINT))

And then build up the function (old_state, element) => …, which considers each possible case:

  • If we’re at the start step, the next message is always an opener. Move to opened.

  • If we’re at opened, and the message has a new send_id, then the sender has changed and that’s a reply. Move to replied.

  • If we’re at replied, and the message has changed send_id again, that’s a connection! Move to connected.

  • In any other case, there’s no change.

In code, that looks like this:

CREATE OR REPLACE TABLE conversation_states AS
  SELECT
    conversation_id,
    REDUCE(
      message_times,
      STRUCT(step := 'start', last_sender := CAST(-1 AS BIGINT)),
      (old_state, element) => CASE
        WHEN old_state->step = 'start'
          THEN struct(step := 'opened', last_sender := element->v)
        WHEN old_state->step = 'opened' AND old_state->last_sender != element->v
          THEN struct(step := 'replied', last_sender := element->v)
        WHEN old_state->step = 'replied' AND old_state->last_sender != element->v
          THEN struct(step := 'connected', last_sender := element->v)
        ELSE old_state
      END
    ) as state
  FROM conversations_sequenced;

Querying that looks like this:

SELECT * FROM conversation_states;
+----------------+--------------------------------+
|CONVERSATION_ID |STATE                           |
+----------------+--------------------------------+
|3<>4            |{STEP=opened, LAST_SENDER=3}    |
|4<>5            |{STEP=connected, LAST_SENDER=5} |
|1<>2            |{STEP=connected, LAST_SENDER=1} |
|1<>3            |{STEP=opened, LAST_SENDER=1}    |
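The same transitions can be traced in plain Python. This sketch feeds each conversation’s senders, in time order, through the four cases listed earlier. It should reproduce the STEP and LAST_SENDER values above. The names `step` and `SEQUENCED` are hypothetical, and a (step, last_sender) tuple stands in for the STRUCT.

```python
from functools import reduce


def step(old_state, sender):
    # One transition of the state machine; sender plays the role of element->v.
    state, last_sender = old_state
    if state == "start":
        return ("opened", sender)
    if state == "opened" and sender != last_sender:
        return ("replied", sender)
    if state == "replied" and sender != last_sender:
        return ("connected", sender)
    return old_state  # any other case: no change


# Senders in time order for each conversation, from the conversations_sequenced output.
SEQUENCED = {
    "3<>4": [3, 3, 3],
    "4<>5": [5, 4, 5],
    "1<>2": [1, 2, 1],
    "1<>3": [1],
}

states = {cid: reduce(step, senders, ("start", -1)) for cid, senders in SEQUENCED.items()}
for cid, state in states.items():
    print(cid, state)
```

Because the ELSE branch returns the old state, a conversation that reaches connected stays connected, however many messages follow.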

Final answer

To wrap up, let’s just trim that down to the final answers:

SELECT conversation_id
FROM conversation_states
WHERE state->step = 'connected';
+----------------+
|CONVERSATION_ID |
+----------------+
|4<>5            |
|1<>2            |
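Putting the pieces together, here is a sketch that goes from the mock messages straight to the connected pairs. It mirrors the single query with the HAVING clause in step 2. The sketch assumes that the mock inserts are processed in the order they are written, so insert order stands in for rowtime order. The function names are hypothetical.

```python
from collections import defaultdict
from functools import reduce

# (send_id, recv_id) pairs from the mock INSERT statements, in insert order.
MESSAGES = [(1, 2), (1, 3), (2, 1), (1, 2), (3, 4),
            (3, 4), (5, 4), (4, 5), (3, 4), (5, 4)]


def step(old_state, sender):
    # start -> opened on any message; opened -> replied -> connected on sender changes.
    state, last_sender = old_state
    if state == "start":
        return ("opened", sender)
    if state == "opened" and sender != last_sender:
        return ("replied", sender)
    if state == "replied" and sender != last_sender:
        return ("connected", sender)
    return old_state


def connected_conversations(messages):
    threads = defaultdict(list)  # conversation id -> senders in (assumed) time order
    for send_id, recv_id in messages:
        cid = "<>".join(str(i) for i in sorted((send_id, recv_id)))
        threads[cid].append(send_id)
    # Keep only conversations whose final step is 'connected', like the HAVING clause.
    return sorted(cid for cid, senders in threads.items()
                  if reduce(step, senders, ("start", -1))[0] == "connected")


print(connected_conversations(MESSAGES))  # prints ['1<>2', '4<>5']
```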

Cleanup

5

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). Including the DELETE TOPIC clause also deletes the topic backing the stream or table, asynchronously.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;