Integration Architect (Presenter)
This exercise will teach you how to filter streams in ksqlDB for Confluent Cloud.
Begin by creating a stream for orders
in the editor (make sure to set auto.offset.reset = earliest
):
CREATE stream orders (id INTEGER KEY, item VARCHAR, address STRUCT <
city VARCHAR, state VARCHAR >)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='json', partitions=1);
Click Run query.
Next, insert orders into the new orders
stream:
INSERT INTO orders(id, item, address)
VALUES(140, 'Mauve Widget', STRUCT(city:='Ithaca', state:='NY'));
INSERT INTO orders(id, item, address)
VALUES(141, 'Teal Widget', STRUCT(city:='Dallas', state:='TX'));
INSERT INTO orders(id, item, address)
VALUES(142, 'Violet Widget', STRUCT(city:='Pasadena', state:='CA'));
INSERT INTO orders(id, item, address)
VALUES(143, 'Purple Widget', STRUCT(city:='Yonkers', state:='NY'));
INSERT INTO orders(id, item, address)
VALUES(144, 'Tan Widget', STRUCT(city:='Amarillo', state:='TX'));
Click Run query.
Set the auto.offset.reset
property to earliest
so that the new filtered stream we are about to create will include the data we inserted in the previous step.
Next, create a stream for only New York orders:
CREATE STREAM ny_orders AS SELECT * FROM ORDERS WHERE
ADDRESS->STATE='NY' EMIT CHANGES;
Click Run query.
To validate the filtered stream, run a query against your ny_orders
topic:
SELECT * FROM ny_orders EMIT CHANGES;
Click Run query.
Scroll through the results and see that all of the records are from New York.
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.