Staff Technologist, Office of the CTO (Presenter)
This exercise explores use cases for both fact and delta events.
Note: If you haven’t already signed up for Confluent Cloud, do so before continuing by completing the steps that immediately precede this exercise. Those steps include creating the event-streams cluster used by the course exercises.
Create and Use a ksqlDB Table of Item Facts
In this first part of the exercise, you will materialize a stream of item fact events into a table. Each new item fact will be inserted into the table as it is consumed, overwriting any previous facts that share the same primary key. Facts provide an excellent way to transfer state from one service to another.
Let’s start with logging in to the Confluent Cloud UI.
Next, you will be prompted to enable one of the available Streams Governance Packages. Enable one, because the course exercises use Schema Registry, which these packages include.
Next, create a cluster in the designing-events environment.
You also need to enable ksqlDB for your Confluent Cloud cluster. Let’s do this now.
Proceed to the next step when the ksqlDB cluster provisioning has completed. Provisioning usually takes less than fifteen minutes, but in some rare cases it may take a bit longer.
FREE ACCESS: Basic clusters used in the context of this exercise won't incur much cost, and the amount of free usage that you receive along with the promo code EVENTDESIGN101 for $25 of free Confluent Cloud usage will be more than enough to cover it. You can also use the promo code CONFLUENTDEV1 to delay entering a credit card for 30 days.
Note: The ksqlDB cluster will slowly deplete your promotional credits, so be sure to complete the environment cleanup steps at the end of the last exercise to limit this impact.
Now that your ksqlDB cluster is available, let’s create a TABLE.
Enter the following ksqlDB statement in the Editor and click Run query.
CREATE TABLE items (
id BIGINT PRIMARY KEY,
price DECIMAL(10, 2),
name STRING,
description STRING,
brand_id BIGINT,
tax_status_id BIGINT
) WITH (
KAFKA_TOPIC = 'items',
VALUE_FORMAT = 'AVRO',
PARTITIONS = 6
);
Now insert a row of data into the topic backing the table.
INSERT INTO items (id, price, name, description, brand_id, tax_status_id)
VALUES (1, 9.99, 'Baseball Trading Cards', 'Premium Ol Slugger baseball trading cards!', 401, 778);
Insert a few more rows of data.
INSERT INTO items (id, price, name, description, brand_id, tax_status_id)
VALUES (2, 0.99, 'Football Trading Cards', 'Premium NFL 2022 football trading cards!', 402, 778);
INSERT INTO items (id, price, name, description, brand_id, tax_status_id)
VALUES (3, 19.99, 'Hockey Trading Cards', 'Premium NHL hockey trading cards!', 403, 778);
INSERT INTO items (id, price, name, description, brand_id, tax_status_id)
VALUES (4, 49.99, 'Basketball Trading Cards', 'Premium NBA basketball trading cards!', 404, 778);
Next, create a second table, derived from the items table, that holds the latest data for each item.
CREATE TABLE all_items WITH (
KAFKA_TOPIC = 'ksqldb_table_all_items',
PARTITIONS = 6,
REPLICAS = 3
) AS
SELECT *
FROM items
WHERE true
EMIT CHANGES;
Run the following query to verify the contents of the all_items table:
SELECT * FROM all_items EMIT CHANGES;
The query result should include the four items you inserted.
Now let’s verify that updating the price for an item results in a corresponding update to the all_items table.
Run the following query to update the price of the baseball trading cards:
INSERT INTO items (id, price, name, description, brand_id, tax_status_id)
VALUES (1, 14.99, 'Baseball Trading Cards', 'Premium Ol Slugger baseball trading cards!', 401, 778);
Run the following query to verify the all_items table reflects the updated price:
SELECT * FROM all_items EMIT CHANGES;
The row for the baseball trading cards should include "PRICE":14.99.
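The overwrite behavior you just observed can be sketched outside ksqlDB. The following is a minimal Python illustration, not how ksqlDB is implemented: it assumes facts arrive in the order they were inserted above, keeps only three of the six columns for brevity, and uses a hypothetical helper named materialize to fold the fact events into a dictionary keyed by id.

```python
from decimal import Decimal

# Fact events in the order the exercise inserts them: (id, price, name).
# Only three of the six columns are kept so the sketch stays short.
item_facts = [
    (1, Decimal("9.99"), "Baseball Trading Cards"),
    (2, Decimal("0.99"), "Football Trading Cards"),
    (3, Decimal("19.99"), "Hockey Trading Cards"),
    (4, Decimal("49.99"), "Basketball Trading Cards"),
    (1, Decimal("14.99"), "Baseball Trading Cards"),  # price update for item 1
]


def materialize(facts):
    """Fold fact events into a table keyed by id; the latest fact per key wins.

    Hypothetical helper for illustration only.
    """
    table = {}
    for item_id, price, name in facts:
        # A fact carries the full state, so it simply replaces the old row.
        table[item_id] = {"price": price, "name": name}
    return table


all_items = materialize(item_facts)
print(len(all_items))         # 4 rows, one per primary key
print(all_items[1]["price"])  # 14.99
```

Because each fact carries the complete state of an item, a consumer needs no history to rebuild the table: replaying the events and keeping the last one per key is enough.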
Create and Use a ksqlDB Stream of item_added Delta Events
The second part of this exercise covers delta events. You will produce item_added events to a single event stream. You will then consume those events to create a single aggregated state of the shopping cart, showcasing the current state of the items inside the cart.
Note: A later exercise covers a slightly more complicated use case that includes both item_added and item_removed events.
To define the schema and stream, run the following query:
CREATE STREAM item_added (
cart_id BIGINT KEY,
item_id BIGINT
) WITH (
KAFKA_TOPIC = 'item_added',
VALUE_FORMAT = 'AVRO',
PARTITIONS = 6
);
Run the following query to create a table from the item_added stream that counts the items added to each cart:
CREATE TABLE items_per_cart AS
SELECT cart_id, COUNT(*) AS items_in_cart
FROM item_added
GROUP BY cart_id
EMIT CHANGES;
Insert data into the item_added stream to add items to several carts:
INSERT INTO item_added (cart_id, item_id) VALUES (200, 1);
INSERT INTO item_added (cart_id, item_id) VALUES (201, 4);
INSERT INTO item_added (cart_id, item_id) VALUES (202, 3);
INSERT INTO item_added (cart_id, item_id) VALUES (201, 2);
INSERT INTO item_added (cart_id, item_id) VALUES (200, 4);
Run the following query to see how many items are currently in each cart:
SELECT * FROM items_per_cart EMIT CHANGES;
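To see why the five inserts above produce the counts they do, the aggregation can be sketched in plain Python. This is only an illustration under stated assumptions: events are processed in the order they were inserted (ksqlDB does not guarantee ordering across different carts), and count_items_per_cart is a hypothetical helper, not a ksqlDB API. A real push query may also emit fewer intermediate updates than this sketch records.

```python
from collections import Counter

# item_added delta events from the exercise: (cart_id, item_id).
item_added_events = [(200, 1), (201, 4), (202, 3), (201, 2), (200, 4)]


def count_items_per_cart(events):
    """Aggregate delta events into a per-cart count.

    Returns the final counts and the sequence of updates produced along
    the way. Hypothetical helper for illustration only.
    """
    counts = Counter()
    changelog = []
    for cart_id, _item_id in events:
        # Each delta changes the state relative to what came before it.
        counts[cart_id] += 1
        changelog.append((cart_id, counts[cart_id]))
    return dict(counts), changelog


items_per_cart, changelog = count_items_per_cart(item_added_events)
print(items_per_cart)  # {200: 2, 201: 2, 202: 1}
print(changelog)       # [(200, 1), (201, 1), (202, 1), (201, 2), (200, 2)]
```

Unlike a fact, a single delta event says nothing about the current state of a cart. The consumer must process every event for a cart to arrive at the right count, which is the main trade-off between the two event types.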
Hi, I'm Adam from Confluent. In this module we're going to get hands-on with Fact and Delta Event Types. In this exercise, you will explore use cases for both Fact and Delta Events.

First, you will need to enable the Schema Registry if this hasn't already been done for you in your Confluent Cloud Environment. Next, you will need to enable ksqlDB since you will be using it extensively during the exercise.

After completing the setup steps, you will continue with the first part of the exercise, during which you will materialize a stream of Item Fact Events into a table. Each new Item Fact will be inserted into the table as it is consumed, overwriting any previous Facts that share the same primary key. Facts provide an excellent way to transfer state from one service to another.

The second part of this exercise covers Delta Events. You will produce a single event type, representing adding an item to the cart. For now, we'll only add items to the cart. In a later exercise that covers a slightly more complicated use case, we will both add and remove items from the cart.

Now let's start by defining the schema and stream. Next, you will create a table from the stream that contains the Delta Events. As the exercise continues, you will insert data into the item_added stream and observe the resulting item counts in the items_per_cart table. We'll continue working on this in the dimension three hands-on exercise.