Integration Architect (Presenter)
This exercise demonstrates how to flatten nested records with ksqlDB.
Begin by creating a nested stream in the Confluent Cloud editor. If you’ve been working through earlier lessons, you may first have to drop the existing orders stream and the streams derived from it, like so:
DROP STREAM orders_enriched DELETE TOPIC;
DROP STREAM orders_no_address DELETE TOPIC;
DROP STREAM orders DELETE TOPIC;
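If you are not sure which of these streams exist in your cluster, a quick check can avoid errors from dropping a stream that is not there. The following is a minimal sketch, assuming your ksqlDB version supports the IF EXISTS clause on DROP STREAM:

```sql
-- List the streams that currently exist before dropping anything.
SHOW STREAMS;

-- IF EXISTS turns the drop into a no-op when the stream is absent.
DROP STREAM IF EXISTS orders_enriched DELETE TOPIC;
DROP STREAM IF EXISTS orders_no_address DELETE TOPIC;
DROP STREAM IF EXISTS orders DELETE TOPIC;
```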
Now create the new orders stream:
CREATE STREAM orders (ordertime BIGINT, orderid INTEGER, itemid VARCHAR,
orderunits INTEGER, address STRUCT< street VARCHAR, city VARCHAR, state VARCHAR>)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='json', PARTITIONS=1);
Click Run query.
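To confirm that the stream was registered with the nested schema, you can optionally inspect it. This is a small sketch that assumes the CREATE STREAM statement above succeeded:

```sql
-- Show the columns and types of the orders stream.
-- The address column should be reported as a STRUCT
-- with street, city, and state fields.
DESCRIBE orders;
```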
Insert some data into your orders stream:
INSERT INTO orders VALUES (1620504934723, 70, 'item_4', 1,
STRUCT(street:='210 West Veterans Drive', city:='Sacramento', state:='California'));
INSERT INTO orders VALUES (16205059321941, 72, 'item_7', 9,
STRUCT(street:='10043 Bella Vista Blvd', city:='Oakland', state:='California'));
INSERT INTO orders VALUES (16205069437125, 73, 'item_3', 4,
STRUCT(street:='4921 Parker Place', city:='Pasadena', state:='California'));
INSERT INTO orders VALUES (1620508354284, 74, 'item_7', 3,
STRUCT(street:='1009 First Street', city:='Fresno', state:='California'));
Click Run query.
Set auto.offset.reset to earliest so that the query reads from the beginning of the topic instead of returning only newly arriving messages. Then use a SELECT statement to validate that the data has been entered correctly:
SELECT * FROM orders EMIT CHANGES;
Click Run query.
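If you are working in the ksqlDB CLI instead of the Confluent Cloud editor (an assumption; this exercise itself uses the editor's setting), the same offset policy can be applied with a SET statement before running the SELECT:

```sql
-- Read from the beginning of the topic for queries started in this session.
SET 'auto.offset.reset' = 'earliest';

SELECT * FROM orders EMIT CHANGES;
```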
Scroll down and view the current structure of a record by clicking the caret in its upper left-hand corner. The address field appears as a nested structure containing street, city, and state.
Now create a new query to flatten the records:
SELECT ordertime, orderid, itemid, orderunits, address->street AS street, address->city AS city, address->state AS state
FROM orders EMIT CHANGES;
Click Run query.
Expand another record, and you will see that your data structure has been flattened.
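The -> operator is not limited to the SELECT list. As a small sketch, assuming the orders stream and sample data defined above, it can also be used in a WHERE clause to filter on a nested field. The choice of city here is purely illustrative:

```sql
-- Dereference the nested city field both in the projection and in the filter.
SELECT orderid, itemid, address->city AS city
FROM orders
WHERE address->city = 'Oakland'
EMIT CHANGES;
```

With the four sample rows inserted earlier, only order 72 should match.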
Make your query persistent by prepending a CREATE STREAM ... AS clause to the flattening query above:
CREATE STREAM orders_flat WITH (KAFKA_TOPIC='orders_flat') AS
SELECT ordertime, orderid, itemid, orderunits, address->street AS street, address->city AS city, address->state AS state
FROM orders EMIT CHANGES;
Click Run query to generate the stream.
Next, write a SELECT statement against the new orders_flat stream:
SELECT * FROM orders_flat EMIT CHANGES;
Click Run query. Expand the structures of the records in the new stream to see that they are flattened.
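As an optional check that does not depend on expanding records in the editor, you can inspect the schema of the new stream. This sketch assumes the orders_flat stream was created as shown above:

```sql
-- Show the columns of the flattened stream.
-- street, city, and state should now appear as top-level
-- string columns, with no STRUCT column.
DESCRIBE orders_flat;
```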
In this exercise, we're going to flatten some nested records. We'll start off with a create stream orders query. Let's insert some data into our orders stream. Next, let's validate that the data has been entered correctly with a select statement. But before doing that, we need to make sure that we set the auto offset reset policy to earliest. That way, when we query the stream, we'll get the earliest message. Run the query. Scroll down and expand a record to see the current structure. It's pretty nasty, huh? Let's go ahead and create a new query that'll flatten those records. Run that query. Now if you scroll back down again, you should be able to expand another record to see the new structure, and goodness, it looks a lot better. Let's make this a persistent query. We can do that by prepending create stream as to the statement that we have in the editor. Run the query and we'll have a new stream created. Next, let's enter and run a push query against our new stream. Again, scroll down and check out the records. You'll see that the structure of the records in the new stream is exactly what we were looking for. And with that, you've flattened some nested records.