Get Started Free
course: Event Sourcing and Event Storage with Apache Kafka®

Hands On: Trying Out Event Sourcing with Confluent Cloud

3 min
Anna Mcdonald

Anna McDonald

Principal Customer Success Technical Architect (Presenter)

Ben Stopford

Ben Stopford

Lead Technologist, Office of the CTO (Author)

Hands On: Trying Out Event Sourcing with Confluent Cloud

Simulate event sourcing by completing this exercise, which uses ksqlDB on Confluent Cloud. You'll begin by creating a stream of e-commerce events backed by an Apache Kafka topic. Then you'll write a streaming query to transform them into a table in ksqlDB. Finally, you'll query your summary table to fetch the contents of the shopping cart back to your application.

  1. Sign up for Confluent Cloud if you have not already done so, and respond to the verification email to confirm your account.

  2. On Confluent Cloud, open the shopping_cart_database that you set up in the module Thinking in Events (if you haven't set it up yet, please return there for instructions and a discount code). If you started your ksqlDB instance then, the status field should now say "up" and not "provisioning".

  3. Open the query editor and set two additional parameters under Add query properties:

    • auto.offset.reset = Earliest. This ensures that you always return to the start of a Kafka topic when you run a new query.
    • ksql.streams.commit.interval.ms = 3000. This overwrites the default of thirty seconds, making your queries more responsive.
  4. In the editor, create a stream to hold your shopping cart events with the following ksqlDB command:

    CREATE STREAM shopping_cart_events (customer VARCHAR, item VARCHAR, qty INT)
    WITH (kafka_topic='shopping_cart_events', value_format='json', partitions=1);
  5. Select Run query and look for the output below the editor confirming that the event stream was created.

    {
      "@type": "currentStatus",
      "statementText": "CREATE STREAM SHOPPING_CART_EVENTS (CUSTOMER STRING, ITEM STRING, QTY INTEGER) WITH (KAFKA_TOPIC='shopping_cart_events', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');",
      "commandId": "stream/`SHOPPING_CART_EVENTS`/create",
      "commandStatus": {
        "status": "SUCCESS",
        "message": "Stream created",
        "queryId": null
      },
      "commandSequenceNumber": 2,
      "warnings": []
    }
  6. Create four INSERT statements to push events into the shopping_cart_events topic:

    --add two pairs of pants
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 'pants', 2);
    --add a t-shirt
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 't-shirts', 1);
    --remove one pair of pants
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 'pants', -1);
    --add a hat
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 'hats', 1);	

    Select Run query.

  7. Use a SELECT statement to return your four events:

    SELECT * FROM shopping_cart_events EMIT CHANGES;

    Click Run query to see the results and then click Stop to terminate the push query.

  8. Create a ksqlDB query to transform your events into a summary table by using a SUM on the qty field, with an associated GROUP BY that aggregates the events by item:

    CREATE TABLE current_shopping_cart WITH (KEY_FORMAT='JSON') AS
      SELECT customer, item, SUM(qty) as total_qty 
      FROM   shopping_cart_events 
      GROUP BY customer, item 
      EMIT CHANGES;

    Click Run Query.

    The result is a summary table that reflects how many items are in the shopping cart right now. The removals should cancel the items that were added.

  9. Now run a query against the table just like you would with a regular database table:

    SELECT * FROM current_shopping_cart EMIT CHANGES;

    Click Run Query.

    You should see three records, not four, i.e. one record per item. Specifically, you should see only one pair of pants, as the two events for pants have been summarized into one row.

    Click Stop to terminate the push query.

  10. Go to Cluster settings on the left-hand side menu, then click Delete cluster. Enter your cluster's name, then select Continue.

  11. Delete your ksqlDB application by clicking ksqlDB on the left-hand side menu, then selecting Delete under Actions. Enter your application's name, then click Continue.

In this exercise, you used event sourcing to recreate the current state of a shopping cart from an unbounded event stream containing all user cart interactions.

Use the promo code EVENTS101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.