Tim Berglund

VP Developer Relations

Robin Moffatt

Principal Developer Advocate (Author)

Filtering Streams of Data with ksqlDB

So far we've covered the ingest and egress components of streaming pipelines. Let's now look at how we can also process that data as part of the pipeline.

Apache Kafka ships with stream processing capabilities in the form of the Kafka Streams library for Java and Scala, which you can use to build elastic applications that read, write, and process event streams in Kafka. This is great if you're in the Java ecosystem, but plenty of people aren't. The streaming database ksqlDB abstracts away the need to write Java code and instead gives you a lightweight SQL language in which to express the stream processing that you'd like to do (plus many more features, of which we cover only a few in this course).
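Before any stream processing can happen, ksqlDB needs a stream declared over the source Kafka topic. If you haven't already registered one earlier in the pipeline, it might look something like this (the topic name, columns, and serialization format here are assumptions for illustration; adjust them to match your data):

CREATE STREAM RATINGS (RATING_ID BIGINT, CHANNEL VARCHAR, STARS INT)
  WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='JSON');

Once declared, the stream can be queried and transformed with SQL, with ksqlDB handling the underlying Kafka consumers and producers for you.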


Our stream of ratings data includes events generated by a test system, which we want to filter out. We can identify these by looking at the channel field: when it comes to processing the events, we only want to include those that come from production sources.
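One way to see which channel values are actually present in the stream is a quick exploratory aggregation (a hypothetical query; adjust the stream name to match your setup):

SELECT CHANNEL, COUNT(*) AS EVENT_COUNT
  FROM  RATINGS
  GROUP BY CHANNEL
  EMIT CHANGES;

This push query keeps running and updates the counts as new events arrive, making it easy to spot test channels before writing the filter.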

CREATE STREAM RATINGS_LIVE AS
  SELECT *
  FROM  RATINGS
  WHERE LCASE(CHANNEL) NOT LIKE '%test%'
  EMIT CHANGES;

As each event is read from the source stream, ksqlDB applies the predicate to the channel field, and the event is written to the target stream only if the value does not contain test. We can now run any subsequent ksqlDB queries against the ratings_live stream.
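To verify that the filter is working, we can run a push query against the new stream; events whose channel contains test should no longer appear in the output:

SELECT *
  FROM  RATINGS_LIVE
  EMIT CHANGES;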
