
How Stateful Operations Work

6 min
Michael Drogalis

Principal Product Manager (Presenter)


So far, you have learned about processing streams of data in a stateless way, where ksqlDB does not retain calculations or other information as it processes each row. Now consider what happens within ksqlDB when state becomes part of the equation.

Materialized Views

In a traditional relational database, a materialized view speeds up queries by maintaining precalculated results of queries against an underlying table. These calculations could be aggregations such as SUM or COUNT, for example. ksqlDB has similar capabilities and can maintain calculations against rows arriving in a stream. These are stateful and are stored in a ksqlDB table.

Here is an example of building a materialized view in ksqlDB. It maintains the average reading for each sensor:

CREATE TABLE avg_readings AS
    SELECT sensor,
           AVG(reading) AS avg
    FROM readings
    GROUP BY sensor
    EMIT CHANGES;

As new reading rows arrive on the source stream readings, the avg_readings table is updated accordingly. The table can be queried to find the current average reading for a sensor (as you would query a relational database table). You can also subscribe to a stream of updates to the table using a push query, which is discussed below.

ksqlDB doesn’t recalculate a new average from scratch each time; rather, it incrementally factors the new data point into the existing result. This is write-time amortization, and it’s why queries against ksqlDB’s materialized tables are so fast.
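To make the incremental update concrete, note that an average can be maintained from just a running sum and a running count, each of which changes by a constant amount of work per row. As a sketch (the table name avg_readings_explicit is hypothetical, and reading is assumed to be a DOUBLE column), the same view could be phrased in terms of those running aggregates:

CREATE TABLE avg_readings_explicit AS
    SELECT sensor,
           SUM(reading) / COUNT(reading) AS avg
    FROM readings
    GROUP BY sensor
    EMIT CHANGES;

When a new reading of 52 arrives for a sensor whose only prior reading was 67, only that key's running state needs to change: the sum becomes 67 + 52 = 119, the count becomes 2, and the emitted average is 119 / 2 = 59.5. No earlier rows are ever reread.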

The materialization is stored locally on the ksqlDB server using RocksDB, an embedded key-value store that transparently runs in process and provides fast access to a local copy of the data.

ksqlDB creates an audit log of all changes to the materialized view and stores this changelog as an Apache Kafka® topic, which means it's highly durable and can be replayed to restore the state if needed.
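Conceptually, the changelog is just a stream of keyed update records, one per change to the view. For the example above it might look like this (keys and values are illustrative, borrowed from the transcript below):

sensor-1 → 45
sensor-1 → 68.5
sensor-3 → 67
sensor-3 → 59.5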

Push and Pull Queries

ksqlDB has two options for querying data: pull queries and push queries.

Pull queries, which are run against materialized views, are similar to traditional database queries in which you query your data and then get a response. They are a process that starts, performs an operation, and then terminates. When you run a pull query at a particular moment, you get the current state of the table (the materialized view) at that specific point in time.
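For example, a pull query against the avg_readings table from above is an ordinary key lookup (the sensor ID 'sensor-4' is hypothetical):

SELECT sensor, avg
    FROM avg_readings
    WHERE sensor = 'sensor-4';

The query returns the current average for that sensor and then terminates.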

Push queries, on the other hand, let you subscribe to a stream of updates, and they stay alive until you stop them. So if you run a push query against a table (a materialized view), you receive an update when the state changes. In the example above, this would be every time the average reading for a sensor changes.
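The corresponding push query differs only by the EMIT CHANGES clause, which keeps the query running and emits a new row every time that sensor's average changes (again, the sensor ID is hypothetical):

SELECT sensor, avg
    FROM avg_readings
    WHERE sensor = 'sensor-3'
    EMIT CHANGES;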

Pull and push queries each have distinct uses, and you pick the one to use based on your application’s requirements.

Automatic Repartitioning

In the context of distributed data processing, data shuffling is necessary to collect all of the rows for a given key into one place so that they can be combined. ksqlDB does this automatically with a repartition topic, which is internal to ksqlDB.

Our original source stream, readings, is partitioned by sensor. If we want to process the data by another field, such as area, we can do so, but it will need to be shuffled first—and ksqlDB will do this automatically. This again illustrates the power of a declarative language, in which we describe what we want to do and not how to do it:

CREATE TABLE part_avg AS
    SELECT area,
        AVG(reading) AS avg
    FROM readings
    GROUP BY area
    EMIT CHANGES;

In this query, you’re still taking an average of the readings, but this time you are applying GROUP BY area. To accomplish this, ksqlDB puts all of the rows for each key onto the same partition of a repartition topic, then uses that topic to perform your calculation.
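Although the repartition topic is created and managed for you, it is an ordinary Kafka topic. If you are curious, you can list it from the ksqlDB CLI along with the other internal topics (the exact internal names include the query ID and vary by deployment):

SHOW ALL TOPICS;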

Changelogs Examined

The purpose of the changelog is to provide redundancy in data storage and the ability to recover the state of a ksqlDB node if it were to fail. A replacement ksqlDB node rebuilds the state by reading from the changelog, which is durably held as a topic in Kafka. The state of each table is repopulated until the end of the changelog, at which point its latest state has been recovered.
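As an illustration, using the values from the transcript below, replaying the changelog for the part_avg table applies the updates for the engine key in order, with each later row overwriting the earlier one:

engine → 67
engine → 40
engine → 45

Once the replay reaches the end of the log, the recovered value for engine is 45, the last value written.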

Note that only the latest row by key is significant; as you replay, any earlier rows are overwritten by the later ones. ksqlDB optimizes this process using Kafka compacted topics, which periodically discard earlier records for each key and always retain the latest per key. So when you go to replay your changelogs, they are much shorter. This also means that the changelogs don’t grow indefinitely, since your data is proactively purged for you.
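Continuing the same illustration, compaction shortens the replay: where the changelog once held three records for the engine key, only the last survives (under the hood, this is Kafka's compact cleanup policy applied to the changelog topic):

Before compaction:  engine → 67, engine → 40, engine → 45
After compaction:   engine → 45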

This means that in a real-world scenario, you would have a sparse key space, with possibly one row per key, and each row would be played into the server as fast as possible.

This is how ksqlDB recovers quickly from a completely cold state: it plays only the data it needs into its local stores.


Transcript

Hi, I'm Michael Drogalis with Confluent. In this module, we're gonna be looking at how stateful operations work in ksqlDB. In the last module, we looked at stateless operations, where ksqlDB was just changing data in motion, but it wasn't remembering anything as it went. Things get a bit more interesting under the hood when operations require state.

The first thing we're gonna look at is materializing a view from a stream. You can see that this stream is similar to the one that we created in the previous module. But this time we're gonna do something different. We're going to create a table called average readings. It does what it sounds like: it will select each sensor and take the average of its individual readings. It's going to materialize a view so that I can query it and ask questions about it, just like a regular database table.

I'll slowly step through what's going on here. I have a few rows. The first one that comes through is from sensor one with a reading of 45. The server reads that row and commits it to the table, and the average is 45; the average of 45 is, of course, 45. The next few rows do the same for sensors four, two, and three. These are all the initial rows for each key, but now I get a row for a key that I've already seen: here is sensor three, with a reading of 52. These materialized views are incrementally adjusted, and this is what makes them so powerful. ksqlDB doesn't have to take the average of all the numbers again; it just takes the last average and incrementally factors in the new data. This is what makes queries against this table so fast. It's what you'd call write-time amortized: it's doing the smallest possible thing to make the table correct as it receives new data. The average of 52 and 67 is now 59.5. I can see how this plays out with the rest of the rows.

Now, this materialization is actually stored locally on ksqlDB servers. This uses RocksDB, an embedded key-value store that transparently runs in process. This is what gives you fast access to a copy of the data. But you'll notice here on the right that I have a stream called changelog. What's going on here? ksqlDB is creating an audit trail of all changes that have occurred to the materialized view. I can see that I have sensor one that was updated to 45 and then to 68.5. Like any other stream, changelogs are topics, and they're stored durably in Kafka. We'll talk a little bit about why these changelogs are useful for fault tolerance, but from a programmer's perspective, this is the backbone of ksqlDB's query layer.

ksqlDB gives you two ways to query your data: push and pull queries. Pull queries are what allow your application to run a typical database query against your table. You can ask questions about your data and get an immediate response. If I ran a pull query and asked for the average of, say, sensor four right now, I would get 75. But if I ran a push query, which is a streaming query, it would allow my application to subscribe to the changes for any given sensor. So if I ran a push query subscribing to, say, sensor three, I would get several results: 67, 59.5, and 52. I would get three results.

Now, something that's tricky about distributed data processing is something called data shuffling. To be able to aggregate the data that you're interested in, you have to get it all to the same place so that you can combine it together. That is why partitioning in Kafka is so useful.
When I take an average of sensor readings, I need to take all the data for each sensor and get it into the same place so that I can reduce it into one number. I have a slightly different query here. I'm still taking the average of the readings, but I'm grouping it by area. Because my original data isn't partitioned by area, I need to shuffle my data around. ksqlDB does this automatically with something called a repartition topic. This is invisible and internal to ksqlDB, and it happens on your behalf. Now I have a table with three rows: wheel, motor, and engine. What's happened is that the first stage of the query has put all of the rows for each key onto the same partition. ksqlDB will inject these repartition topics into your program's execution whenever a shuffle is required.

So what's up with these changelogs that we looked at earlier? They're useful for push queries, but what else do they do? Imagine what happens when you lose a ksqlDB server. Suppose that the disk is lost with it. I mentioned that the materialization for a table is kept locally on disk with RocksDB. That means that it's effectively gone and can't be used anymore. When that happens, a ksqlDB server needs to take the old node's place and recover the materialization. It's able to do this by reading the changelog topics out of Kafka to restore the state. Here are all the changes that we saw in the previous table: engine had an average of 67, and then one of 40, and then one of 45, and so on. The table is repopulated until it reaches the end of the changelog. At that point, the latest state of the table has been recovered.

You might notice that only the latest row per key matters; any earlier rows are overwritten by the later ones. Anything earlier is a stale piece of information from the perspective of recovery. ksqlDB is able to optimize this away with compacted topics in Kafka. Compacted topics discard all records except the latest per key. This means that you won't grow your changelog indefinitely; it is proactively purging the data for you. In a real-world scenario, you would see something more like this during recovery: you'd have a sparse key space with maybe one row per key, and each row would be played into the server as fast as possible. This is what allows ksqlDB to have fast recovery from a completely cold state. It's playing only the data it needs into its local stores.