course: Designing Events and Event Streams

Hands On: Single vs. Multiple Event Streams

2 min

Adam Bellemare

Staff Technologist, Office of the CTO (Presenter)

In this exercise, you will build up the current shopping cart from a series of add and remove item events. Since ksqlDB doesn’t support multiple independent schema definitions as inputs to a single operation, you will merge them together into one stream. You will then use ksqlDB to aggregate the events into the current state of the shopping cart. During the exercise, you will see items added to and removed from the cart in real time, and see how the aggregate functions work.

Build an Aggregate of the Required Actions

Let’s start by creating two event streams.

  1. Open URL https://confluent.cloud and log in to the Confluent Cloud console.

  2. Navigate to the default environment, the event-streams cluster, and the Editor for the event-streams-ksqlDB cluster.

  3. Create the item_added stream:

    CREATE STREAM item_added (
      cart_id BIGINT key,
      item_id BIGINT
    ) WITH (
      KAFKA_TOPIC = 'item_added',
      VALUE_FORMAT = 'AVRO',
      PARTITIONS = 6);

    Note: If you already completed the first exercise for this course, this stream will already exist and the command will return an error message indicating this.

  4. Create the item_removed stream:

    CREATE STREAM item_removed (
      cart_id BIGINT key,
      item_id BIGINT
    ) WITH (
      KAFKA_TOPIC = 'item_removed',
      VALUE_FORMAT = 'AVRO',
      PARTITIONS = 6);

Next, create a merged_cart_actions stream. As previously mentioned, ksqlDB does not support creating aggregates from multiple event streams, so you have to merge the ADD and REMOVE events into a single event stream, and refactor their schemas slightly.

  1. Create the merged_cart_actions stream:

    CREATE STREAM merged_cart_actions (
      cart_id BIGINT key,
      item_id BIGINT,
      action STRING
    ) WITH (
      KAFKA_TOPIC = 'merged_cart_actions',
      VALUE_FORMAT = 'AVRO',
      PARTITIONS = 6);

  2. Merge the events from the item_added stream into the new merged_cart_actions stream using a simple streaming processor that tags each event with the ADD action.

    INSERT INTO merged_cart_actions
    SELECT cart_id, item_id, 'ADD' AS action FROM item_added;

  3. Merge the events from the item_removed stream into the merged_cart_actions stream using a simple streaming processor that tags each event with the REMOVE action.

    INSERT INTO merged_cart_actions
    SELECT cart_id, item_id, 'REMOVE' AS action FROM item_removed;

  4. Create a table keyed by (cart_id, item_id) tuples that aggregates the ADD and REMOVE actions for each tuple into a list.

    CREATE TABLE item_cart_actions
      WITH (KEY_FORMAT='JSON', VALUE_FORMAT='AVRO')
      AS SELECT cart_id, item_id, collect_list(action) AS actions 
      FROM merged_cart_actions 
        GROUP BY (cart_id, item_id) 
        EMIT CHANGES;

  5. Finish by reducing each action list to a numeric quantity and pairing each item with its final quantity in a STRUCT. The collect_list function then gathers those structs for each cart. The result is a single row per cart_id that lists every item and its quantity within the cart.

    CREATE TABLE shopping_cart AS 
    SELECT cart_id,
           collect_list( STRUCT(
             item_id := item_id,
             quantity := REDUCE ( actions, 0, (s, x) => case
               when x = 'ADD' then s + 1
               when x = 'REMOVE' then s - 1 end )))
           as basket
    FROM item_cart_actions 
    GROUP BY cart_id
    EMIT CHANGES;

  6. Now test the result by inserting sample data into the item_added and item_removed streams.

    INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
    INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
    INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
    INSERT INTO item_removed (cart_id, item_id) VALUES (1234, 200);
    INSERT INTO item_added (cart_id, item_id) VALUES (1234, 400);

  7. Finally, query the shopping_cart table and view the results. Make sure the query starts from EARLIEST (the auto.offset.reset query property) so that it reads the events you already inserted.

    SELECT * FROM shopping_cart EMIT CHANGES;

    Note: The query result may also include shopping carts 200, 201, and 202 that were added in the previous Hands On: Modeling as Facts vs. Delta Event Types exercise.
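
To make the expected result concrete, the aggregation in steps 4 and 5 can be traced offline for the sample data from step 6. The following is a minimal Python sketch, not ksqlDB code: the function names (collect_actions, reduce_quantity, build_carts) are hypothetical, the events are assumed to arrive in the order they were inserted, and unknown actions are simply ignored, which is an assumption of this sketch rather than documented ksqlDB behavior.

```python
from collections import defaultdict

# Sample events mirroring the INSERT statements in step 6, already tagged
# with the action that the two INSERT INTO ... SELECT statements attach.
# Assumption: events arrive in insertion order.
merged_cart_actions = [
    (1234, 200, "ADD"),
    (1234, 200, "ADD"),
    (1234, 200, "ADD"),
    (1234, 200, "REMOVE"),
    (1234, 400, "ADD"),
]


def collect_actions(events):
    """Group actions by (cart_id, item_id), similar to collect_list in step 4."""
    actions = defaultdict(list)
    for cart_id, item_id, action in events:
        actions[(cart_id, item_id)].append(action)
    return dict(actions)


def reduce_quantity(actions):
    """Fold an action list into a quantity, similar to REDUCE in step 5."""
    quantity = 0
    for action in actions:
        if action == "ADD":
            quantity += 1
        elif action == "REMOVE":
            quantity -= 1
        # Any other action is ignored (an assumption of this sketch).
    return quantity


def build_carts(events):
    """Return {cart_id: [{"item_id": ..., "quantity": ...}, ...]}."""
    carts = defaultdict(list)
    for (cart_id, item_id), actions in collect_actions(events).items():
        carts[cart_id].append(
            {"item_id": item_id, "quantity": reduce_quantity(actions)}
        )
    return dict(carts)


shopping_cart = build_carts(merged_cart_actions)
print(shopping_cart)
# {1234: [{'item_id': 200, 'quantity': 2}, {'item_id': 400, 'quantity': 1}]}
```

Under these assumptions, cart 1234 should end up with two units of item 200 (three adds, one remove) and one unit of item 400, which is what the basket column for that cart is expected to show.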

In this exercise, you merged two separate streams into a single stream with a shared schema, and then processed that stream with ksqlDB.

ksqlDB doesn’t support multiple independent schema definitions as inputs to a single operation. To process multiple event types within a single event stream, consider using Kafka Streams instead. For more information, see Bill Bejeck’s related presentation from Kafka Summit Europe 2021. He also has a GitHub repository that contains examples from the presentation.

Exercise Environment Teardown

After completing the course exercises, tear down the designing-events environment so that it does not keep accruing costs and exhaust your promotional credits.

  1. In the Confluent Cloud console, navigate to Environments.

  2. Click the Delete button for the designing-events environment.

  3. Confirm the delete request and click Continue.


Hands On: Single vs. Multiple Event Streams

Hi, I'm Adam from Confluent. In this module we're going to practice using Single Event Streams and Multiple Event Streams. In this exercise, you will build up the current shopping cart for a series of add and remove item events. Since ksqlDB doesn't support multiple schema definitions as inputs to a single operation, you will be merging them together into one stream. You will then use ksqlDB to aggregate the events together into the current state of the shopping cart. During the exercise, you will see items added and removed from the cart in real time, as well as witness how the aggregate function works.

First, you'll create two streams, one to represent adding an item to the shopping cart and one to represent removing an item from the shopping cart. Next, you'll create a merged cart action stream. ksqlDB does not support creating aggregates from multiple event streams, so we're going to merge the add and the remove events into a single event stream and refactor their schema slightly. You will then merge the events into the merged cart actions by creating a simple streaming processor for each of the add and remove.

In the next step, you will create a table of cart and item tuples and aggregate the adds and the remove actions into a list. You'll finish up by reducing the list down to a numeric quantity, while grouping the items and the final quantity together into a single struct. Together you get a single row representing the current cart ID and every item and quantity within the cart.

And now you've successfully completed this exercise. We've taken add events and remove events and aggregated them together to build up the current state of the shopping cart.
