In this exercise, you will build up the current state of a shopping cart from a series of add-item and remove-item events. Since ksqlDB doesn't support multiple independent schema definitions as inputs to a single operation, you will first merge the two event types into one stream. You will then use ksqlDB to aggregate the merged events into the current state of the shopping cart. During the exercise, you will see items added to and removed from the cart in real time, as well as witness how the aggregate functions work.
Build an Aggregate of the Required Actions
Let’s start by creating two event streams.
Open https://confluent.cloud and log in to the Confluent Cloud console.
Navigate to the default environment, the event-streams cluster, and the Editor for the event-streams-ksqlDB cluster.
Create the item_added stream:

CREATE STREAM item_added (
    cart_id BIGINT KEY,
    item_id BIGINT
) WITH (
    KAFKA_TOPIC = 'item_added',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 6
);
Note: If you already completed the first exercise for this course, this stream will already exist and the command will return an error message indicating this.
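If you're unsure what already exists from earlier exercises, you can list the current streams first (SHOW STREAMS is a standard ksqlDB statement; its output depends on what you've created so far):

SHOW STREAMS;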
Create the item_removed stream:

CREATE STREAM item_removed (
    cart_id BIGINT KEY,
    item_id BIGINT
) WITH (
    KAFKA_TOPIC = 'item_removed',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 6
);
Next, create a merged_cart_actions stream. As previously mentioned, ksqlDB does not support creating aggregates from multiple event streams, so you have to merge the ADD and REMOVE events into a single event stream and refactor their schemas slightly.

Create the merged_cart_actions stream:
CREATE STREAM merged_cart_actions (
    cart_id BIGINT KEY,
    item_id BIGINT,
    action STRING
) WITH (
    KAFKA_TOPIC = 'merged_cart_actions',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 6
);
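Optionally, confirm that the merged schema includes the new action column by describing the stream (DESCRIBE is a standard ksqlDB statement):

DESCRIBE merged_cart_actions;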
Merge the events from the item_added stream into the new merged_cart_actions stream using a simple streaming processor for each ADD event:

INSERT INTO merged_cart_actions
    SELECT cart_id, item_id, 'ADD' AS action FROM item_added;
Merge the events from the item_removed stream into the merged_cart_actions stream using a simple streaming processor for each REMOVE event:

INSERT INTO merged_cart_actions
    SELECT cart_id, item_id, 'REMOVE' AS action FROM item_removed;
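If you'd like to watch the merge in action, you can run a transient push query against the merged stream. It will remain idle for now, but once you insert the sample data later in this exercise, both ADD and REMOVE events should flow through:

SELECT * FROM merged_cart_actions EMIT CHANGES;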
In this step, you create a table of (cart, item) tuples and aggregate the ADD and REMOVE actions for each tuple into a list.

CREATE TABLE item_cart_actions
    WITH (KEY_FORMAT = 'JSON', VALUE_FORMAT = 'AVRO')
    AS SELECT cart_id, item_id, COLLECT_LIST(action) AS actions
    FROM merged_cart_actions
    GROUP BY cart_id, item_id
    EMIT CHANGES;
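You can inspect this intermediate table with a push query. As a rough sketch, after the sample data is inserted later in this exercise, the rows for cart 1234 would look something like this (the exact formatting in the editor may differ):

SELECT * FROM item_cart_actions EMIT CHANGES;

CART_ID | ITEM_ID | ACTIONS
1234    | 200     | [ADD, ADD, ADD, REMOVE]
1234    | 400     | [ADD]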
Finish it up by reducing each list down to a numeric quantity, while grouping each item and its final quantity together into a single STRUCT. Your result is a single row representing the current cart_id, with every item and its quantity within the cart.
CREATE TABLE shopping_cart AS
    SELECT cart_id,
        COLLECT_LIST(STRUCT(
            item_id := item_id,
            quantity := REDUCE(actions, 0, (s, x) => CASE
                WHEN x = 'ADD' THEN s + 1
                WHEN x = 'REMOVE' THEN s - 1
            END))) AS basket
    FROM item_cart_actions
    GROUP BY cart_id
    EMIT CHANGES;
Now test the result by inserting sample data into the source streams.
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_removed (cart_id, item_id) VALUES (1234, 200);
INSERT INTO item_added (cart_id, item_id) VALUES (1234, 400);
In this last step, query the shopping_cart table and view the results (make sure the auto.offset.reset query property is set to Earliest so you read from the beginning).
SELECT * FROM shopping_cart EMIT CHANGES;
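Given the sample data above, you should eventually see a row for cart 1234 along these lines (the exact rendering in the Confluent Cloud editor may differ):

CART_ID | BASKET
1234    | [{ITEM_ID=200, QUANTITY=2}, {ITEM_ID=400, QUANTITY=1}]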
Note: The query result may also include shopping carts 200, 201, and 202 that were added in the previous Hands On: Modeling as Facts vs. Delta Event Types exercise.
In this exercise, you merged two separate streams into a single stream with one schema, and then processed it with ksqlDB.
ksqlDB doesn't support multiple independent schema definitions as inputs to a single operation. To support multiple event types inside a single event stream, you need to use Kafka Streams instead. For more info, see Bill Bejeck's related presentation from Kafka Summit Europe 2021 on this subject. He also has a GitHub repository that contains examples from the presentation.
Exercise Environment Teardown
After completing the course exercises, tear down the designing-events environment to avoid accruing unnecessary costs and exhausting your promotional credits.