Integration Architect (Presenter)
Continuing with the orders
stream from previous exercises, this exercise teaches you how to transform data in ksqlDB.
Begin by deleting the streams created previously:
DROP STREAM orders_enriched DELETE TOPIC;
DROP STREAM orders DELETE TOPIC;
then, create another orders
stream with:
CREATE STREAM orders (ordertime BIGINT, orderid INTEGER, itemid VARCHAR, orderunits INTEGER, address STRUCT < street VARCHAR, city VARCHAR, state VARCHAR>)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro', PARTITIONS=1);
finally, insert some data into it:
INSERT INTO orders VALUES (1620504934723, 70, 'item_5', 1,
STRUCT(street:='210 West Veterans Drive', city:='Sacramento', state:='California Foo2'));
INSERT INTO orders VALUES (16205059321941, 72, 'item_6', 9,
STRUCT(street:='10043 Bella Vista Blvd', city:='Oakland', state:='California'));
INSERT INTO orders VALUES (1620503083019, 77, 'item_7', 12,
STRUCT(street:='10083 Garvey Ave', city:='Rosemead', state:='California'));
Next, select everything from the orders
stream:
SELECT * FROM orders EMIT CHANGES;
Click Run query.
Scroll down to see the returned records. View the structure of one of the records by clicking on the caret in its upper left-hand corner.
Now Stop your orders query.
Create a persistent transformation with no address data:
CREATE STREAM orders_no_address AS
SELECT TIMESTAMPTOSTRING(ordertime, 'yyyy-MM-dd HH:mm:ss') AS order_timestamp, orderid, itemid, orderunits
FROM orders EMIT CHANGES;
Now select from the transformed stream:
SELECT * FROM orders_no_address EMIT CHANGES;
Click Run query.
Expand the record to see the transformed data.
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.