Integration Architect (Presenter)
Imagine that you have an Apache Kafka topic with messages about product orders. One of the attributes of the order event is the address, which includes the state where the customer lives.
Suppose you want to create a new stream of data, continually populated from the first stream, that contains only orders from New York. This could be to support additional state-specific processing that you need to apply to those orders.
This way, instead of consuming the full order stream and discarding messages you don’t need, your application can subscribe to a new stream of pre-filtered messages. The ksqlDB statement looks like this:
CREATE STREAM ORDERS_NY AS
SELECT *
FROM ORDERS
WHERE ADDRESS->STATE='New York';
The CREATE STREAM … AS SELECT construct is analogous to CREATE TABLE AS SELECT in the RDBMS world, except that instead of dealing in static sets of data, you are now transforming one unbounded stream into another unbounded stream. You can filter on nested and unnested fields.
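As a minimal sketch of filtering on both kinds of field at once, the statement below combines the nested ADDRESS->STATE predicate with a predicate on the top-level ORDERUNITS field seen in the sample messages. The stream name ORDERS_NY_LARGE and the threshold of 10 units are hypothetical, chosen only for illustration.

```sql
-- Hypothetical stream name and threshold, for illustration only.
-- ADDRESS->STATE is a nested field; ORDERUNITS is a top-level (unnested) field.
CREATE STREAM ORDERS_NY_LARGE AS
  SELECT *
  FROM ORDERS
  WHERE ADDRESS->STATE = 'New York'
    AND ORDERUNITS > 10;
```

Like ORDERS_NY, this stream would be continually populated as new matching events arrive on the source stream.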
Because ksqlDB streams are backed by Kafka topics, the filtered data is written directly to a new Kafka topic:
ksql> SHOW TOPICS;
 Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------
 ORDERS_NY   | 6          | 1
 orders      | 6          | 1
---------------------------------------------------
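If you want to confirm which topic backs the new stream, one option is to describe the stream from the ksqlDB CLI. This is a sketch that assumes the ORDERS_NY stream was created as shown above; the extended description includes the backing Kafka topic along with the stream's columns and runtime statistics.

```sql
-- Show the stream's schema, backing Kafka topic, and runtime statistics.
DESCRIBE ORDERS_NY EXTENDED;
```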
The messages on the new Kafka topic have the same structure as those on the source orders topic, but only orders from New York are included:
ksql> PRINT ORDERS_NY LIMIT 2;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2021/02/22 11:16:10.881 Z, key: <null>, value: {"ORDERTIME": 1560057709254, "ORDERID": 4459, "ITEMID": "Item_7", "ORDERUNITS": 18, "ADDRESS": {"STREET": "7 Fordem Plaza", "CITY": "Rochester", "STATE": "New York"}}, partition: 0
rowtime: 2021/02/22 10:57:36.879 Z, key: <null>, value: {"ORDERTIME": 1560045400186, "ORDERID": 35, "ITEMID": "Item_7", "ORDERUNITS": 5, "ADDRESS": {"STREET": "80 Dawn Parkway", "CITY": "New York City", "STATE": "New York"}}, partition: 0
Topic printing ceased
ksql>
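As a sketch of how the pre-filtered stream might be read from the ksqlDB CLI, the transient push query below selects a few columns from ORDERS_NY. The column names are taken from the sample messages above; the CITY alias, the LIMIT value, and the offset setting are illustrative choices rather than part of the original lesson.

```sql
-- Optional: read the stream from the beginning rather than only new events.
SET 'auto.offset.reset' = 'earliest';

-- Transient push query over the pre-filtered stream; it stops after two rows.
SELECT ORDERID,
       ITEMID,
       ORDERUNITS,
       ADDRESS->CITY AS CITY
FROM ORDERS_NY
EMIT CHANGES
LIMIT 2;
```

No WHERE clause on the state is needed here, because the filtering was already applied when ORDERS_NY was populated.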
Hi, I'm Allison Walther with Confluent. Let's talk about filtering with ksqlDB. Sometimes we want to filter a stream of events to show only the ones that match certain criteria. For example, the team in a regional sales office might want to see all of the orders from customers in New York. We can create a new stream in ksqlDB that uses a WHERE clause on a SELECT. This stream will now contain all of the order events from customers in New York. Notice that we can even filter on data in nested fields. There's not much to filtering, and that's it for this lesson. Let's jump into an exercise.