When producing an event stream, it’s important to consider how your consumers will resolve relationships represented in the events. In an event stream with a normalized model, you may find that the data your service needs is spread across several different events in different event streams. You would then need to resolve the relationships by performing a series of stream-based joins or queries to external data stores to obtain the data you need for your system.
In contrast, denormalized event streams tend to be easier for consumers to use because they do not have to query other streams or state stores to get the data they need. Denormalized streams typically come at the expense of additional preprocessing, either by the producer of the event or by a purpose-built process that joins the data together after the fact.
In either case, ksqlDB provides the capabilities for you to resolve any stream joins you need to make for your applications. In this hands-on exercise, you will use ksqlDB to denormalize several event streams that mirror the upstream relational model of an event source by resolving the foreign-key joins.
Create and Use a ksqlDB Table of Item Facts
Let’s start by creating the brands, tax_status, and items_dim2 tables and populating them with events. Note the dim2 qualifier, which keeps our data separate from that of Dimension 1.
Open URL https://confluent.cloud and log in to the Confluent Cloud console.
Navigate to the default environment, the event-streams cluster, and the Editor for the event-streams-ksqlDB cluster.
CREATE TABLE brands (
  id BIGINT PRIMARY KEY,
  name STRING
) WITH (
  KAFKA_TOPIC = 'brands',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6
);
CREATE TABLE tax_status (
  id BIGINT PRIMARY KEY,
  state_tax DECIMAL(3, 2),
  country_tax DECIMAL(3, 2)
) WITH (
  KAFKA_TOPIC = 'tax_status',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6
);
CREATE TABLE items_dim2 (
  id BIGINT PRIMARY KEY,
  price DECIMAL(10, 2),
  name STRING,
  description STRING,
  brand_id BIGINT,
  tax_status_id BIGINT
) WITH (
  KAFKA_TOPIC = 'items_dim2',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6
);
Next, you will join the items_dim2, brands, and tax_status tables together to create a new enriched_items table. ksqlDB requires foreign-key joins to be performed as separate joins, so you will do this in two steps.
First, let’s join items_dim2 with brands to create items_and_brands.
Create a new items_and_brands table by joining the items_dim2 table with the brands table:
CREATE TABLE items_and_brands AS
  SELECT
    items_dim2.id AS id,
    items_dim2.price,
    items_dim2.name AS name,
    items_dim2.description,
    brands.name AS brand_name,
    items_dim2.tax_status_id
  FROM items_dim2
  JOIN brands ON items_dim2.brand_id = brands.id
  EMIT CHANGES;
Now let’s join the new items_and_brands table with tax_status.
Create a new enriched_items table by joining the items_and_brands table with the tax_status table:
CREATE TABLE enriched_items AS
  SELECT
    items_and_brands.id AS id,
    items_and_brands.price,
    items_and_brands.name,
    items_and_brands.description,
    items_and_brands.brand_name,
    tax_status.state_tax,
    tax_status.country_tax
  FROM items_and_brands
  JOIN tax_status ON items_and_brands.tax_status_id = tax_status.id
  EMIT CHANGES;
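As an optional check before loading data, a DESCRIBE statement shows the columns and primary key that each derived table ended up with. This is a minimal sketch using the table names from the statements above; the exact output layout depends on your ksqlDB version.

```sql
-- Inspect the schema of the intermediate and final join tables.
-- Both should list id as the primary key, since a foreign-key join
-- keeps the primary key of the left-hand table.
DESCRIBE items_and_brands;
DESCRIBE enriched_items;
```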
Insert sample data into the brands and tax_status tables:
INSERT INTO brands (id, name) VALUES (400, 'ACME');
INSERT INTO tax_status (id, state_tax, country_tax) VALUES (777, 0.05, 0.10);
Check the contents of the enriched_items table (make sure you start from EARLIEST):
SELECT * FROM enriched_items EMIT CHANGES;
You’ll find that it is still empty. You need to create a single item that matches both the brand and the tax status to have an enriched event come through.
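If you run these statements from the ksqlDB CLI rather than the Confluent Cloud editor, one way to start from EARLIEST is to issue a SET statement before the query. This sketch assumes a CLI session; in the Cloud editor the equivalent is the auto.offset.reset query property.

```sql
-- Read each topic from its earliest offset for queries issued in this session.
SET 'auto.offset.reset' = 'earliest';

-- Push query: stays open and prints rows as the join produces them.
SELECT * FROM enriched_items EMIT CHANGES;
```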
Insert a matching item into the items_dim2 table:
INSERT INTO items_dim2 (id, price, name, description, brand_id, tax_status_id) VALUES (321, 29.99, 'Anvil', 'Sturdy Iron Anvil, 400 lbs', 400, 777);
You will now see an enriched join result in the enriched_items table:
SELECT * FROM enriched_items EMIT CHANGES;
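If you only want to confirm that one item made it through both joins, you can project a few columns and filter on the key. A sketch, assuming the Anvil row inserted above (id 321); LIMIT 1 closes the query after the first matching row instead of leaving it running.

```sql
-- Show only the joined fields for the Anvil item, then stop.
SELECT id, name, brand_name, state_tax, country_tax
FROM enriched_items
WHERE id = 321
EMIT CHANGES
LIMIT 1;
```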
Let’s create three more items of the same brand with the same tax status.
INSERT INTO items_dim2 (id, price, name, description, brand_id, tax_status_id)
  VALUES (9000, 19.99, 'Ball', 'Rubber Ball', 400, 777);
INSERT INTO items_dim2 (id, price, name, description, brand_id, tax_status_id)
  VALUES (9001, 39.99, 'Baseball Glove', 'Leather Baseball Glove', 400, 777);
INSERT INTO items_dim2 (id, price, name, description, brand_id, tax_status_id)
  VALUES (9002, 2.99, 'Tennis Ball', 'Green Tennis Ball', 400, 777);
Now let’s see what happens when a change affects several items of the same brand at once. Say that ACME has decided to rebrand under a new name. Because id is the primary key of the brands table, inserting a row with an existing id acts as an upsert and overwrites the old brand name.
Change the brand name from ACME to ACME Sports:
INSERT INTO brands (id, name) VALUES (400, 'ACME Sports');
Verify that all items were updated with the new brand:
SELECT * FROM enriched_items EMIT CHANGES;
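A narrower query can make the rebrand easier to confirm. The sketch below assumes the four items inserted earlier and filters on the new brand name so that only rows carrying ACME Sports are printed.

```sql
-- Print only enriched rows that already carry the new brand name.
SELECT id, name, brand_name
FROM enriched_items
WHERE brand_name = 'ACME Sports'
EMIT CHANGES;
```

If the update has propagated through both joins, items 321, 9000, 9001, and 9002 should each appear with the new brand name.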