Course: Inside ksqlDB

How Stateful Operations Work

6 min
Michael Drogalis, Principal Product Manager


So far, you have learned about processing streams of data in a stateless way, where ksqlDB does not retain calculations or other information as it processes each row. Now consider what happens within ksqlDB when state becomes part of the equation.

Materialized Views

In a traditional relational database, a materialized view speeds up queries by maintaining precalculated results of a query against an underlying table. These calculations could be aggregations such as SUM or COUNT. ksqlDB has similar capabilities: it can maintain calculations over rows as they arrive in a stream. These calculations are stateful, and their results are stored in a ksqlDB table.

Here is an example of building a materialized view in ksqlDB. It holds the average reading for each sensor:

CREATE TABLE avg_readings AS
    SELECT sensor,
         AVG(reading) AS avg
    FROM readings
    GROUP BY sensor;

As new rows arrive on the source stream, readings, the avg_readings table is updated accordingly. You can query the table to find the current average reading for a sensor, just as you would query a relational database table. You can also subscribe to a stream of updates to the table using a push query, which is discussed below.

Rather than recalculating the average from scratch each time, ksqlDB incrementally folds each new data point into the existing result. This is write-time amortization: the work is done as data arrives rather than when it is queried, which is why queries against ksqlDB’s materialized tables are so fast.
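To make write-time amortization concrete, here is a minimal Python sketch. It is an illustrative model, not ksqlDB’s implementation, and it assumes that the running state kept for each key is a (sum, count) pair; the class name RunningAverage and the sample readings are hypothetical. Each new row updates its key’s state in constant time, so reading the current average is a single division instead of a scan over every past reading.

```python
class RunningAverage:
    """Toy per-key average maintained incrementally at write time.

    Illustrative model only: each key's state is a (sum, count) pair, so every
    new reading is an O(1) update instead of a rescan of all past readings.
    """

    def __init__(self):
        self._state = {}  # key -> (running sum, running count)

    def add(self, key, value):
        # Write time: fold the new data point into the existing state.
        total, count = self._state.get(key, (0.0, 0))
        self._state[key] = (total + value, count + 1)
        return self.get(key)

    def get(self, key):
        # Read time: no history to scan, just one division.
        total, count = self._state[key]
        return total / count


view = RunningAverage()
for sensor, reading in [("s1", 10.0), ("s2", 4.0), ("s1", 20.0), ("s1", 30.0)]:
    view.add(sensor, reading)

print(view.get("s1"))  # 20.0
print(view.get("s2"))  # 4.0
```

Keeping a sum and a count, rather than the average itself, is what makes the update exact: an average alone does not carry enough information to absorb the next data point.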

The materialization is stored locally on the ksqlDB server in RocksDB, an embedded key-value store that runs transparently in process and provides fast access to a local copy of the data.

ksqlDB also records every change to the materialized view in a changelog, which it stores as an Apache Kafka® topic. Because the changelog is a Kafka topic, it is highly durable and can be replayed to restore the state if needed.

Push and Pull Queries

ksqlDB has two options for querying data: pull queries and push queries.

Pull queries, which run against materialized views, are similar to traditional database queries: you query your data and get a response. A pull query starts, performs its operation, and then terminates. When you run a pull query at a particular moment, you get the current state of the table (the materialized view) at that specific point in time.

Push queries, on the other hand, let you subscribe to a stream of updates, and they stay alive until you stop them. So if you run a push query against a table (a materialized view), you receive an update when the state changes. In the example above, this would be every time the average reading for a sensor changes.
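The difference between the two query styles can be sketched with a toy in-memory table in Python. This is a conceptual model only, with hypothetical names (MaterializedView, upsert, pull, subscribe); in ksqlDB itself, a push query is written in SQL and is distinguished by its EMIT CHANGES clause. A pull returns the current value once, while a subscription keeps receiving every change until it is stopped.

```python
class MaterializedView:
    """Toy table supporting pull-style reads and push-style subscriptions."""

    def __init__(self):
        self._rows = {}
        self._subscribers = []

    def upsert(self, key, value):
        self._rows[key] = value
        # Push: every active subscriber is notified of each change.
        for callback in self._subscribers:
            callback(key, value)

    def pull(self, key):
        # Pull: return the current value at this moment, then finish.
        return self._rows.get(key)

    def subscribe(self, callback):
        # Push: the subscription stays registered until it is removed.
        self._subscribers.append(callback)
        return lambda: self._subscribers.remove(callback)


table = MaterializedView()
table.upsert("s1", 10.0)

received = []
unsubscribe = table.subscribe(lambda k, v: received.append((k, v)))
table.upsert("s1", 15.0)
table.upsert("s2", 4.0)
unsubscribe()
table.upsert("s1", 20.0)  # not delivered: the push subscription was stopped

print(table.pull("s1"))  # 20.0
print(received)          # [('s1', 15.0), ('s2', 4.0)]
```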

Pull and push queries each have distinct uses, and you pick the one to use based on your application’s requirements.

Automatic Repartitioning

In distributed data processing, data shuffling is necessary to bring all of the rows that share a key together in one place (the same partition) so that they can be processed together. ksqlDB does this automatically using a repartition topic, which is internal to ksqlDB.

Our original source stream, readings, is partitioned by sensor. If we want to process the data by another field, such as area, we can, but the data must be shuffled first, and ksqlDB does this automatically. This again illustrates the power of a declarative language, in which we describe what we want to do, not how to do it:

    SELECT area,
        AVG(reading) AS avg
    FROM readings
    GROUP BY area
    EMIT CHANGES;

In the query above, you’re still taking an average of the readings, but this time you group by area. To do this, ksqlDB writes the rows to an internal repartition topic keyed by area, so that all rows for the same area land on the same partition, and then computes the average from that topic.
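The shuffle can be modeled in a few lines of Python. This is a sketch under stated assumptions, not ksqlDB’s internals: the partition count (NUM_PARTITIONS), the helper names, and the sample rows are hypothetical, and CRC32 stands in for a partitioning hash (Kafka’s default partitioner uses murmur2). After re-keying by area, every row for a given area sits in exactly one partition, so each partition can be averaged on its own without consulting the others.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3  # hypothetical partition count for the repartition topic


def partition_for(key, num_partitions=NUM_PARTITIONS):
    # Stand-in hash (CRC32); Kafka's default partitioner uses murmur2 instead.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def repartition(rows, key_field):
    """Re-key rows by key_field so all rows sharing that key land together."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[partition_for(row[key_field])].append(row)
    return partitions


def average_per_partition(partition_rows, key_field, value_field):
    # Runs on one partition only, as a single task would.
    state = {}
    for row in partition_rows:
        total, count = state.get(row[key_field], (0.0, 0))
        state[row[key_field]] = (total + row[value_field], count + 1)
    return {k: total / count for k, (total, count) in state.items()}


readings = [
    {"sensor": "s1", "area": "north", "reading": 10.0},
    {"sensor": "s2", "area": "south", "reading": 4.0},
    {"sensor": "s3", "area": "north", "reading": 30.0},
    {"sensor": "s4", "area": "south", "reading": 8.0},
]

by_area = repartition(readings, "area")
results = {}
for rows in by_area.values():
    # No key spans two partitions, so per-partition results never collide.
    results.update(average_per_partition(rows, "area", "reading"))

print(sorted(results.items()))  # [('north', 20.0), ('south', 6.0)]
```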

Changelogs Examined

The purpose of the changelog is to provide redundant storage of the state and the ability to recover a ksqlDB node’s state if the node fails. A replacement ksqlDB node rebuilds the state by reading from the changelog, which is durably held as a topic in Kafka. It replays each table’s changelog to the end, at which point the table’s latest state has been recovered.
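Recovery by replay can be sketched in Python as follows. This is a simplified model, assuming a changelog is an ordered list of (key, value) records; the function name and sample data are hypothetical. Replaying the records in order into an empty store reproduces the latest value for every key, because each later record overwrites any earlier one for the same key.

```python
def restore_from_changelog(changelog):
    """Rebuild a key-value store by replaying (key, value) records in order."""
    store = {}
    for key, value in changelog:
        store[key] = value  # later records overwrite earlier ones
    return store


changelog = [
    ("s1", 10.0),
    ("s2", 4.0),
    ("s1", 15.0),
    ("s1", 20.0),
]

restored = restore_from_changelog(changelog)
print(restored)  # {'s1': 20.0, 's2': 4.0}
```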

Note that only the latest row for each key matters; during replay, earlier rows are overwritten by later ones. ksqlDB takes advantage of this by using Kafka compacted topics, which periodically discard earlier records for each key while always retaining the latest one. As a result, the changelogs you replay are much shorter, and they don’t grow indefinitely, because superseded records are purged for you.

In practice, this means a compacted changelog holds a sparse key space, possibly with as few as one row per key, and each row can be played into the server as fast as possible.

This is how ksqlDB recovers quickly from a completely cold state: it replays only the data it needs into its local stores.
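The effect of compaction on replay can be illustrated with a small Python model. It is a simplification, not Kafka’s log cleaner: compact() here keeps exactly the latest record per key in a single pass, whereas real compaction runs periodically in the background and may temporarily retain older records. The function names and sample changelog are hypothetical. The point is that the compacted log is shorter yet replays to exactly the same state.

```python
def compact(changelog):
    """Keep only the latest record for each key, preserving log order.

    Simplified model: real Kafka compaction runs in the background and may
    temporarily retain older records, but it always keeps the latest per key.
    """
    latest_offset = {key: offset for offset, (key, _) in enumerate(changelog)}
    return [record for offset, record in enumerate(changelog)
            if latest_offset[record[0]] == offset]


def replay(changelog):
    store = {}
    for key, value in changelog:
        store[key] = value  # later records overwrite earlier ones
    return store


changelog = [("s1", 10.0), ("s2", 4.0), ("s1", 15.0), ("s1", 20.0)]
compacted = compact(changelog)

print(compacted)                                 # [('s2', 4.0), ('s1', 20.0)]
print(replay(compacted) == replay(changelog))    # True
```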
