course: ksqlDB 101

Filtering with ksqlDB

Allison Walther

Integration Architect (Presenter)

Robin Moffatt

Principal Developer Advocate (Author)

Imagine that you have an Apache Kafka topic with messages about product orders. One of the attributes of the order event is the address, which includes the state where the customer lives.

Now suppose you want to create a new stream of data, continually populated from the first stream, that contains only orders from New York. This could support additional state-specific processing that you need to apply to those orders.

This way, instead of consuming the full order stream and discarding messages you don’t need, your application can subscribe to a new stream of pre-filtered messages. The ksqlDB statement looks like this:

CREATE STREAM ORDERS_NY AS
  SELECT *
    FROM ORDERS
   WHERE ADDRESS->STATE='New York';

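The CREATE STREAM … AS SELECT construct is analogous to CREATE TABLE AS SELECT in the RDBMS world, except that instead of dealing in static sets of data, you are transforming one unbounded stream into another unbounded stream. The statement starts a persistent query that keeps running, so every new order written to ORDERS that matches the predicate is also written to ORDERS_NY. The WHERE clause can filter on top-level fields as well as nested ones; here, the -> operator dereferences the STATE field of the ADDRESS struct.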
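To make the semantics concrete, here is a minimal Python sketch of what the persistent query does logically. It is not how ksqlDB is implemented. It lazily reads an unbounded sequence of order records and forwards, unchanged, only those whose nested ADDRESS.STATE equals 'New York'. The record shape is simplified from the sample data printed further down this page. The helper names (`address_state`, `orders_ny`) and the Texas and null-address records are hypothetical illustrations. A record with a null ADDRESS or STATE never matches the predicate, so it is dropped.

```python
import itertools
from typing import Any, Iterable, Iterator, Optional

Order = dict[str, Any]  # hypothetical: one deserialized ORDERS value


def address_state(order: Order) -> Optional[str]:
    """Mimic ADDRESS->STATE: return None if ADDRESS or STATE is missing or null."""
    address = order.get("ADDRESS")
    if address is None:
        return None
    return address.get("STATE")


def orders_ny(orders: Iterable[Order]) -> Iterator[Order]:
    """Lazily yield orders matching WHERE ADDRESS->STATE='New York'.

    The input is never materialized, so this works on an endless iterator:
    each record is either forwarded unchanged (like SELECT *) or dropped.
    """
    for order in orders:
        # A null state never equals 'New York', so such records are dropped.
        if address_state(order) == "New York":
            yield order


# Hypothetical sample records; the New York ones echo two rows printed below.
sample = [
    {"ORDERID": 4459, "ADDRESS": {"CITY": "Rochester", "STATE": "New York"}},
    {"ORDERID": 1, "ADDRESS": {"CITY": "Austin", "STATE": "Texas"}},
    {"ORDERID": 35, "ADDRESS": {"CITY": "New York City", "STATE": "New York"}},
    {"ORDERID": 2, "ADDRESS": None},
]

# itertools.cycle makes the source endless, like a topic that keeps growing;
# islice takes only the first few results from the unbounded output stream.
first_four = list(itertools.islice(orders_ny(itertools.cycle(sample)), 4))
print([o["ORDERID"] for o in first_four])  # [4459, 35, 4459, 35]
```

The generator is the design choice that matters here. Like the persistent query, it never needs the whole input before producing output. That is the practical difference from a one-off CREATE TABLE AS SELECT over a finite table.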

Because ksqlDB streams are backed by Kafka topics, the query writes its results directly to a Kafka topic. By default, that topic takes the name of the stream, ORDERS_NY:

ksql> SHOW TOPICS;

 Kafka Topic     | Partitions | Partition Replicas
---------------------------------------------------
 ORDERS_NY       | 6          | 1
 orders          | 6          | 1
---------------------------------------------------

The messages on the new topic are identical to those on the source orders topic, except that only orders from New York are included:

ksql> PRINT ORDERS_NY LIMIT 2;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2021/02/22 11:16:10.881 Z, key: <null>, value: {"ORDERTIME": 1560057709254, "ORDERID": 4459, "ITEMID": "Item_7", "ORDERUNITS": 18, "ADDRESS": {"STREET": "7 Fordem Plaza", "CITY": "Rochester", "STATE": "New York"}}, partition: 0
rowtime: 2021/02/22 10:57:36.879 Z, key: <null>, value: {"ORDERTIME": 1560045400186, "ORDERID": 35, "ITEMID": "Item_7", "ORDERUNITS": 5, "ADDRESS": {"STREET": "80 Dawn Parkway", "CITY": "New York City", "STATE": "New York"}}, partition: 0
Topic printing ceased
ksql>
