course: ksqlDB 101

Hands On: Flatten Nested Records with ksqlDB

Allison Walther

Integration Architect (Presenter)

This exercise demonstrates how to flatten nested records with ksqlDB.

Create a Nested Stream

  1. Begin by creating a nested stream in the Confluent Cloud editor. If you’ve been working through earlier lessons, you may first have to drop the previous orders stream, the streams derived from it, and their topics, like so:

    DROP STREAM orders_enriched DELETE TOPIC;
    DROP STREAM orders_no_address DELETE TOPIC;
    DROP STREAM orders DELETE TOPIC;

    Now create the new orders stream:

    CREATE STREAM orders (ordertime BIGINT, orderid INTEGER, itemid VARCHAR,
    orderunits INTEGER, address STRUCT<street VARCHAR, city VARCHAR, state VARCHAR>)
    WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='json', PARTITIONS=1);

    Click Run query.
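
    To confirm that the stream was registered with the nested address column, one option is a DESCRIBE statement. This is a minimal sketch, assuming the CREATE STREAM statement above succeeded; the exact output layout depends on your ksqlDB version:

    -- List the columns of the orders stream; address should appear as a STRUCT
    DESCRIBE orders;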

Insert and Inspect Data

  1. Insert some data into your orders stream:

    INSERT INTO orders VALUES (1620504934723, 70, 'item_4', 1,
        STRUCT(street:='210 West Veterans Drive', city:='Sacramento', state:='California'));
    INSERT INTO orders VALUES (16205059321941, 72, 'item_7', 9,
        STRUCT(street:='10043 Bella Vista Blvd', city:='Oakland', state:='California'));
    INSERT INTO orders VALUES (16205069437125, 73, 'item_3', 4,
        STRUCT(street:='4921 Parker Place', city:='Pasadena', state:='California'));
    INSERT INTO orders VALUES (1620508354284, 74, 'item_7', 3,
        STRUCT(street:='1009 First Street', city:='Fresno', state:='California'));

    Click Run query.

  2. Set auto.offset.reset to earliest so that the query reads the stream from the beginning and returns the rows you have already inserted. Then use a SELECT statement to validate that the data has been entered correctly:

    SELECT * FROM orders EMIT CHANGES; 

    Click Run query.

  3. Scroll down and view the current structure of a record by clicking the caret in its upper left-hand corner. The address field appears as a nested structure containing street, city, and state.
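
    If you are working from the ksqlDB CLI rather than the Confluent Cloud editor (an assumption; this exercise uses the editor's own settings), the offset reset in step 2 can be sketched with a SET statement. The LIMIT clause is an optional addition that ends the push query after four rows, matching the four inserts above:

    -- Read from the beginning of the topic for queries started in this session
    SET 'auto.offset.reset' = 'earliest';

    -- Return the four inserted rows, then terminate
    SELECT * FROM orders EMIT CHANGES LIMIT 4;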

Flatten the Records

  1. Now create a new query to flatten the records, using the -> operator to pull each field out of the address struct as its own column:

    SELECT ordertime, orderid, itemid, orderunits, address->street AS street, address->city AS city, address->state AS state
    FROM orders EMIT CHANGES; 

    Click Run query.

  2. Expand a record in the results, and you will see that the data structure has been flattened: street, city, and state now appear as top-level columns.
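
    The -> operator used in the flattening query can also appear outside the SELECT list. As a hedged sketch, the following query filters on a nested field, assuming the four rows inserted earlier; it should return only the Oakland order:

    -- Dereference address->city in both the projection and the filter
    SELECT orderid, itemid, address->city AS city
    FROM orders
    WHERE address->city = 'Oakland'
    EMIT CHANGES;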

Make Your Query Persistent and Inspect the Data

  1. Make your query persistent by prepending a CREATE STREAM AS statement to the flattening query from the previous section:

    CREATE STREAM orders_flat WITH (KAFKA_TOPIC='orders_flat') AS
    SELECT ordertime, orderid, itemid, orderunits, address->street AS street, address->city AS city, address->state AS state
    FROM orders EMIT CHANGES; 

    Click Run query to generate the stream.

  2. Next, write a SELECT statement against the new orders_flat stream:

    SELECT * FROM orders_flat EMIT CHANGES;

    Click Run query. Expand the structures of the records in the new stream to see that they are flattened.
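
    Because the CREATE STREAM AS query keeps running, rows added to orders later should also show up, flattened, in orders_flat. A minimal sketch, using a made-up order (the values below are hypothetical and not part of the exercise data):

    -- Hypothetical extra order; a new row written to orders is picked up by the persistent query
    INSERT INTO orders VALUES (1620509000000, 75, 'item_2', 2,
        STRUCT(street:='1 Example Street', city:='San Diego', state:='California'));

    -- List running persistent queries, including the one that populates orders_flat
    SHOW QUERIES;

    Re-running the SELECT against orders_flat should then include the new row.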
