Course: Inside ksqlDB

How Stateless Operations Work

Michael Drogalis, Principal Product Manager


A stream in ksqlDB is a supercharged Apache Kafka topic: a topic with a schema and other metadata layered on top. Consider the following ksqlDB SQL:

CREATE STREAM readings (
    sensor VARCHAR KEY,
    reading DOUBLE,
    location VARCHAR
) WITH (
    kafka_topic='readings',
    value_format='json',
    partitions=3
);

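This example defines a schema with three columns: sensor, reading, and location. The types are stated explicitly. sensor and location are strings (VARCHAR), and reading is a DOUBLE. Notice that sensor is marked with the KEY keyword, which tells ksqlDB to store it in the key of each Kafka record rather than in the value.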

There’s also additional metadata: the command specifies the underlying Kafka topic where the data will be stored (readings), the serialization format of the record values (JSON), and the number of partitions (3). But what happens internally when you issue this SQL at the ksqlDB command line?

The ksqlDB servers communicate directly with your Kafka brokers. If the underlying topic doesn’t exist, ksqlDB creates it on the brokers. The remaining metadata, such as the column names and types, is stored in ksqlDB’s object catalog. So after executing the command, you have a Kafka topic (readings) with three empty partitions.
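The Python below is a toy model of what the CREATE STREAM statement leaves behind. It separates the topic, which lives on the brokers, from the schema, which lives in the catalog. The Topic class, the brokers and catalog dictionaries, and the create_stream helper are all hypothetical stand-ins, not ksqlDB's real internals.

```python
from dataclasses import dataclass, field


@dataclass
class Topic:
    """Hypothetical toy topic: each partition is an append-only list of records."""
    name: str
    partitions: list = field(default_factory=list)


# Hypothetical stand-ins for the Kafka cluster and ksqlDB's object catalog.
brokers: dict = {}   # topic name -> Topic
catalog: dict = {}   # stream name -> schema and other metadata


def create_stream(name, columns, key, kafka_topic, value_format, partitions):
    # The topic is created only if it does not already exist.
    if kafka_topic not in brokers:
        brokers[kafka_topic] = Topic(kafka_topic, [[] for _ in range(partitions)])
    # The schema and remaining metadata go into the catalog, not into the topic.
    catalog[name] = {
        "columns": columns,
        "key": key,
        "topic": kafka_topic,
        "value_format": value_format,
    }


create_stream(
    "readings",
    {"sensor": "VARCHAR", "reading": "DOUBLE", "location": "VARCHAR"},
    key="sensor",
    kafka_topic="readings",
    value_format="json",
    partitions=3,
)
print(brokers["readings"].partitions)  # [[], [], []]
```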

Next, put some data into your empty stream. A stream holds rows, the ksqlDB equivalent of Kafka messages. You add rows with simple SQL statements, just as you would in a relational database. If you’ve worked with relational databases such as PostgreSQL or MySQL, this will look familiar:

INSERT INTO readings (sensor, reading, location)
VALUES ('sensor-1', 45, 'wheel');  

INSERT INTO readings (sensor, reading, location)
VALUES ('sensor-2', 41, 'motor');  

Each statement inserts into the readings stream, naming the specific columns you want to supply data for. Each INSERT results in a call to a Kafka producer client, and the row is stored in the stream’s underlying topic.
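The Python below is a rough sketch of what one INSERT turns into. It splits a row into a Kafka record key and a JSON value using the readings schema. The function name insert_to_record is hypothetical. The sketch makes two assumptions: the key is sent as plain UTF-8 string bytes, as with ksqlDB's default KAFKA key format, and column names appear uppercased in the JSON value, as ksqlDB does for unquoted identifiers. The exact bytes ksqlDB writes may differ, for example in whitespace.

```python
import json

# Hypothetical in-memory copy of the readings schema from CREATE STREAM.
SCHEMA = {"sensor": "VARCHAR", "reading": "DOUBLE", "location": "VARCHAR"}
KEY_COLUMN = "sensor"


def insert_to_record(columns, values):
    """Split one INSERT ... VALUES row into a Kafka (key, value) pair."""
    row = dict(zip(columns, values))
    # The KEY column becomes the record key, sent as UTF-8 string bytes (assumed).
    key = row[KEY_COLUMN].encode("utf-8")
    # Every other column goes into the JSON value, under its uppercased name.
    value = {}
    for column, column_type in SCHEMA.items():
        if column == KEY_COLUMN:
            continue
        v = row.get(column)  # columns left out of the INSERT become null
        if column_type == "DOUBLE" and v is not None:
            v = float(v)     # 45 is stored as the DOUBLE 45.0
        value[column.upper()] = v
    return key, json.dumps(value).encode("utf-8")


key, value = insert_to_record(("sensor", "reading", "location"), ("sensor-1", 45, "wheel"))
print(key)    # b'sensor-1'
print(value)  # b'{"READING": 45.0, "LOCATION": "wheel"}'
```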

Now consider how a ksqlDB row is represented inside a Kafka topic, starting with the first INSERT statement above:

[Figure: readings topic]

This becomes the first row in the first partition of the readings topic, with an offset of 0. Like every Kafka record, it has a key and a value. The key is sensor-1, because the sensor column was declared with the KEY keyword when the stream was created. The value is a JSON object with a reading of 45 and a location of wheel. Each row’s partition is chosen by hashing its key, so rows with the same key are always written to the same partition.
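The key-to-partition mapping can be sketched in Python. Kafka's default producer partitioner works in three steps:

1. Hash the serialized key bytes with murmur2.
2. Clear the sign bit of the hash.
3. Take the result modulo the number of partitions.

The port below follows that scheme. It assumes the producer behind INSERT uses Kafka's default partitioner and that the key is sent as UTF-8 bytes. The function names are illustrative, and the exact partition numbers printed are not asserted here.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash, ported from the algorithm Kafka's producer uses."""
    mask = 0xFFFFFFFF                  # emulate Java's 32-bit int overflow
    m, r = 0x5BD1E995, 24              # mixing constants
    length = len(data)
    h = (0x9747B28C ^ length) & mask   # seed mixed with the input length

    # Mix the input four bytes at a time (little-endian).
    for i in range(0, length - length % 4, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Mix in the remaining 1-3 bytes (mirrors Java's switch fall-through).
    tail, rem = length - length % 4, length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & mask

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for(key: bytes, num_partitions: int) -> int:
    # Clear the sign bit (Kafka's toPositive), then take the modulus.
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


for key in (b"sensor-1", b"sensor-2", b"sensor-1"):
    print(key.decode(), "-> partition", partition_for(key, 3))
```

Because the hash depends only on the key bytes, sensor-1 maps to the same partition every time it is printed.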

A Simple Transformation

Now that you have data in your stream, you can process it in real time. One of the simplest operations is a transformation. The following query converts every value in the location column to uppercase:

CREATE STREAM clean AS
    SELECT sensor,
        reading,
        UCASE(location) AS location
    FROM readings
    EMIT CHANGES;

This query consumes from your readings stream, passes sensor and reading through unchanged, and converts location to uppercase. The results are written to a new stream named clean, which is backed by its own Kafka topic.

When you run the above statement from your client (the CLI, the Confluent web interface, or the REST API), it is sent to the ksqlDB server. The server compiles it into a Kafka Streams topology that executes the query. The query keeps running after the client disconnects and resumes if the ksqlDB server is restarted. We call this a persistent query.
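A compiled topology can be pictured as a source, a chain of processing steps, and a sink. The Python below is a toy model of that shape, not Kafka Streams itself. Three parts are hypothetical choices for illustration: compile_topology, ucase_location, and the convention that a step returns None to drop a row. Each step looks at one row at a time and keeps nothing between rows, which is what makes it stateless.

```python
from typing import Callable, Iterable, List, Optional

Row = dict
# Hypothetical convention: a step returns the transformed row, or None to drop it.
Step = Callable[[Row], Optional[Row]]


def compile_topology(steps: List[Step]) -> Callable[[Iterable[Row]], List[Row]]:
    """Toy stand-in for a compiled topology: source -> steps -> sink."""
    def run(source: Iterable[Row]) -> List[Row]:
        sink: List[Row] = []
        for record in source:            # consume each row from the source
            out: Optional[Row] = record
            for step in steps:           # apply each stateless step in order
                out = step(out)
                if out is None:          # the row was dropped
                    break
            if out is not None:
                sink.append(out)         # produce the result to the sink
        return sink
    return run


def ucase_location(row: Row) -> Row:
    # Mirrors UCASE(location) AS location; sensor and reading pass through.
    # Assumption: a NULL location stays NULL.
    loc = row["location"]
    return {**row, "location": None if loc is None else loc.upper()}


clean_query = compile_topology([ucase_location])
result = clean_query([{"sensor": "sensor-1", "reading": 45.0, "location": "wheel"}])
print(result)  # [{'sensor': 'sensor-1', 'reading': 45.0, 'location': 'WHEEL'}]
```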

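As rows are processed, the offset of the ksqlDB consumer advances through each partition of the source topic, independently for each partition.

These offsets are tracked automatically for you by Kafka’s consumer group protocol. That is what lets a persistent query resume where it left off after a restart.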
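The Python below is a toy model of per-partition offsets. The topic contents and the run_query helper are hypothetical. The model assumes, as Kafka does, that a committed offset is the offset of the next record to read. For simplicity, it commits once at the end of each run. A restarted query initializes its position from the committed offsets, so it resumes in each partition exactly where it stopped.

```python
# Hypothetical source topic: partition number -> records (list index = offset).
topic = {0: ["a", "b", "c"], 1: ["d"], 2: ["e", "f"]}

# Committed offsets live outside the query process, so they survive restarts.
# Each value is the offset of the next record to read in that partition.
committed = {p: 0 for p in topic}


def run_query(max_records: int) -> list:
    """Process up to max_records, then commit; each call models one run of the query."""
    position = dict(committed)  # on (re)start, resume from the committed offsets
    processed = []
    for p, log in topic.items():
        # Each partition's offset advances independently of the others.
        while position[p] < len(log) and len(processed) < max_records:
            processed.append((p, position[p], log[position[p]]))
            position[p] += 1
    committed.update(position)  # commit progress before stopping
    return processed


first_run = run_query(4)    # the query runs, then the server restarts
second_run = run_query(10)  # the restarted query picks up where it left off
print(first_run)   # [(0, 0, 'a'), (0, 1, 'b'), (0, 2, 'c'), (1, 0, 'd')]
print(second_run)  # [(2, 0, 'e'), (2, 1, 'f')]
print(committed)   # {0: 3, 1: 1, 2: 2}
```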

Combining Filters

ksqlDB can also filter a stream of data. Here we take the output of the previous query and create a new stream that keeps only the rows whose reading is greater than 41:

CREATE STREAM high_readings AS
    SELECT sensor, reading, location
    FROM clean
    WHERE reading > 41
    EMIT CHANGES;
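The WHERE clause is also a stateless, row-by-row decision. This Python sketch runs over the two sample rows, already uppercased as in clean. The predicate keeps sensor-1's reading of 45 and drops sensor-2's reading of 41, because 41 is not strictly greater than 41. The NULL handling follows standard SQL logic: a comparison with NULL is not true, so the row is dropped. The helper name is hypothetical.

```python
# Hypothetical in-memory version of the clean stream from the previous query.
clean = [
    {"sensor": "sensor-1", "reading": 45.0, "location": "WHEEL"},
    {"sensor": "sensor-2", "reading": 41.0, "location": "MOTOR"},
]


def where_reading_gt_41(row: dict) -> bool:
    # WHERE reading > 41: a NULL reading never satisfies the comparison.
    return row["reading"] is not None and row["reading"] > 41


high_readings = [row for row in clean if where_reading_gt_41(row)]
print(high_readings)  # [{'sensor': 'sensor-1', 'reading': 45.0, 'location': 'WHEEL'}]
```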

You can also combine operations in a single statement. For example, you can merge the two persistent queries shown previously into one:

CREATE STREAM high_pri AS
    SELECT sensor,
        reading, 
        UCASE(location) AS location
    FROM readings
    WHERE reading > 41
    EMIT CHANGES;
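A small Python sketch over the two sample rows shows why combining the two queries is safe. The filter only looks at reading, which the projection passes through unchanged, so applying UCASE before or after the WHERE clause yields the same rows. The helper names are hypothetical. The check compares the chained readings → clean → high_readings path with the single high_pri pass.

```python
readings = [
    {"sensor": "sensor-1", "reading": 45.0, "location": "wheel"},
    {"sensor": "sensor-2", "reading": 41.0, "location": "motor"},
]


def ucase_location(row: dict) -> dict:
    # UCASE(location) AS location; the other columns pass through unchanged.
    return {**row, "location": row["location"].upper()}


def reading_gt_41(row: dict) -> bool:
    # WHERE reading > 41
    return row["reading"] > 41


# Two queries: readings -> clean -> high_readings, with an intermediate stream.
clean = [ucase_location(r) for r in readings]
high_readings = [r for r in clean if reading_gt_41(r)]

# One query: readings -> high_pri, filtering and projecting in a single pass.
high_pri = [ucase_location(r) for r in readings if reading_gt_41(r)]

print(high_pri == high_readings)  # True
```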

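The combined query reads directly from readings, so it does not need the intermediate clean stream. As before, you don’t have to worry about how these queries actually operate over the data in Kafka, which is one of the great advantages of using ksqlDB.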

Multiple Queries Reading the Same Data

Because ksqlDB persistent queries are executed as Kafka Streams topologies, you get all of the usual Kafka consumer guarantees. For example, the clean and high_pri queries above are two persistent queries reading from the same readings stream.

These are effectively two independent Kafka consumers, because each persistent query uses its own consumer group. That means each one can process the same data without affecting the other. The data they process exists only once in the source Kafka topic, and Kafka tracks the offsets each query has consumed separately. This way, you can scale consumption of the same data across many applications without introducing dependencies between them.
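The Python below is a toy model of two persistent queries reading the same partition. The group names and the consume helper are hypothetical. The sketch only assumes what the paragraph states: the records exist once, while Kafka stores a separate committed offset for each query.

```python
# One copy of the data: a single partition of the readings topic (toy records).
log = [("sensor-1", 45.0), ("sensor-2", 41.0)]

# Kafka keeps a committed offset per consumer group; each persistent query
# has its own group, so each gets its own entry. Group names are hypothetical.
committed = {"clean-query": 0, "high-pri-query": 0}


def consume(group: str, max_records: int) -> list:
    start = committed[group]
    batch = log[start:start + max_records]
    committed[group] = start + len(batch)  # only this group's offset moves
    return batch


print(consume("clean-query", 2))     # [('sensor-1', 45.0), ('sensor-2', 41.0)]
print(consume("high-pri-query", 1))  # [('sensor-1', 45.0)]
print(committed)                     # {'clean-query': 2, 'high-pri-query': 1}
```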
