Course: Event Sourcing and Event Storage with Apache Kafka®

Hands On: Trying Out Event Sourcing with Confluent Cloud

3 min
Anna McDonald, Principal Customer Success Technical Architect (Course Presenter)
Ben Stopford, Lead Technologist (Course Author)

Simulate event sourcing by completing this exercise, which uses ksqlDB on Confluent Cloud. You'll begin by creating a stream of e-commerce events backed by an Apache Kafka topic. Then you'll write a streaming query to transform them into a table in ksqlDB. Finally, you'll query your summary table to fetch the contents of the shopping cart back to your application.

  1. On Confluent Cloud, open your shopping_cart_database ksqlDB application. Since you started your ksqlDB instance in Thinking in Events, the status field should now say "up" and not "provisioning". (If you haven't started an instance yet, see the discount code and instructions in Module 1.)
  2. Open the query editor and set two additional parameters under Add query properties:

    • auto.offset.reset = Earliest. This ensures that each new query reads from the start of the Kafka topic.
    • = 3000. This overrides the default of thirty seconds, making your queries more responsive.
  3. In the editor, create a stream to hold your shopping cart events with the following ksqlDB command:

    CREATE STREAM shopping_cart_events (customer VARCHAR, item VARCHAR, qty INT)
    WITH (kafka_topic='shopping_cart_events', value_format='json', partitions=1);
  4. Select Run query and look for the output below the editor confirming that the event stream was created.

    {
      "@type": "currentStatus",
      "commandId": "stream/`SHOPPING_CART_EVENTS`/create",
      "commandStatus": {
        "status": "SUCCESS",
        "message": "Stream created",
        "queryId": null
      },
      "commandSequenceNumber": 2,
      "warnings": []
    }
  5. Create four INSERT statements to push events into the shopping_cart_events topic:

    --add two pairs of pants
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 'pants', 2);
    --add a t-shirt
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 't-shirts', 1);
    --remove one pair of pants
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 'pants', -1);
    --add a hat
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 'hats', 1);

    Select Run query.

  6. Use a SELECT statement to return your four events:

    SELECT * FROM shopping_cart_events EMIT CHANGES;

    Click Run query to see the results and then click Stop to terminate the push query.

  7. Create a ksqlDB query to transform your events into a summary table by using a SUM on the qty field, with an associated GROUP BY that aggregates the events by customer and item:

    CREATE TABLE current_shopping_cart WITH (KEY_FORMAT='JSON') AS
      SELECT customer, item, SUM(qty) AS total_qty
      FROM   shopping_cart_events
      GROUP BY customer, item
      EMIT CHANGES;

    Click Run query.

    The result is a summary table that reflects how many of each item are in the shopping cart right now. Each removal, recorded as an event with a negative qty, offsets the corresponding addition.

  8. Now run a query against the table just like you would with a regular database table:

    SELECT * FROM current_shopping_cart EMIT CHANGES;

    Click Run query.

    You should see three records, not four: one record per item. Specifically, pants should show a total_qty of 1, because the two events for pants (+2 and -1) have been summed into a single row.

    Click Stop to terminate the push query.
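
The aggregation in steps 7 and 8 can be pictured as a fold over the event log. The following is a minimal Python sketch of that idea, not a description of how ksqlDB works internally. The `replay` function and the `events` list are hypothetical names introduced for illustration, and the data is the four events inserted in step 5.

```python
from collections import defaultdict

# The four events inserted in step 5, in order: (customer, item, qty).
events = [
    ("bob", "pants", 2),
    ("bob", "t-shirts", 1),
    ("bob", "pants", -1),
    ("bob", "hats", 1),
]


def replay(events):
    """Fold events into current state: the sum of qty per (customer, item)."""
    totals = defaultdict(int)
    for customer, item, qty in events:
        totals[(customer, item)] += qty
    return dict(totals)


cart = replay(events)
for (customer, item), total_qty in cart.items():
    print(customer, item, total_qty)
```

Four events collapse into three rows because both pants events share the key ("bob", "pants"), which mirrors the GROUP BY customer, item in the ksqlDB query.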

In this exercise, you used event sourcing to recreate the current state of a shopping cart from an unbounded event stream containing all user cart interactions.
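
Because the event stream is unbounded, the derived state has to be updated as each event arrives rather than computed once at the end. The sketch below illustrates this in plain Python under stated assumptions: `running_totals` and `stream` are hypothetical names, and the generator only approximates what a push query with EMIT CHANGES reports, since ksqlDB may batch intermediate updates before emitting them.

```python
from collections import defaultdict
from typing import Iterable, Iterator, Tuple

Event = Tuple[str, str, int]  # (customer, item, qty)


def running_totals(events: Iterable[Event]) -> Iterator[Event]:
    """Yield the updated (customer, item, total_qty) row after each event."""
    totals = defaultdict(int)
    for customer, item, qty in events:
        totals[(customer, item)] += qty
        yield customer, item, totals[(customer, item)]


# Same four shopping cart events as in the exercise.
stream = [
    ("bob", "pants", 2),
    ("bob", "t-shirts", 1),
    ("bob", "pants", -1),
    ("bob", "hats", 1),
]

changes = list(running_totals(stream))
for row in changes:
    print(row)
```

Each yielded row replaces the previous row with the same customer and item, so the latest row per key is the current state of the cart.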

Use the promo code EVENTS101 to get $101 of free Confluent Cloud usage
