Get Started Free
‹ Back to courses
course: ksqlDB 101

Filtering with ksqlDB

1 min

Allison Walther

Integration Architect (Presenter)

Robin Moffatt

Robin Moffatt

Principal Developer Advocate (Author)

Filtering with ksqlDB

Imagine that you have an Apache Kafka topic with messages about product orders. One of the attributes of the order event is the address, which includes the state where the customer lives.


But you want to create a new stream of data, continually populated from the first stream, just for orders from New York. This could be to support additional state-specific processing that you need to apply to the orders.


This way, instead of consuming the full order stream and discarding messages you don’t need, your application can subscribe to a new stream of pre-filtered messages. The ksqlDB statement looks like this:


The CREATE STREAM … AS SELECT construct is analogous to CREATE TABLE AS SELECT in the RDBMS world, except that instead of dealing in static sets of data, you are now transforming one unbounded stream into another unbounded stream. You can filter on nested and unnested fields.

Since ksqlDB streams are backed by Kafka topics, this means that you are writing this data directly to a Kafka topic:


 Kafka Topic     | Partitions | Partition Replicas
 ORDERS_NY       | 6          | 1
 orders          | 6          | 1

The data present on the new Kafka topic is identical to the source orders topic, but only for orders from New York:

Key format: ¯\_()_/¯ - no data processed
Value format: AVRO
rowtime: 2021/02/22 11:16:10.881 Z, key: <null>, value: {"ORDERTIME": 1560057709254, "ORDERID": 4459, "ITEMID": "Item_7", "ORDERUNITS": 18, "ADDRESS": {"STREET": "7 Fordem Plaza", "CITY": "Rochester", "STATE": "New York"}}, partition: 0
rowtime: 2021/02/22 10:57:36.879 Z, key: <null>, value: {"ORDERTIME": 1560045400186, "ORDERID": 35, "ITEMID": "Item_7", "ORDERUNITS": 5, "ADDRESS": {"STREET": "80 Dawn Parkway", "CITY": "New York City", "STATE": "New York"}}, partition: 0
Topic printing ceased

Use the promo code KSQLDB101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.

Filtering with ksqlDB

Hi, I'm Allison Walther with Confluent. Let's talk about filtering with ksqlDB. Sometimes we want to filter a stream of events to only show us the ones that match a certain criteria. For example, the team in a regional sales office might want to see all of the orders from customers in New York. We can create a new stream in ksqlDB that uses a where clause on a select. This stream will now contain all of the order events from customers in New York. Notice that we can even filter on data in nested fields. There's not much to filtering, and that's it for this lesson. Let's jump into an exercise.