Integration Architect (Presenter)
This exercise will teach you how to filter streams in ksqlDB for Confluent Cloud.
Begin by creating a stream for orders in the editor (make sure to set auto.offset.reset = earliest):
CREATE STREAM orders (id INTEGER KEY, item VARCHAR,
  address STRUCT<city VARCHAR, state VARCHAR>)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON', PARTITIONS=1);
Click Run query.
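Before inserting data, it can help to confirm that the stream was registered with the intended schema. A minimal sketch, assuming the editor (or the ksqlDB CLI) accepts the standard DESCRIBE statement:

```sql
-- Show the columns and types of the orders stream created above.
DESCRIBE orders;
```

The output should list id as the key column and address as a STRUCT with city and state fields.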
Next, insert orders into the new orders stream:
INSERT INTO orders(id, item, address)
VALUES(140, 'Mauve Widget', STRUCT(city:='Ithaca', state:='NY'));
INSERT INTO orders(id, item, address)
VALUES(141, 'Teal Widget', STRUCT(city:='Dallas', state:='TX'));
INSERT INTO orders(id, item, address)
VALUES(142, 'Violet Widget', STRUCT(city:='Pasadena', state:='CA'));
INSERT INTO orders(id, item, address)
VALUES(143, 'Purple Widget', STRUCT(city:='Yonkers', state:='NY'));
INSERT INTO orders(id, item, address)
VALUES(144, 'Tan Widget', STRUCT(city:='Amarillo', state:='TX'));
Click Run query.
Set the auto.offset.reset property to earliest so that the new filtered stream we are about to create will include the data we inserted in the previous step.
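In the editor, this property is set alongside the query. If you are working in the ksqlDB CLI instead (an assumption; this exercise uses the editor), the equivalent is a SET statement run before the query:

```sql
-- Make queries started in this session read their source topics from the beginning.
SET 'auto.offset.reset' = 'earliest';
```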
Next, create a stream for only New York orders:
CREATE STREAM ny_orders AS
  SELECT * FROM orders WHERE address->state = 'NY' EMIT CHANGES;
Click Run query.
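The WHERE clause can be combined with a projection so that the filtered stream carries only the columns you need. The following is a sketch rather than part of the exercise; the stream name ny_orders_flat is hypothetical:

```sql
-- Hypothetical variant: keep the key, the item, and only the city from the struct.
CREATE STREAM ny_orders_flat AS
  SELECT id, item, address->city AS city
  FROM orders
  WHERE address->state = 'NY'
  EMIT CHANGES;
```

The -> operator dereferences a field inside the address STRUCT, both in the filter and in the projection.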
To validate the filtered stream, run a push query against your ny_orders stream:
SELECT * FROM ny_orders EMIT CHANGES;
Click Run query.
Scroll through the results and see that all of the records are from New York.
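Of the five sample orders inserted earlier, only two have a state of NY (ids 140 and 143). Assuming nothing else has been written to the orders topic, a bounded version of the validation query is one way to check this without leaving the query running:

```sql
-- Stop after two rows; with only the sample data loaded,
-- these should be orders 140 (Ithaca) and 143 (Yonkers).
SELECT id, item, address->city AS city
FROM ny_orders
EMIT CHANGES
LIMIT 2;
```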
Let's jump into an exercise about filtering streams. We're gonna start off by creating a stream for orders. This order stream will contain data from various places. Let's go ahead and run this query. Next, let's insert some orders into our order stream. Run that insert query. Next, we're gonna create a stream for only New York orders. Go ahead and run the query. To validate, we'll run a push query against our New York order stream to confirm that there are only orders from New York in our stream. Go ahead and run the push query. Below, you should be able to scroll through the results and see that all of the records are from New York and New York only. Now you know how to filter streams.