Get Started Free
Untitled design (21)

Tim Berglund

VP Developer Relations

Robin Moffatt

Robin Moffatt

Principal Developer Advocate (Author)

Hands On: Filtering Streams of Data with ksqlDB

The ratings messages that we receive include a field that indicates the device from which they were left. The field is called channel and includes some values indicating that they’re from test devices.

We’d like to create a new stream that includes only data from live devices. For this we can use ksqlDB.

  1. Before continuing, make sure that you have created a ksqlDB application on Confluent Cloud as described in the first exercise. From the "ksqlDB" page, you should see the application listed and in "Status" Up.

    ksqlDB application in the list

  2. Click on the ksqlDB application to open the editor. The first thing that you need to do is to declare a ksqlDB stream on the topic that holds the ratings events. This gives ksqlDB the information that it needs about the schema of the data.

    Paste the following statement into the "Editor" and click Run query.

    CREATE STREAM RATINGS 
      WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');

    CREATE STREAM RATINGS

  3. You can view the messages flowing through the Kafka topic by running a SELECT against the stream:

    SELECT USER_ID, STARS, CHANNEL, MESSAGE 
      FROM RATINGS EMIT CHANGES;

    Use the table icon in to the top right of the messages to view them as columns

    SELECT … FROM RATINGS

  4. Note how in the data shown returned in the above query, there are values in the CHANNEL field that include -test. You can filter these out using a SQL predicate:

    SELECT USER_ID, STARS, CHANNEL, MESSAGE
      FROM RATINGS
      WHERE LCASE(CHANNEL) NOT LIKE '%test%'
      EMIT CHANGES;

    When you run this, you’ll notice that the results are returned to the screen.

  5. To tell ksqlDB to process all of the existing messages in the topic as well as all new ones that arrive, we set the auto.offset.reset parameter to earliest. To do this, change the dropdown from its default of Latest to Earliest.

    ksqlDB auto.offset.reset

  6. Using the above statement, we can get ksqlDB to write all messages matching this criterion into a new ksqlDB stream. A ksqlDB stream is always backed by a Kafka topic.

    CREATE STREAM RATINGS_LIVE AS
      SELECT * 
        FROM RATINGS
      WHERE LCASE(CHANNEL) NOT LIKE '%test%'
      EMIT CHANGES;

    CSAS

  7. Query the new stream and validate that there are no CHANNEL values with test in them:

    SELECT USER_ID, STARS, CHANNEL, MESSAGE
      FROM RATINGS_LIVE
      EMIT CHANGES;

    Results from ratings_live stream

  8. From your cluster’s "Topics" page, locate the new Kafka topic that’s been created. It will have a prefix in its name but end with RATINGS_LIVE.

    New ratings_live topic

    Click on the topic. If data lineage is enabled on your cluster, click on it to view the flow of data that you’ve created.

    Confluent Data Lineage

Use the promo code PIPELINES101 to receive $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.