Integration Architect (Presenter)
This exercise demonstrates how to flatten nested records with ksqlDB.
Begin by creating a nested stream in the Confluent Cloud editor. If you’ve worked through earlier lessons, you may first need to remove the existing orders stream and related streams from those lessons, like so:
DROP STREAM orders_enriched DELETE TOPIC;
DROP STREAM orders_no_address DELETE TOPIC;
DROP STREAM orders DELETE TOPIC;
Now create the new orders stream:
CREATE STREAM orders (ordertime BIGINT, orderid INTEGER, itemid VARCHAR,
orderunits INTEGER, address STRUCT< street VARCHAR, city VARCHAR, state VARCHAR>)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='json', PARTITIONS=1);
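To picture what this schema means for each message value, here is a minimal Python sketch that models one order as nested dataclasses and serializes it to JSON. The ADDRESS STRUCT column becomes a nested JSON object. The upper-case field names assume ksqlDB's default of upper-casing unquoted identifiers, and the class names Address and Order are hypothetical.

```python
import json
from dataclasses import dataclass, asdict


# Hypothetical model of the ADDRESS STRUCT column.
@dataclass
class Address:
    STREET: str
    CITY: str
    STATE: str


# Hypothetical model of one value in the orders stream.
@dataclass
class Order:
    ORDERTIME: int
    ORDERID: int
    ITEMID: str
    ORDERUNITS: int
    ADDRESS: Address


order = Order(1620504934723, 70, "item_4", 1,
              Address("210 West Veterans Drive", "Sacramento", "California"))

# asdict() recurses into nested dataclasses, so ADDRESS becomes a nested object.
value = json.dumps(asdict(order))
print(value)
```

The nesting in the JSON value is exactly what the rest of this exercise flattens.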
Click Run query.
Insert some data into your orders stream:
INSERT INTO orders VALUES (1620504934723, 70, 'item_4', 1,
STRUCT(street:='210 West Veterans Drive', city:='Sacramento', state:='California'));
INSERT INTO orders VALUES (16205059321941, 72, 'item_7', 9,
STRUCT(street:='10043 Bella Vista Blvd', city:='Oakland', state:='California'));
INSERT INTO orders VALUES (16205069437125, 73, 'item_3', 4,
STRUCT(street:='4921 Parker Place', city:='Pasadena', state:='California'));
INSERT INTO orders VALUES (1620508354284, 74, 'item_7', 3,
STRUCT(street:='1009 First Street', city:='Fresno', state:='California'));
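Because these INSERT statements omit a column list, each value is matched to a column by position, in the order the columns were declared in CREATE STREAM. The Python sketch below mimics that pairing and the STRUCT(...) constructor with plain dicts; the helper names row_to_value and struct are hypothetical, and the upper-case keys assume ksqlDB's default identifier casing.

```python
# Column order as declared in the CREATE STREAM statement.
COLUMNS = ["ORDERTIME", "ORDERID", "ITEMID", "ORDERUNITS", "ADDRESS"]


def struct(street, city, state):
    """Mimic STRUCT(street:=..., city:=..., state:=...) as a nested dict."""
    return {"STREET": street, "CITY": city, "STATE": state}


def row_to_value(values):
    """Pair positional VALUES with column names, like an INSERT with no column list."""
    if len(values) != len(COLUMNS):
        raise ValueError("expected exactly one value per column")
    return dict(zip(COLUMNS, values))


rows = [
    (1620504934723, 70, "item_4", 1, struct("210 West Veterans Drive", "Sacramento", "California")),
    (16205059321941, 72, "item_7", 9, struct("10043 Bella Vista Blvd", "Oakland", "California")),
    (16205069437125, 73, "item_3", 4, struct("4921 Parker Place", "Pasadena", "California")),
    (1620508354284, 74, "item_7", 3, struct("1009 First Street", "Fresno", "California")),
]

# One nested value per INSERT, in the order the rows were written.
orders_topic = [row_to_value(r) for r in rows]
```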
Click Run query.
Set auto.offset.reset to earliest so that your query reads the stream from the beginning and returns the records you just inserted. Then use a SELECT statement to validate that the data was entered correctly:
SELECT * FROM orders EMIT CHANGES;
Click Run query.
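The reset setting matters because a new push query has no committed offset, so auto.offset.reset decides where it starts reading. The Python sketch below models the two common policies on a list standing in for the topic; it is a simplification of Kafka consumer behavior, not ksqlDB code, and the function names are hypothetical.

```python
# Order IDs already written to the topic, standing in for the log.
log = [70, 72, 73, 74]


def starting_offset(log, auto_offset_reset):
    """Where a consumer with no committed offset begins reading."""
    if auto_offset_reset == "earliest":
        return 0  # start at the oldest retained record
    if auto_offset_reset == "latest":
        return len(log)  # skip existing records; see only new ones
    raise ValueError(f"unsupported policy: {auto_offset_reset}")


def existing_records_seen(log, auto_offset_reset):
    """Records already in the log that the new query will return."""
    return log[starting_offset(log, auto_offset_reset):]
```

With "latest", the four rows inserted earlier would never appear in the query output, which is why the exercise switches to "earliest".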
Scroll down and view the structure of a record by clicking the caret in its upper left-hand corner. Notice that the street, city, and state fields are nested inside the address struct.
Now write a new query that flattens the records, using the -> operator to pull each field out of the address struct:
SELECT ordertime, orderid, itemid, orderunits, address->street AS street, address->city AS city, address->state AS state
FROM orders EMIT CHANGES;
Click Run query.
Expand another record: the street, city, and state fields now appear as top-level columns, so the structure has been flattened.
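The flattening query can be read as a per-record function: copy the top-level columns and lift each address field up one level under its alias. The Python sketch below mirrors that mapping on a dict; flatten_order is a hypothetical name, and returning None for a missing ADDRESS is an assumption meant to resemble ksqlDB returning NULL when dereferencing a null struct.

```python
def flatten_order(value):
    """Mirror the SELECT: copy top-level columns, lift ADDRESS fields up one level."""
    address = value.get("ADDRESS") or {}  # assumed: a null struct yields null fields
    return {
        "ORDERTIME": value["ORDERTIME"],
        "ORDERID": value["ORDERID"],
        "ITEMID": value["ITEMID"],
        "ORDERUNITS": value["ORDERUNITS"],
        # address->street AS street, address->city AS city, address->state AS state
        "STREET": address.get("STREET"),
        "CITY": address.get("CITY"),
        "STATE": address.get("STATE"),
    }


nested = {
    "ORDERTIME": 1620508354284, "ORDERID": 74, "ITEMID": "item_7", "ORDERUNITS": 3,
    "ADDRESS": {"STREET": "1009 First Street", "CITY": "Fresno", "STATE": "California"},
}
flat = flatten_order(nested)
```

Note that, as in the ksqlDB query, every output column is named explicitly; nothing is flattened automatically.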
Make the query persistent by putting a CREATE STREAM AS clause in front of the SELECT statement from the previous step:
CREATE STREAM orders_flat WITH (KAFKA_TOPIC='orders_flat') AS
SELECT ordertime, orderid, itemid, orderunits, address->street AS street, address->city AS city, address->state AS state
FROM orders EMIT CHANGES;
Click Run query to generate the stream.
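Unlike the one-off SELECT, CREATE STREAM AS starts a query that keeps running and writes every result to the orders_flat topic. The Python sketch below is a simplified, hypothetical model of that behavior: a PersistentQuery object tracks its position in a source list and appends transformed records to a sink list each time it is polled. Starting at position 0 loosely corresponds to reading from the earliest offset; the class and function names are illustrative only.

```python
from typing import Callable, Dict, List

Record = Dict[str, object]


class PersistentQuery:
    """Hypothetical model of CREATE STREAM ... AS SELECT: keep applying a
    transform to new source records and append the results to a sink."""

    def __init__(self, source: List[Record], sink: List[Record],
                 transform: Callable[[Record], Record]):
        self.source, self.sink, self.transform = source, sink, transform
        self.position = 0  # next source offset to process

    def poll(self) -> int:
        """Process everything appended since the last poll; return the count."""
        new = self.source[self.position:]
        self.sink.extend(self.transform(r) for r in new)
        self.position += len(new)
        return len(new)


def lift_address(value: Record) -> Record:
    """Drop the nested ADDRESS and promote its fields to the top level."""
    out = {k: v for k, v in value.items() if k != "ADDRESS"}
    out.update(value["ADDRESS"])  # STREET, CITY, STATE become top-level keys
    return out


orders: List[Record] = []
orders_flat: List[Record] = []
query = PersistentQuery(orders, orders_flat, lift_address)

# Records arriving over time are picked up by later polls.
orders.append({"ORDERID": 70, "ADDRESS": {"STREET": "210 West Veterans Drive",
                                          "CITY": "Sacramento", "STATE": "California"}})
query.poll()
orders.append({"ORDERID": 72, "ADDRESS": {"STREET": "10043 Bella Vista Blvd",
                                          "CITY": "Oakland", "STATE": "California"}})
query.poll()
```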
Next, write a SELECT statement against the new orders_flat stream:
SELECT * FROM orders_flat EMIT CHANGES;
Click Run query. Expand the records in the new stream to confirm that they are flat.