Principal Customer Success Technical Architect (Presenter)
Lead Technologist, Office of the CTO (Author)
Simulate event sourcing by completing this exercise, which uses ksqlDB on Confluent Cloud. You'll begin by creating a stream of e-commerce events backed by an Apache Kafka topic. Then you'll write a streaming query to transform them into a table in ksqlDB. Finally, you'll query your summary table to fetch the contents of the shopping cart back to your application.
Sign up for Confluent Cloud if you have not already done so, and respond to the verification email to confirm your account.
On Confluent Cloud, open the shopping_cart_database that you set up in the module Thinking in Events (if you haven't set it up yet, please return there for instructions and a discount code). If you started your ksqlDB instance then, the status field should now read "up" rather than "provisioning".
Open the query editor and set two additional parameters under Add query properties:
In the editor, create a stream to hold your shopping cart events with the following ksqlDB command:
CREATE STREAM shopping_cart_events (customer VARCHAR, item VARCHAR, qty INT)
WITH (kafka_topic='shopping_cart_events', value_format='json', partitions=1);
Select Run query and look for the output below the editor confirming that the event stream was created.
{
  "@type": "currentStatus",
  "statementText": "CREATE STREAM SHOPPING_CART_EVENTS (CUSTOMER STRING, ITEM STRING, QTY INTEGER) WITH (KAFKA_TOPIC='shopping_cart_events', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');",
  "commandId": "stream/`SHOPPING_CART_EVENTS`/create",
  "commandStatus": {
    "status": "SUCCESS",
    "message": "Stream created",
    "queryId": null
  },
  "commandSequenceNumber": 2,
  "warnings": []
}
Create four INSERT statements to push events into the shopping_cart_events topic:
--add two pairs of pants
INSERT INTO shopping_cart_events (customer, item, qty)
VALUES ('bob', 'pants', 2);
--add a t-shirt
INSERT INTO shopping_cart_events (customer, item, qty)
VALUES ('bob', 't-shirts', 1);
--remove one pair of pants
INSERT INTO shopping_cart_events (customer, item, qty)
VALUES ('bob', 'pants', -1);
--add a hat
INSERT INTO shopping_cart_events (customer, item, qty)
VALUES ('bob', 'hats', 1);
Select Run query.
Use a SELECT statement to return your four events:
SELECT * FROM shopping_cart_events EMIT CHANGES;
Click Run query to see the results and then click Stop to terminate the push query.
Create a ksqlDB query to transform your events into a summary table by using a SUM on the qty field, with an associated GROUP BY that aggregates the events by customer and item:
CREATE TABLE current_shopping_cart WITH (KEY_FORMAT='JSON') AS
SELECT customer, item, SUM(qty) as total_qty
FROM shopping_cart_events
GROUP BY customer, item
EMIT CHANGES;
Click Run query.
The result is a summary table that reflects how many of each item are in the shopping cart right now. Because removals are recorded as events with a negative qty, they offset earlier additions in the sum.
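To make the aggregation concrete, here is a minimal Python sketch of the same fold outside ksqlDB. It is an illustration only, not how ksqlDB works internally: the events list mirrors the four INSERT statements above, and the summarize function is a hypothetical name introduced for this example.

```python
from collections import defaultdict

# The four events from the INSERT statements, in order: (customer, item, qty).
events = [
    ("bob", "pants", 2),
    ("bob", "t-shirts", 1),
    ("bob", "pants", -1),
    ("bob", "hats", 1),
]


def summarize(events):
    """Fold events into {(customer, item): total_qty}.

    Hypothetical helper that mimics SUM(qty) with GROUP BY customer, item.
    """
    totals = defaultdict(int)
    for customer, item, qty in events:
        totals[(customer, item)] += qty
    return dict(totals)


cart = summarize(events)
for (customer, item), total_qty in cart.items():
    print(customer, item, total_qty)
```

Each key in the dictionary plays the role of a table row, so four events collapse into three entries.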
Now run a query against the table just like you would with a regular database table:
SELECT * FROM current_shopping_cart EMIT CHANGES;
Click Run query.
You should see three records, not four: one record per item. Specifically, pants should show a total of 1, because the two pants events (+2 and -1) have been summed into a single row.
Click Stop to terminate the push query.
Go to Cluster settings on the left-hand side menu, then click Delete cluster. Enter your cluster's name, then select Continue.
Delete your ksqlDB application by clicking ksqlDB on the left-hand side menu, then selecting Delete under Actions. Enter your application's name, then click Continue.
In this exercise, you used event sourcing to recreate the current state of a shopping cart from an unbounded event stream containing all user cart interactions.
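Because the stream keeps every event rather than only the latest totals, the same replay can in principle be stopped early to see what the cart looked like at an earlier point. The Python sketch below illustrates that idea under stated assumptions: the events list copies the exercise's four inserts, and cart_after is a hypothetical helper, not a ksqlDB feature.

```python
# The exercise's four events, in insertion order: (customer, item, qty).
events = [
    ("bob", "pants", 2),
    ("bob", "t-shirts", 1),
    ("bob", "pants", -1),
    ("bob", "hats", 1),
]


def cart_after(events, n):
    """Rebuild the cart state from only the first n events (hypothetical helper)."""
    totals = {}
    for customer, item, qty in events[:n]:
        key = (customer, item)
        totals[key] = totals.get(key, 0) + qty
    return totals


# Print the cart as it stood after each event was applied.
for n in range(1, len(events) + 1):
    print(n, cart_after(events, n))
```

Replaying all four events gives the current cart; replaying fewer gives a historical view that a table of totals alone could not provide.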