course: Designing Events and Event Streams

Hands On: Single vs. Multiple Event Streams


Adam Bellemare

Staff Technologist, Office of the CTO (Presenter)

In this exercise, you will build up the current shopping cart state from a series of add-item and remove-item events. Since ksqlDB doesn’t support multiple independent schema definitions as inputs to a single operation, you will merge them into one stream. You will then use ksqlDB to aggregate the events into the current state of the shopping cart. During the exercise, you will see items added to and removed from the cart in real time, and witness how the aggregate functions work.

Build an Aggregate of the Required Actions

Let’s start by creating two event streams.

  1. Open URL https://confluent.cloud and log in to the Confluent Cloud console.
  2. Navigate to the default environment, the event-streams cluster, and the Editor for the event-streams-ksqlDB cluster.
  3. Create the item_added stream:

Note: If you already completed the first exercise for this course, this stream will already exist and the command will return an error message indicating this.

CREATE STREAM item_added (
  cart_id BIGINT KEY,
  item_id BIGINT
) WITH (
  KAFKA_TOPIC = 'item_added',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6);
  4. Create the item_removed stream:
CREATE STREAM item_removed (
  cart_id BIGINT KEY,
  item_id BIGINT
) WITH (
  KAFKA_TOPIC = 'item_removed',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6);

Next, create a merged_cart_actions stream. As previously mentioned, ksqlDB does not support creating aggregates from multiple event streams, so you have to merge the ADD and REMOVE events into a single event stream, and refactor their schemas slightly.
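The merge step is conceptually simple: tag each event with its action type and funnel both streams into one. A minimal Python sketch of the idea (the dicts and function name here are illustrative, not any ksqlDB API):

```python
# Illustrative sketch: merging add/remove events into one tagged stream.
item_added = [{"cart_id": 1234, "item_id": 200}]
item_removed = [{"cart_id": 1234, "item_id": 200}]

def merge_cart_actions(added, removed):
    """Tag each event with its action and combine both into a single list,
    mirroring the two INSERT INTO statements below."""
    merged = [{**e, "action": "ADD"} for e in added]
    merged += [{**e, "action": "REMOVE"} for e in removed]
    return merged

merged = merge_cart_actions(item_added, item_removed)
```

Once every event carries an explicit action field, a single downstream aggregation can consume them all, which is exactly what the merged_cart_actions stream provides.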

  5. Create the merged_cart_actions stream:
CREATE STREAM merged_cart_actions (
  cart_id BIGINT KEY,
  item_id BIGINT,
  action STRING
) WITH (
  KAFKA_TOPIC = 'merged_cart_actions',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6);
  6. Merge the events from the item_added stream into the new merged_cart_actions stream using a simple streaming processor that tags each event with an ADD action:
INSERT INTO merged_cart_actions
SELECT cart_id, item_id, 'ADD' AS action FROM item_added;
  7. Merge the events from the item_removed stream into the merged_cart_actions stream using a simple streaming processor that tags each event with a REMOVE action:
INSERT INTO merged_cart_actions
SELECT cart_id, item_id, 'REMOVE' AS action FROM item_removed;
  8. Create a table of (cart, item) tuples, aggregating the ADD and REMOVE actions into a list:
CREATE TABLE item_cart_actions
  WITH (KEY_FORMAT='JSON', VALUE_FORMAT='AVRO')
  AS SELECT cart_id, item_id, collect_list(action) AS actions 
  FROM merged_cart_actions 
    GROUP BY cart_id, item_id 
    EMIT CHANGES;
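The table above keys each row on the (cart_id, item_id) pair and accumulates every action seen for that pair. A small Python sketch of the same grouping logic (the event list and function name are illustrative):

```python
from collections import defaultdict

def collect_actions(merged_events):
    """Group events by (cart_id, item_id) and collect their actions into a
    list, mirroring GROUP BY cart_id, item_id with COLLECT_LIST(action)."""
    actions = defaultdict(list)
    for e in merged_events:
        actions[(e["cart_id"], e["item_id"])].append(e["action"])
    return dict(actions)

events = [
    {"cart_id": 1234, "item_id": 200, "action": "ADD"},
    {"cart_id": 1234, "item_id": 200, "action": "REMOVE"},
]
table = collect_actions(events)
# table[(1234, 200)] == ["ADD", "REMOVE"]
```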
  9. Finish up by reducing each action list down to a numeric quantity, while grouping the items and their final quantities together into a single STRUCT. The result is a single row per cart_id that contains every item in the cart along with its quantity.
CREATE TABLE shopping_cart AS 
SELECT cart_id,
       collect_list( STRUCT(
         item_id := item_id,
         quantity := REDUCE ( actions, 0, (s, x) => CASE
           WHEN x = 'ADD' THEN s + 1
           WHEN x = 'REMOVE' THEN s - 1 END )))
       as basket
FROM item_cart_actions 
GROUP BY cart_id
EMIT CHANGES;
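The REDUCE call is a left fold: it starts the running state at 0 and walks the action list, adding 1 for each ADD and subtracting 1 for each REMOVE. A quick Python sketch of that fold (function name illustrative):

```python
from functools import reduce

def quantity(actions):
    """Fold an action list down to a net quantity, like the ksqlDB REDUCE:
    initial state 0, +1 for each ADD, -1 for each REMOVE."""
    return reduce(lambda s, x: s + 1 if x == "ADD" else s - 1, actions, 0)

# quantity(["ADD", "ADD", "REMOVE"]) == 1
```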
  10. Now test the result by inserting sample data into the streams:
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_removed (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 400);
  11. In this last step, query the shopping_cart and view the results (make sure you start from EARLIEST).
SELECT * FROM shopping_cart EMIT CHANGES;
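For the sample events above, the fold works out as follows: item 200 sees ADD, ADD, ADD, REMOVE for a net quantity of 2, and item 400 sees a single ADD. A quick sketch checking that arithmetic (the tuples simply restate the INSERT statements above):

```python
# Replay the sample inserts and compute each item's net quantity for cart 1234.
events = [
    (1234, 200, "ADD"), (1234, 200, "ADD"), (1234, 200, "ADD"),
    (1234, 200, "REMOVE"), (1234, 400, "ADD"),
]
cart = {}
for cart_id, item_id, action in events:
    cart[item_id] = cart.get(item_id, 0) + (1 if action == "ADD" else -1)
# cart == {200: 2, 400: 1}
```

The shopping_cart row for cart_id 1234 should show the same basket: item 200 with quantity 2, and item 400 with quantity 1.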


Note: The query result may also include shopping carts 200, 201, and 202 that were added in the previous Hands On: Modeling as Facts vs. Delta Event Types exercise.

In this exercise, you merged two separate event streams into a single stream with a common schema, then processed it with a ksqlDB aggregation.

ksqlDB doesn’t support multiple independent schema definitions as inputs to a single operation. To support multiple event types inside a single event stream, you need to use Kafka Streams. For more information, see Bill Bejeck’s related presentation from Kafka Summit Europe 2021, along with his GitHub repository containing examples from the presentation.

Exercise Environment Teardown

After completing the course exercises, you need to tear down the designing-events environment to avoid unnecessarily accruing cost and exhausting your promotional credits.

  1. In the Confluent Cloud console, navigate to Environments.


  2. Click the Delete button for the designing-events environment.
  3. Confirm the delete request and click Continue.

