Staff Technologist, Office of the CTO (Presenter)
In this exercise, you will build up the current shopping cart from a series of add-item and remove-item events. Since ksqlDB doesn’t support multiple independent schema definitions as inputs to a single operation, you will merge the two event types into one stream. You will then use ksqlDB to aggregate the events into the current state of the shopping cart. During the exercise, you will see items added to and removed from the cart in real time, and observe how the aggregate functions work.
Build an Aggregate of the Required Actions
Let’s start by creating two event streams.
Open https://confluent.cloud and log in to the Confluent Cloud console.
Navigate to the default environment, the event-streams cluster, and the Editor for the event-streams-ksqlDB cluster.
Create the item_added stream:
CREATE STREAM item_added (
  cart_id BIGINT KEY,
  item_id BIGINT
) WITH (
  KAFKA_TOPIC = 'item_added',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6
);
Note: If you already completed the first exercise for this course, this stream will already exist and the command will return an error message indicating this.
Create the item_removed stream:
CREATE STREAM item_removed (
  cart_id BIGINT KEY,
  item_id BIGINT
) WITH (
  KAFKA_TOPIC = 'item_removed',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6
);
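Both streams are keyed by cart_id and spread over six partitions. Kafka assigns a keyed record to a partition by hashing its key, so by default every event for a given cart in a topic lands in the same partition, where its relative order is preserved. The Python sketch below models the two input schemas; the `partition_for` helper is a hypothetical stand-in that uses `cart_id % 6` instead of Kafka's actual murmur2 hash of the serialized key bytes, so its partition numbers will not match a real cluster.

```python
from dataclasses import dataclass

NUM_PARTITIONS = 6  # matches PARTITIONS = 6 in both CREATE STREAM statements


@dataclass(frozen=True)
class ItemAdded:
    cart_id: int  # record key (cart_id BIGINT KEY)
    item_id: int  # value field


@dataclass(frozen=True)
class ItemRemoved:
    cart_id: int  # record key (cart_id BIGINT KEY)
    item_id: int  # value field


def partition_for(cart_id: int, num_partitions: int = NUM_PARTITIONS) -> int:
    """Hypothetical stand-in for key hashing: the same key always maps to the same partition.

    Kafka's default partitioner hashes the serialized key bytes with murmur2;
    modulo arithmetic is used here only to keep the sketch short.
    """
    return cart_id % num_partitions


events = [ItemAdded(1234, 200), ItemRemoved(1234, 200), ItemAdded(1235, 400)]
for event in events:
    print(type(event).__name__, event.cart_id, "-> partition", partition_for(event.cart_id))
```

The point the sketch makes is only that the partition depends on the key alone, so all events for cart 1234 stay together within each topic.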
Next, create a merged_cart_actions stream. As mentioned above, ksqlDB does not support creating aggregates from multiple event streams, so you must merge the ADD and REMOVE events into a single event stream and refactor their schemas slightly by adding an action field that records which kind of event each record is.
Create the merged_cart_actions stream:
CREATE STREAM merged_cart_actions (
  cart_id BIGINT KEY,
  item_id BIGINT,
  action STRING
) WITH (
  KAFKA_TOPIC = 'merged_cart_actions',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6
);
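The merged schema adds an action column so that one record type can carry both kinds of event. In this Python sketch, the `MergedCartAction` class mirrors the merged_cart_actions columns, and `describe` is a hypothetical consumer showing that a single handler can branch on action instead of reading two differently shaped streams.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MergedCartAction:
    cart_id: int  # record key, as in the input streams
    item_id: int
    action: str   # 'ADD' or 'REMOVE': the field added by the schema refactoring


def describe(record: MergedCartAction) -> str:
    """Hypothetical single consumer: one schema, branching on the action column."""
    if record.action == "ADD":
        return f"cart {record.cart_id}: +1 of item {record.item_id}"
    if record.action == "REMOVE":
        return f"cart {record.cart_id}: -1 of item {record.item_id}"
    raise ValueError(f"unknown action {record.action!r}")


print(describe(MergedCartAction(1234, 200, "ADD")))
```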
Merge the events from item_added into the new merged_cart_actions stream using a simple streaming processor that tags each event with the action ADD.
INSERT INTO merged_cart_actions
SELECT cart_id, item_id, 'ADD' AS action FROM item_added;
Merge the events from item_removed into the merged_cart_actions stream using a simple streaming processor that tags each event with the action REMOVE.
INSERT INTO merged_cart_actions
SELECT cart_id, item_id, 'REMOVE' AS action FROM item_removed;
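Each INSERT INTO ... SELECT statement starts its own persistent query that copies every input event into merged_cart_actions and adds a constant action value. The sketch below models those two queries as projection functions; `CartEvent`, `tag_added`, `tag_removed`, and `merge` are hypothetical names. Because the two queries run independently, the real interleaving of ADD and REMOVE records in the merged stream depends on processing timing, so the sketch simply concatenates the two tagged streams and only checks that every input event appears exactly once with the right tag.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class CartEvent:
    cart_id: int
    item_id: int


@dataclass(frozen=True)
class MergedCartAction:
    cart_id: int
    item_id: int
    action: str


def tag_added(event: CartEvent) -> MergedCartAction:
    # Models: SELECT cart_id, item_id, 'ADD' AS action FROM item_added;
    return MergedCartAction(event.cart_id, event.item_id, "ADD")


def tag_removed(event: CartEvent) -> MergedCartAction:
    # Models: SELECT cart_id, item_id, 'REMOVE' AS action FROM item_removed;
    return MergedCartAction(event.cart_id, event.item_id, "REMOVE")


def merge(added: list[CartEvent], removed: list[CartEvent]) -> list[MergedCartAction]:
    """Concatenate both tagged streams; the real interleaving depends on query timing."""
    return [tag_added(e) for e in added] + [tag_removed(e) for e in removed]


merged = merge([CartEvent(1234, 200)] * 3 + [CartEvent(1234, 400)], [CartEvent(1234, 200)])
print(Counter(m.action for m in merged))  # Counter({'ADD': 4, 'REMOVE': 1})
```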
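Next, create a table with one row per (cart_id, item_id) pair that aggregates the ADD and REMOVE actions for that pair into a list. The KEY_FORMAT='JSON' setting is needed because grouping by two columns produces a two-column key, which the KAFKA key format cannot serialize.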
CREATE TABLE item_cart_actions
  WITH (KEY_FORMAT = 'JSON', VALUE_FORMAT = 'AVRO')
  AS SELECT cart_id, item_id, COLLECT_LIST(action) AS actions
  FROM merged_cart_actions
  GROUP BY (cart_id, item_id)
  EMIT CHANGES;
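Conceptually, item_cart_actions keeps one row per (cart_id, item_id) key, appends each incoming action to that row's list, and emits the updated row downstream (EMIT CHANGES). The Python sketch below models that state with a dictionary keyed by a (cart_id, item_id) tuple; `aggregate_actions` is a hypothetical name, and the sketch ignores the repartitioning, state stores, and fault tolerance that ksqlDB provides.

```python
from collections import defaultdict

Key = tuple[int, int]  # (cart_id, item_id): the composite GROUP BY key


def aggregate_actions(merged: list[tuple[int, int, str]]) -> list[tuple[Key, list[str]]]:
    """Append each action to its (cart_id, item_id) row and return every emitted change."""
    table: dict[Key, list[str]] = defaultdict(list)
    changes = []
    for cart_id, item_id, action in merged:
        key = (cart_id, item_id)
        table[key].append(action)                # COLLECT_LIST(action)
        changes.append((key, list(table[key])))  # EMIT CHANGES: the updated row goes downstream
    return changes


for key, actions in aggregate_actions(
    [(1234, 200, "ADD"), (1234, 200, "ADD"), (1234, 400, "ADD"), (1234, 200, "REMOVE")]
):
    print(key, actions)
```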
Finish up by reducing each list to a numeric quantity and grouping each item and its final quantity into a STRUCT. COLLECT_LIST then gathers those structs per cart, so the result is a single row per cart_id that holds every item in the cart and its quantity.
CREATE TABLE shopping_cart AS
  SELECT cart_id,
         COLLECT_LIST(STRUCT(
           item_id := item_id,
           quantity := REDUCE(actions, 0, (s, x) => CASE
             WHEN x = 'ADD' THEN s + 1
             WHEN x = 'REMOVE' THEN s - 1
           END)
         )) AS basket
  FROM item_cart_actions
  GROUP BY cart_id
  EMIT CHANGES;
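REDUCE folds the actions list into a single value: it starts from the initial state 0 and applies the lambda to each element in turn, adding 1 for 'ADD' and subtracting 1 for 'REMOVE'. The sketch below mirrors that with Python's functools.reduce; `step`, `quantity`, and `basket` are hypothetical helper names, and a dict stands in for the STRUCT. It also assumes standard SQL NULL semantics for one detail of the query: the CASE expression has no ELSE branch, so an action other than 'ADD' or 'REMOVE' would make the quantity NULL, modeled here as None.

```python
from functools import reduce
from typing import Optional


def step(state: Optional[int], action: str) -> Optional[int]:
    """One application of the lambda (s, x) => CASE ... END."""
    if state is None:    # assumed: NULL + 1 and NULL - 1 stay NULL
        return None
    if action == "ADD":
        return state + 1
    if action == "REMOVE":
        return state - 1
    return None          # CASE with no ELSE branch yields NULL


def quantity(actions: list[str]) -> Optional[int]:
    # Models REDUCE(actions, 0, (s, x) => ...)
    return reduce(step, actions, 0)


def basket(rows: dict[tuple[int, int], list[str]], cart_id: int) -> list[dict]:
    """Models COLLECT_LIST(STRUCT(item_id, quantity)) for one cart_id group."""
    return [
        {"item_id": item_id, "quantity": quantity(actions)}
        for (cid, item_id), actions in rows.items()
        if cid == cart_id
    ]


print(quantity(["ADD", "ADD", "ADD", "REMOVE"]))  # 2
```

Note that COLLECT_LIST does not promise an element order, so compare baskets as sets of items rather than as ordered lists.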
Now test the result by inserting sample data into the input streams.
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_removed (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 400);
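Working through these five events by hand: item 200 is added three times and removed once, and item 400 is added once, so the expected basket for cart 1234 is item 200 with quantity 2 and item 400 with quantity 1. The sketch below replays the events through a compact model of the whole pipeline (the `cart_basket` helper is hypothetical) and checks that the final result does not depend on the order in which ADD and REMOVE records reach merged_cart_actions, because adding and subtracting 1 commute.

```python
from itertools import permutations

# The five INSERT statements above, as (cart_id, item_id, action) tuples.
SAMPLE_EVENTS = [
    (1234, 200, "ADD"),
    (1234, 200, "ADD"),
    (1234, 200, "ADD"),
    (1234, 200, "REMOVE"),
    (1234, 400, "ADD"),
]


def cart_basket(events, cart_id):
    """Group by (cart_id, item_id), fold each group to a quantity, collect per cart."""
    quantities: dict[int, int] = {}
    for cid, item_id, action in events:
        if cid != cart_id:
            continue
        delta = 1 if action == "ADD" else -1  # assumes only 'ADD' and 'REMOVE' occur
        quantities[item_id] = quantities.get(item_id, 0) + delta
    return sorted(quantities.items())


print(cart_basket(SAMPLE_EVENTS, 1234))  # [(200, 2), (400, 1)]

# Every arrival order of the five events produces the same final basket.
orders_agree = all(
    cart_basket(list(p), 1234) == cart_basket(SAMPLE_EVENTS, 1234)
    for p in permutations(SAMPLE_EVENTS)
)
print(orders_agree)  # True
```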
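In this last step, query the shopping_cart table and view the results. Make sure the query starts from the earliest offset (set auto.offset.reset to Earliest) so that it processes the sample events you already inserted.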
SELECT * FROM shopping_cart EMIT CHANGES;
Note: The query result may also include shopping carts 200, 201, and 202 that were added in the previous Hands On: Modeling as Facts vs. Delta Event Types exercise.
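Because the query uses EMIT CHANGES, it is a push query: rather than returning once, it emits a new row for cart 1234 each time the basket changes. The sketch below lists the successive basket states you could expect if the five sample events were processed one at a time in insertion order; `basket_history` is a hypothetical helper. This is an idealized model: ksqlDB may coalesce intermediate updates (for example, through record caching), and the ADD and REMOVE records can reach merged_cart_actions in a different order, so the intermediate rows you see may differ while the final row should match.

```python
# The five sample INSERT statements, as (cart_id, item_id, action) tuples.
SAMPLE_EVENTS = [
    (1234, 200, "ADD"),
    (1234, 200, "ADD"),
    (1234, 200, "ADD"),
    (1234, 200, "REMOVE"),
    (1234, 400, "ADD"),
]


def basket_history(events, cart_id):
    """Yield the cart's basket after each event, like successive EMIT CHANGES rows."""
    quantities: dict[int, int] = {}
    for cid, item_id, action in events:
        if cid != cart_id:
            continue
        # Assumes only 'ADD' and 'REMOVE' occur.
        quantities[item_id] = quantities.get(item_id, 0) + (1 if action == "ADD" else -1)
        yield sorted(quantities.items())  # a fresh list per emitted row


for row in basket_history(SAMPLE_EVENTS, 1234):
    print(1234, row)
```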
In this exercise, you merged two separate streams into a single schema and then processed the merged stream with ksqlDB.
ksqlDB doesn’t support multiple independent schema definitions as inputs to a single operation. To support multiple event types within a single event stream, look to Kafka Streams instead. For more on this, see Bill Bejeck’s related presentation from Kafka Summit Europe 2021. He also has a GitHub repository that contains the examples from the presentation.
Exercise Environment Teardown
After completing the course exercises, tear down the designing-events environment so that you don’t keep accruing unnecessary costs and exhaust your promotional credits.
Hi, I'm Adam from Confluent. In this module, we're going to practice using single event streams and multiple event streams. In this exercise, you will build up the current shopping cart from a series of add and remove item events. Since ksqlDB doesn't support multiple schema definitions as inputs to a single operation, you will merge them into one stream. You will then use ksqlDB to aggregate the events into the current state of the shopping cart. During the exercise, you will see items added to and removed from the cart in real time, and see how the aggregate function works. First, you'll create two streams: one to represent adding an item to the shopping cart, and one to represent removing an item from it. Next, you'll create a merged cart actions stream. ksqlDB does not support creating aggregates from multiple event streams, so we're going to merge the add and remove events into a single event stream and refactor their schemas slightly. You will then merge the events into the merged cart actions stream by creating a simple streaming processor for each of the add and remove streams. In the next step, you will create a table of cart and item tuples and aggregate the add and remove actions into a list. You'll finish up by reducing the list down to a numeric quantity, while grouping the items and their final quantities together into a single struct. Together, these give you a single row representing the current cart ID and every item and quantity within the cart. And now you've successfully completed this exercise. We've taken add events and remove events and aggregated them together to build up the current state of the shopping cart.