Get Started Free
course: Designing Events and Event Streams

Hands On: Modeling as Facts vs. Delta Event Types

2 min
bellemare-headshot-zoomed

Adam Bellemare

Staff Technologist, Office of the CTO (Presenter)

This exercise explores use cases for both fact and delta events.

Note: If you haven’t already signed up for Confluent Cloud, you need to do so before continuing with this exercise. You can do so by completing the steps that immediately precede this exercise. These steps include creating the event-streams cluster used by the course exercises.

Create and Use a ksqlDB Table of Item Facts

In this first part of the exercise, you will materialize a stream of item fact events into a table. Each new item fact will be inserted into the table as it is consumed, overwriting any previous facts that share the same primary key. Facts provide an excellent way to transfer state from one service to another.

Let’s start with logging in to the Confluent Cloud UI.

  1. Open URL https://confluent.cloud and log in to the Confluent Cloud console.
  2. Navigate to the environments page and click Add cloud environment.

add-cloud-environment

  1. Name the new environment designing-events and click Create.

new-environment Next, you will be prompted to enable one of the available Streams Governance Packages. You need to do so since the course exercises utilize Schema Registry which these packages include.

  1. Click Begin configuration for the Essentials package.

essentials-package

  1. Select a provider and region and click Enable.

Next, create a cluster in the designing-events environment.

  1. Click Create cluster on my own.

designing-events

  1. Click Begin configuration for the Basic cluster.
  2. Select a provider and region and click Continue.
  3. Assign a name of event-streams and click Launch cluster.

You also need to enable ksqlDB for your Confluent Cloud cluster. Let’s do this now.

  1. On the left side of the UI, click ksqlDB.

ksqIDB

  1. Click Create cluster myself.
  2. Select Global access since that is fine for our exercise environment and click Continue.
  3. Assign a cluster name of event-stream-ksqlDB.
  4. Accept all remaining defaults and click Launch cluster.

Proceed to the next step when the ksqlDB cluster provisioning has completed. Provisioning usually takes less than fifteen minutes, but in some rare cases it may take a bit longer.

Note: The ksqlDB cluster will slowly deplete your promotional credits so you want to complete the environment cleanup steps at the end of the last exercise to limit this impact.

Now that your ksqlDB cluster is available, let’s create a TABLE.

  1. In the Confluent Cloud UI, navigate to ksqlDB and click the event-stream-ksqlDB cluster.
  2. Set auto.offset.reset to Earliest for this session.

event-stream-ksqidb

  1. Enter the following ksqlDB in the Editor and click Run query.

    CREATE TABLE items (
      id BIGINT PRIMARY KEY,
      price DECIMAL(10, 2),
      name STRING,
      description STRING,
      brand_id BIGINT,
      tax_status_id BIGINT
    ) WITH (
      KAFKA_TOPIC = 'items',
      VALUE_FORMAT = 'AVRO',
      PARTITIONS = 6
    );
  2. Now insert a row of data into the stream backing the table.

    INSERT INTO items (id, price, name, description, brand_id, tax_status_id)
    VALUES (1, 9.99, 'Baseball Trading Cards', 'Premium Ol Slugger baseball trading cards!', 401, 778);
  3. Insert a few more rows of data.

    INSERT INTO items (id, price, name, description, brand_id, tax_status_id)
    VALUES (2, .99, 'Football Trading Cards', 'Premium NFL 2022 football trading cards!', 402, 778);
    INSERT INTO items (id, price, name, description, brand_id, tax_status_id)
    VALUES (3, 19.99, 'Hockey Trading Cards', 'Premium NHL hockey trading cards!', 403, 778);
    INSERT INTO items (id, price, name, description, brand_id, tax_status_id)
    VALUES (4, 49.99, 'Basketball Trading Cards', 'Premium NBA basketball trading cards!', 404, 778);
  4. Next, create a KTABLE from the topic to get the latest data for each item.

    CREATE TABLE all_items WITH (
      KAFKA_TOPIC = 'ksqdb_table_all_items', 
      PARTITIONS = 6, 
      REPLICAS = 3
    ) AS 
    SELECT *
    FROM items
    WHERE true
    EMIT CHANGES;
  5. Run the following query to verify the contents of the all_items table:

    SELECT * from all_items EMIT CHANGES;

The query result should include these four items.

query-four-items

Now let’s verify that updating the price for an item results in a corresponding update to the all_items table.

  1. Run the following query to update the price of the baseball trading cards:

    INSERT INTO Items (id, price, name, description, brand_id, tax_status_id)
    VALUES (1, 14.99, 'Baseball Trading Cards', 'Premium Ol Slugger baseball trading cards!', 401, 778);
  2. Run the following query to verify the all_items table reflects the updated price:

    SELECT * from all_items EMIT CHANGES;

The row for the baseball trading cards should include "PRICE":14.99.

price

Create and use a ksqlDB STREAM of item_added delta events

The second part of this exercise covers delta events. You will produce item_added events to a single event stream. You will then consume those events to create a single aggregated state of the shopping cart, showcasing the current state of the items inside the cart.

Note: A later exercise will cover a bit more complicated use case that includes both item_added and item_removed events.

  1. To define the schema and stream, run the following query:

    CREATE STREAM item_added (
      cart_id BIGINT key,
      item_id BIGINT
    ) WITH (
      KAFKA_TOPIC = 'item_added',
      VALUE_FORMAT = 'AVRO',
      PARTITIONS = 6
    );
  2. Run the following query to create a KTABLE from the item_added stream to get the latest data for each cart:

    CREATE TABLE items_per_cart AS 
    SELECT cart_id, COUNT(*) as items_in_cart
    FROM item_added
    GROUP BY cart_id
    EMIT CHANGES;
  3. Insert data into the item_added stream to add items to several carts:

    INSERT INTO item_added (cart_id, item_id) VALUES (200, 1);
    INSERT INTO item_added (cart_id, item_id) VALUES (201, 4);
    INSERT INTO item_added (cart_id, item_id) VALUES (202, 3);
    INSERT INTO item_added (cart_id, item_id) VALUES (201, 2);
    INSERT INTO item_added (cart_id, item_id) VALUES (200, 4);
  4. Run the following query to see how many items are currently in each cart:

    SELECT * FROM items_per_cart EMIT CHANGES;

Use the promo code EVENTDESIGN101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.