course: Kafka Streams 101

Stateful Operations

8 min

Sophie Blee-Goldman

Senior Software Engineer (Presenter)


Bill Bejeck

Integration Architect (Author)


Stateless operations in Kafka Streams are quite useful if you are just filtering or doing some other per-record operation that doesn't need to collect data across records. In other scenarios, however, you need to remember something about previous records. For example, you may want to know how many times a specific customer has logged in, or the total number of tickets sold.

Stateful operations work on records grouped by key, so you must group first, and that means your records need keys. If they don't have one, or you need a different key, you can derive a new key (for example with groupBy or selectKey), but Kafka Streams then has to repartition. Repartitioning means writing the re-keyed records back out to a topic so that all records with the same key end up on the same partition. This step is needed because a record with a newly derived key will most likely not be on the partition where that key belongs.
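The following is a minimal plain-Java model of that repartitioning step, not Kafka Streams code. It assumes a hypothetical partitioner (String.hashCode modulo the partition count; Kafka's default partitioner actually uses a murmur2 hash of the serialized key) and a hypothetical re-keying rule that takes a customer id from values such as "alice:login". All class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// A plain-Java model (not Kafka Streams code) of repartitioning after re-keying.
final class RepartitionSketch {
    // Hypothetical stand-in for a partitioner. Kafka's default partitioner uses a
    // murmur2 hash of the serialized key; String.hashCode() is used here instead.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Derive a new key and "write back out to a topic": each re-keyed record is
    // appended to the partition chosen by its NEW key.
    static Map<Integer, List<Map.Entry<String, String>>> rekeyAndRepartition(
            List<Map.Entry<String, String>> records, int numPartitions) {
        Map<Integer, List<Map.Entry<String, String>>> topic = new TreeMap<>();
        for (Map.Entry<String, String> r : records) {
            // Hypothetical new key: the part of the value before ':' (a customer id).
            String newKey = r.getValue().split(":")[0];
            topic.computeIfAbsent(partitionFor(newKey, numPartitions), p -> new ArrayList<>())
                 .add(Map.entry(newKey, r.getValue()));
        }
        return topic;
    }

    // True if every key lives in exactly one partition, which is what a
    // downstream per-key aggregation needs.
    static boolean keysColocated(Map<Integer, List<Map.Entry<String, String>>> topic) {
        Map<String, Integer> home = new HashMap<>();
        for (Map.Entry<Integer, List<Map.Entry<String, String>>> p : topic.entrySet()) {
            for (Map.Entry<String, String> r : p.getValue()) {
                Integer prev = home.putIfAbsent(r.getKey(), p.getKey());
                if (prev != null && !prev.equals(p.getKey())) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

Each re-keyed record lands on the partition chosen by its new key, so a per-key aggregation downstream sees all of one customer's events in one place.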

Stateful operations in Kafka Streams are backed by state stores. The default is a persistent state store implemented with RocksDB, but you can also use in-memory stores. By default, each state store is backed up by a changelog topic in Kafka, which makes state in Kafka Streams fault tolerant: if an instance fails, the store can be rebuilt by replaying its changelog. Stateful operations in Kafka Streams include reduce, count, and aggregate, and each of them returns a KTable (recall that in a table, a new value for a key overwrites the previous one).
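To illustrate why a changelog makes state fault tolerant, here is a simplified plain-Java sketch. It assumes a HashMap stands in for the local store (RocksDB or in-memory) and a list stands in for the changelog topic. It is not the Kafka Streams StateStore API, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A plain-Java model (not the Kafka Streams StateStore API) of a key-value
// store whose every write is also appended to a changelog.
final class ChangelogStoreSketch {
    // Hypothetical stand-in for the changelog topic: an append-only list of updates.
    final List<Map.Entry<String, Long>> changelog = new ArrayList<>();
    // Local state (RocksDB or in-memory in Kafka Streams); lost if the instance dies.
    private final Map<String, Long> local = new HashMap<>();

    void put(String key, long value) {
        local.put(key, value);                 // table semantics: a new value overwrites the old one
        changelog.add(Map.entry(key, value));  // every update is also logged
    }

    Long get(String key) {
        return local.get(key);
    }

    // Rebuild a fresh store by replaying the changelog from the beginning;
    // the last update per key wins, so the restored state matches the original.
    static ChangelogStoreSketch restore(List<Map.Entry<String, Long>> changelog) {
        ChangelogStoreSketch fresh = new ChangelogStoreSketch();
        for (Map.Entry<String, Long> update : changelog) {
            fresh.put(update.getKey(), update.getValue());
        }
        return fresh;
    }
}
```

In Kafka Streams itself, the changelog for a key-value store is a compacted Kafka topic, so over time it keeps only the latest value per key, which is all a restore needs.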

The Stateful Operations

Reduce

With reduce, you supply a Reducer, a single abstract method interface whose method takes two values of the same type and combines them into one. Reduce expects the result to be that same type. Here is a reduce that sums:

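import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();
// Assumes default serdes (String key, Long value) are set in the configuration
KStream<String, Long> myStream = builder.stream("topic-A");
Reducer<Long> reducer = (longValueOne, longValueTwo) -> longValueOne + longValueTwo;
myStream.groupByKey()
        .reduce(reducer, Materialized.with(Serdes.String(), Serdes.Long()))
        .toStream()
        .to("output-topic");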

(Since Reducer is a single abstract method interface, you can implement it with a lambda.)

You create your stream, then group by key (this assumes that your stream is already correctly keyed). Then you call reduce and pass in your reducer. Notice also that you are providing SerDes for the store in Materialized. RocksDB and in-memory stores don't store objects; they store bytes, so Kafka Streams needs SerDes to convert your keys and values. You may already have defined your SerDes up front, either via a Consumed object or via configuration: if your configuration sets a String SerDe for the key and a Long SerDe for the value, you wouldn't need to pass them here. Still, it's usually best to provide SerDes explicitly, because the code is clearer when you come back to it later. Finally, reduce returns a KTable, but the KTable API has no to operator, so we call toStream() to convert the table's stream of updates into a KStream (an event stream) and write it to an output topic.
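As a rough mental model of what groupByKey().reduce(...) computes, here is a plain-Java sketch (not Kafka Streams code; names are hypothetical). A HashMap stands in for the state store, and BinaryOperator<Long> has the same shape as Reducer<Long>. It records one update per input record, which is roughly what you would observe downstream with record caching disabled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// A plain-Java model (not Kafka Streams itself) of what groupByKey().reduce(...) computes.
final class ReduceSketch {
    // records: (key, value) pairs in arrival order; reducer: same shape as Reducer<Long>.
    // Returns the sequence of (key, newValue) updates the resulting table would see.
    static List<Map.Entry<String, Long>> reduce(List<Map.Entry<String, Long>> records,
                                                BinaryOperator<Long> reducer) {
        Map<String, Long> store = new HashMap<>();   // stands in for the state store
        List<Map.Entry<String, Long>> updates = new ArrayList<>();
        for (Map.Entry<String, Long> r : records) {
            Long previous = store.get(r.getKey());
            // The first value for a key is stored as-is; later values are combined
            // with the previous result: reducer(previousResult, newValue).
            Long result = (previous == null) ? r.getValue() : reducer.apply(previous, r.getValue());
            store.put(r.getKey(), result);
            updates.add(Map.entry(r.getKey(), result));
        }
        return updates;
    }
}
```

Because the previous result and the new value share one type, the same sum lambda works whether it is adding the first two values for a key or adding a new value to an existing total.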

Aggregation

Aggregate is a generalization of reduce: with aggregate, the result can be a different type from the input values:

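import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();
// Assumes default serdes for the input (String key, String value) are set in the configuration
KStream<String, String> myStream = builder.stream("topic-A");

// (key, value, current aggregate) -> new aggregate: add this value's length to the running total
Aggregator<String, String, Long> characterCountAgg =
    (key, value, charCount) -> value.length() + charCount;
myStream.groupByKey()
        .aggregate(() -> 0L,              // initializer: each key's count starts at zero
                   characterCountAgg,
                   // the store holds String/Long, so its serdes must be given explicitly
                   Materialized.with(Serdes.String(), Serdes.Long()))
        .toStream()
        .to("output-topic");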

In this example, our inputs are strings: a string key and a string value. With aggregate in Kafka Streams, you provide an initializer and an aggregator. Our initializer supplies a starting value of zero (as a long), and characterCountAgg takes the key, the value, and the current count. It adds the length of the string value to the current count, building up a running character count per key. Here, it's critical that we provide the SerDes for the state store, because we're changing the type: the stream is string/string for the key/value, but the store is string/long. Then we call toStream() and send the results to an output topic.
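The same kind of plain-Java model works for aggregate. This sketch assumes a hypothetical Agg interface with the same (key, value, aggregate) shape as Kafka Streams' Aggregator, and a Supplier in place of the Initializer; it is an illustration, not the library's implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// A plain-Java model (not Kafka Streams itself) of groupByKey().aggregate(...).
final class AggregateSketch {
    // Hypothetical functional interface with the same shape as
    // Aggregator<K, V, VA>: (key, value, currentAggregate) -> newAggregate.
    interface Agg<K, V, VA> {
        VA apply(K key, V value, VA aggregate);
    }

    static <VA> Map<String, VA> aggregate(List<Map.Entry<String, String>> records,
                                          Supplier<VA> initializer,
                                          Agg<String, String, VA> aggregator) {
        // The store's value type (VA) can differ from the stream's value type (String).
        Map<String, VA> store = new HashMap<>();
        for (Map.Entry<String, String> r : records) {
            // The initializer supplies the starting value the first time a key is seen.
            VA current = store.containsKey(r.getKey()) ? store.get(r.getKey()) : initializer.get();
            store.put(r.getKey(), aggregator.apply(r.getKey(), r.getValue(), current));
        }
        return store;
    }
}
```

Calling aggregate(records, () -> 0L, (key, value, charCount) -> value.length() + charCount) with the values "hello" and "world!" under one key yields 11 for that key, matching the running character count described above.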

Considerations

Stateful operations don't necessarily emit results immediately. Kafka Streams has an internal caching layer that buffers updates and forwards them downstream only when they leave the cache. Two factors control when the cache emits records. First, records are emitted when the cache is full; the cache size is set per application instance (10 MB by default) and is divided evenly among that instance's stream threads. Second, Kafka Streams commits every 30 seconds by default under at-least-once processing (you don't call commit yourself), and each commit flushes the cache. At that point, you would see an update. To see every update that comes through your aggregation, you can set the cache size to zero, which is also useful for debugging.
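The effect of the cache can be sketched in plain Java. This is a simplified model, not Kafka Streams' actual cache: it assumes a hypothetical capacity measured in entries rather than bytes, evicts the least recently updated key when full, and models a commit as flushing everything. Repeated updates to one key collapse into a single forwarded record, while a size of zero forwards every update.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// A simplified plain-Java model (not Kafka Streams' actual cache) of how a
// record cache collapses repeated updates to the same key before forwarding them.
final class RecordCacheSketch {
    private final int maxEntries;                  // hypothetical: entries instead of bytes
    private final Map<String, Long> dirty = new LinkedHashMap<>();
    final List<Map.Entry<String, Long>> forwarded = new ArrayList<>();

    RecordCacheSketch(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    void update(String key, long value) {
        if (maxEntries == 0) {                     // cache size zero: forward every update
            forwarded.add(Map.entry(key, value));
            return;
        }
        dirty.remove(key);                         // re-insert so the key becomes the newest entry
        dirty.put(key, value);                     // a newer value for a key replaces the older one
        if (dirty.size() > maxEntries) {           // cache full: evict and forward the oldest entry
            String eldest = dirty.keySet().iterator().next();
            forwarded.add(Map.entry(eldest, dirty.remove(eldest)));
        }
    }

    // Models a commit: every buffered entry is forwarded, then the cache is emptied.
    void flushOnCommit() {
        for (Map.Entry<String, Long> e : dirty.entrySet()) {
            forwarded.add(Map.entry(e.getKey(), e.getValue()));
        }
        dirty.clear();
    }
}
```

With a cache in place, three updates to "alice" followed by a commit produce one forwarded record carrying the latest value; with a size of zero, all three are forwarded.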

Even with caching, you will usually get multiple results per key. If you need a single, final result, for example one per time window, apply the suppress() operator to the KTable returned by a windowed aggregate or reduce.
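In Kafka Streams, that final-result behavior comes from calling suppress() on the KTable produced by a windowed aggregation, for example with Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()). The plain-Java sketch below only models the idea for a single key with tumbling windows and a grace period; it is a simplified illustration with hypothetical names, not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// A simplified plain-Java model of emitting only the final count per window.
// Not the Kafka Streams implementation: one key, tumbling windows, hypothetical names.
final class FinalResultSketch {
    private final long windowSize;
    private final long grace;
    private long streamTime = Long.MIN_VALUE;                       // highest timestamp seen so far
    private final TreeMap<Long, Long> openWindows = new TreeMap<>(); // windowStart -> count
    final List<String> emitted = new ArrayList<>();                 // "start:count", one per closed window

    FinalResultSketch(long windowSize, long grace) {
        this.windowSize = windowSize;
        this.grace = grace;
    }

    void onRecord(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        // A record whose window has already closed is dropped as too late.
        if (start + windowSize + grace > streamTime) {
            openWindows.merge(start, 1L, Long::sum);                // intermediate counts stay buffered
        }
        // Emit exactly one final count for every window that is now closed.
        while (!openWindows.isEmpty()) {
            long first = openWindows.firstKey();
            if (first + windowSize + grace > streamTime) {
                break;
            }
            emitted.add(first + ":" + openWindows.remove(first));
        }
    }
}
```

Intermediate counts stay buffered; each window produces exactly one output once stream time passes the window end plus the grace period, and records that arrive after that are dropped.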


Stateful Operations

Hi, I'm Sophie Blee-Goldman with Confluent. In this module, we'll talk about stateful operations. So far, we've discussed only stateless operations. Stateless operations are great for when you don't need to remember the previous state of the event stream. For example, with a filter you just provide a predicate that returns true or false, and with mapValues you just define a transformation on the value. But sometimes you need to actually remember the state of the event stream. And by that we just mean: what was the previous value for this key in this event stream? This might be something like, how many times has a particular customer logged in to your browser session? Or, what is the total number of tickets sold? For cases like this, you really need to keep track of the state, and Kafka Streams offers stateful operations for you to do that.

For all stateful operations, you're required to first group by the key. Under the covers, this can be a more complicated operation: if the key has changed, Kafka Streams will repartition the data, which just means making sure that all of the events with the same key end up on the same partition. For Kafka Streams users, most of this goes on under the hood, so you don't really need to specify how it happens. You just need to tell Kafka Streams to group by key before the stateful operation.

So what are these stateful operations? We have count, which counts how many times we've seen a key in this event stream until now. We have reduce, which lets you combine the events in your stream into a single value of the same type. And we have aggregate, which is really a superset of reduce: it lets you aggregate the data you've seen for this key until now into whatever type or whatever result you want. So what does this actually look like? The reduce operator, for example, will take values in a stream and combine them in some way such that the input type is preserved.
So you can see here, we first start by creating our stream as usual, this time from topic A, and then we define a reducer. The reducer again is a lambda. It takes in a value one and a value two, and defines how they get combined. In this case, we just define a simple sum operation. It's important to note that value one and value two aren't necessarily any specific values. It could be that value one is literally the first value for this key that we've seen so far. It could also be that value one represents the previous result produced by this reducer, whereas value two is the latest event. In all those cases, they're still the same type. For example, if you're taking a sum like this, the sum operation is the same regardless of whether you're adding a new value to a previous sum or just adding the first two values.

Once you define that reducer, you take the stream and call groupByKey, which, as we said, is required before any stateful operation, to make sure that all events with the same key are grouped together. Then you can just call reduce. Reduce takes the reducer and, again, a Materialized. We've seen Materialized before: it tells the Kafka Streams operator how to actually store your data in the state store, your materialized state store, if you will. In this case, the key is a string and the value of this reducer is going to be a long, so we need to tell Kafka Streams to use the String Serde and the Long Serde to store this data in the local state store, since it will be serialized to and deserialized from bytes. Lastly, we might want to actually write these results somewhere so we can use them. So we call toStream on the result of this reduce, which is a KTable, and this turns it back into a KStream. Once we have our KStream again, we can call the to operator, and we will write the results to the output topic.
Now, the aggregation, as we said, is really a superset of reduce. Like reduce, it lets you combine events within the same stream, but unlike reduce, it doesn't have the restriction that the result must be the same type. In an aggregation, you can take any events with the same key and combine them however you like, into a completely different type if you want. This means that you have to specify a few more things for the aggregation. As you'll see here, we first specify the aggregator itself. The aggregator takes in the key, the value, and the current aggregation value. In this case, we are counting the number of characters that we've seen so far. So we pass in the current charCount, and the aggregator adds the value's length to it, giving us the total number of characters that we've seen so far. Again, like with reduce, we start by calling groupByKey to make sure all of the events are grouped by key, and then we call aggregate. The first argument to aggregate is the initializer, which tells the aggregation how to start the value: what should the initial character count be? You wouldn't start counting from some random number, so you tell Streams to start counting from zero. After that, you pass in the character count aggregator and, again, a Materialized, and then you have your output KTable. You can convert it to a stream and write it to the output topic so that you can actually do something with these results.

So those are just some of the stateful operations that Kafka Streams provides. There are some considerations that you should keep in mind when you are using a stateful operation, however. Most importantly, these stateful operations don't always emit results immediately.
There's actually an internal caching layer that buffers these results, these updates, and only forwards them when they are evicted from the cache. There are a few factors that control how this cache emission works. There is the size, which is just a config that you set; it's 10 megabytes by default. And there is the commit interval. Every commit interval, which is 30 seconds by default, Kafka Streams will flush all of the caches and do some other things, such as forwarding the records to the producer itself. This causes all of the currently buffered records to be sent out to their output topic in Kafka. That means that if you're not seeing all the updates that you expect, or they're taking a long time to get to you, you might want to try setting the cache size to zero, and possibly also lowering the commit interval. This is useful for debugging, for general development, and for testing. It really helps to see all the updates, and to do so, you'll have to fiddle with these configs a bit. So, now that you know a little bit about stateful operations in Kafka Streams, let's go over them in this next exercise.
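Here is a minimal sketch of the debugging settings from the transcript, using java.util.Properties with plain string keys so it needs no Kafka dependency. The application id and bootstrap address are placeholders you would supply, and the 100 ms commit interval is just an example value. It sets both cache-size property names because cache.max.bytes.buffering was deprecated in favor of statestore.cache.max.bytes in Kafka 3.4; check which one your version uses.

```java
import java.util.Properties;

// Debug/test settings expressed with plain string keys, so the sketch needs no
// Kafka dependency. Verify the property names against your Kafka version.
final class DebugStreamsConfigSketch {
    static Properties debugProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);        // placeholder supplied by the caller
        props.put("bootstrap.servers", bootstrapServers);  // placeholder supplied by the caller
        // Cache size 0: forward every update instead of buffering it.
        // Older releases read cache.max.bytes.buffering; Kafka 3.4+ reads
        // statestore.cache.max.bytes, so both are set here.
        props.put("cache.max.bytes.buffering", "0");
        props.put("statestore.cache.max.bytes", "0");
        // Commit (and therefore flush) more often than the 30 s at-least-once default.
        props.put("commit.interval.ms", "100");
        return props;
    }
}
```

You would pass these properties, together with your usual serde settings, to the KafkaStreams constructor.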
