When producing an event stream, it’s important to consider how your consumers will resolve relationships represented in the events. In an event stream with a normalized model, you may find that the data your service needs is spread across several different events in different event streams. You would then need to resolve the relationships by performing a series of stream-based joins or queries to external data stores to obtain the data you need for your system.
In contrast, denormalized event streams tend to be easier for consumers to use, because they do not have to query other streams or state stores to get the data they need. This convenience typically comes at the cost of additional preprocessing, performed either by the producer of the event or by a purpose-built process that joins the data together after the fact.
In either case, ksqlDB provides the capabilities for you to resolve any stream joins you need to make for your applications. In this hands-on exercise, you will use ksqlDB to denormalize several event streams that mirror the upstream relational model of an event source by resolving the foreign-key joins.
Create and Use a ksqlDB Table of Item Facts
Let’s start by creating the brands, tax_status, and items_dim2 tables and populating them with events. Note the dim2 suffix, which keeps this data separate from the data used for Dimension 1.
Open URL https://confluent.cloud and log in to the Confluent Cloud console.
Navigate to the default environment, then the event-streams cluster, and open the Editor for the event-streams-ksqlDB cluster.
Create the brands table:
CREATE TABLE brands (
  id BIGINT PRIMARY KEY,
  name STRING
) WITH (
  KAFKA_TOPIC = 'brands',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6
);
Create the tax_status table:
CREATE TABLE tax_status (
  id BIGINT PRIMARY KEY,
  state_tax DECIMAL(3, 2),
  country_tax DECIMAL(3, 2)
) WITH (
  KAFKA_TOPIC = 'tax_status',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6
);
Create an items_dim2 table:
CREATE TABLE items_dim2 (
  id BIGINT PRIMARY KEY,
  price DECIMAL(10, 2),
  name STRING,
  description STRING,
  brand_id BIGINT,
  tax_status_id BIGINT
) WITH (
  KAFKA_TOPIC = 'items_dim2',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6
);
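Before moving on, it can help to confirm that the three tables were registered with the columns you expect. The following is a minimal sketch using standard ksqlDB metadata statements; the exact output layout depends on your ksqlDB version and is not shown here.

```sql
-- List every table registered in this ksqlDB cluster;
-- brands, tax_status, and items_dim2 should all appear.
SHOW TABLES;

-- Inspect the columns and primary key of one of the new tables.
DESCRIBE items_dim2;
```

If a table is missing, rerun its CREATE TABLE statement before continuing, since the joins that follow depend on all three.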
Next, you will join the items_dim2, brands, and tax_status tables together to create a new enriched_items table. You will need to do this in two separate join statements, because ksqlDB does not let you combine these foreign-key joins in a single query.
First, let’s join items_dim2 with brands to create items_and_brands.
CREATE TABLE items_and_brands AS
  SELECT
    items_dim2.id AS id,
    items_dim2.price,
    items_dim2.name AS name,
    items_dim2.description,
    brands.name AS brand_name,
    items_dim2.tax_status_id
  FROM items_dim2
  JOIN brands ON items_dim2.brand_id = brands.id
  EMIT CHANGES;
Now let’s create a new enriched_items table by joining the items_and_brands table with the tax_status table.
CREATE TABLE enriched_items AS
  SELECT
    items_and_brands.id AS id,
    items_and_brands.price,
    items_and_brands.name,
    items_and_brands.description,
    items_and_brands.brand_name,
    tax_status.state_tax,
    tax_status.country_tax
  FROM items_and_brands
  JOIN tax_status ON items_and_brands.tax_status_id = tax_status.id
  EMIT CHANGES;
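Each CREATE TABLE ... AS SELECT statement above starts a persistent query that keeps its output table up to date. As a quick sanity check, and assuming both statements succeeded, you can list the running queries and inspect the schema of the final table:

```sql
-- Show the persistent queries; there should be one writing to
-- ITEMS_AND_BRANDS and one writing to ENRICHED_ITEMS.
SHOW QUERIES;

-- Confirm that enriched_items carries the brand and tax columns.
DESCRIBE enriched_items;
```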
Insert sample data in the brands and tax_status tables:
INSERT INTO brands (id, name) VALUES (400, 'ACME');
INSERT INTO tax_status (id, state_tax, country_tax) VALUES (777, 0.05, 0.10);
Check the contents of the enriched_items table (make sure auto.offset.reset is set to Earliest so the query reads from the beginning):
SELECT * FROM enriched_items EMIT CHANGES;
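If you are running these statements from a ksqlDB CLI session instead of the Cloud editor (an assumption, since this exercise uses the editor), one way to start from the earliest offset is to set the property explicitly before issuing the query:

```sql
-- Read from the beginning of the underlying topic for
-- queries issued later in this session.
SET 'auto.offset.reset' = 'earliest';

SELECT * FROM enriched_items EMIT CHANGES;
```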
You’ll find that it is still empty. Both joins are inner joins that start from items_dim2, so no enriched event can come through until you create an item that matches both the brand and the tax status.
Insert a matching item into the items_dim2 table:
INSERT INTO items_dim2 (id, price, name, description, brand_id, tax_status_id) VALUES (321, 29.99, 'Anvil', 'Sturdy Iron Anvil, 400 lbs', 400, 777);
You will now see an enriched join result in the enriched_items table:
SELECT * FROM enriched_items EMIT CHANGES;
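The push query above keeps running until you stop it. If you only want the current row for a single item, a pull query is an alternative. This is a sketch that assumes pull queries are enabled for your cluster and relies on id being the primary key of enriched_items:

```sql
-- Pull query: returns the current state for item 321 and then terminates.
SELECT * FROM enriched_items WHERE id = 321;
```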
Let’s create three more items of the same brand with the same tax status.
INSERT INTO items_dim2 (id, price, name, description, brand_id, tax_status_id) VALUES (9000, 19.99, 'Ball', 'Rubber Ball', 400, 777);
INSERT INTO items_dim2 (id, price, name, description, brand_id, tax_status_id) VALUES (9001, 39.99, 'Baseball Glove', 'Leather Baseball Glove', 400, 777);
INSERT INTO items_dim2 (id, price, name, description, brand_id, tax_status_id) VALUES (9002, 2.99, 'Tennis Ball', 'Green Tennis Ball', 400, 777);
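To check that all of the new items were enriched with the same brand, you could group the enriched table by brand name. This is an optional sketch, not part of the original exercise; the item_count alias is a name chosen here for illustration.

```sql
-- Transient push query: running count of enriched items per brand.
SELECT brand_name, COUNT(*) AS item_count
FROM enriched_items
GROUP BY brand_name
EMIT CHANGES;
```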
Now let’s see what happens when a single change affects every item of the same brand at once. Let’s say that ACME has decided to rebrand to a new name. Because brands is a table keyed by id, inserting a row with an existing id acts as an upsert and overwrites the old brand name.
Change the brand from ACME to ACME Sports:
INSERT INTO brands (id, name) VALUES (400, 'ACME Sports');
Verify that all items were updated with the new brand:
SELECT * FROM enriched_items EMIT CHANGES;
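The same propagation should apply to the second join. As an optional follow-up that is not part of the original exercise, you could upsert the tax status and watch the change flow into every item that references it. The new state_tax value of 0.06 is a made-up figure for illustration.

```sql
-- Upsert tax status 777 with a hypothetical new state tax rate.
INSERT INTO tax_status (id, state_tax, country_tax) VALUES (777, 0.06, 0.10);

-- Watch the affected items pick up the new rate.
SELECT id, name, brand_name, state_tax, country_tax
FROM enriched_items
EMIT CHANGES;
```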