Stateless operations in Kafka Streams are quite useful if you are just filtering or doing some other operation that doesn't need to collect data. In other scenarios, however, you may need to remember something about previous records. For example, you may want to know how many times a specific customer has logged in, or the total number of tickets sold.
In a stateful operation, you typically group by key first, so keys must be present. If they're not, you can derive a key, but then you will have to repartition. Repartitioning is essentially writing the records back out to a topic so that each key ends up on the correct partition: if you create a new key mid-topology, chances are it will not be on the partition where it belongs.
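To see why a derived key forces a repartition, consider how a partition is chosen from a key. Kafka's default partitioner hashes the serialized key (with a murmur2 hash) modulo the partition count; the sketch below is purely illustrative, using `String.hashCode()` instead of murmur2, and the `partitionFor` helper is hypothetical:

```java
public class PartitionSketch {
    // Hypothetical helper: Kafka's default partitioner uses a murmur2 hash of
    // the serialized key bytes; String.hashCode() stands in for illustration.
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6;
        // The same key always maps to the same partition...
        System.out.println(partitionFor("customer-42", partitions)
                == partitionFor("customer-42", partitions)); // true
        // ...but a newly derived key generally maps to a *different* partition
        // than the one the record is currently sitting on, so the records must
        // be written back out (repartitioned) before grouping can be correct.
        System.out.println(partitionFor("customer-42", partitions));
        System.out.println(partitionFor("region-eu", partitions));
    }
}
```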
Stateful operations in Kafka Streams are backed by state stores. The default is a persistent state store implemented in RocksDB, but you can also use in-memory stores. State stores are backed up by a changelog topic, making state in Kafka Streams fault-tolerant. When you call a stateful operation, a KTable is returned (recall that in a table, new values overwrite previous ones). Stateful operations in Kafka Streams include reduce, count, and aggregate.
With reduce, you provide an instance of the Reducer interface, a Single Abstract Method that takes two parameters of the same value type and returns a result of that same type. Here is a reduce that sums:
```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> myStream = builder.stream("topic-A");

Reducer<Long> reducer = (longValueOne, longValueTwo) -> longValueOne + longValueTwo;

myStream.groupByKey()
        .reduce(reducer, Materialized.with(Serdes.String(), Serdes.Long()))
        .toStream()
        .to("output-topic");
```
(Since it's a Single Abstract Method, you can use a lambda.)
You create your stream, then group by key (this assumes that your stream is correctly keyed). Then you call
reduce and pass in your
reducer. Notice also that you are providing SerDes for the store in
Materialized. There's a chance that you would have defined your SerDes up front, either via a
Consumed object or via configuration. So if you had defined the SerDes for the key as a string and the value as a long via your configuration, you wouldn't need to do this. But typically, it's best to provide SerDes here; it will be clearer when you go to look back at the code (RocksDB and in-memory stores don't store objects, but bytes). Notice also that
reduce returns a KTable, but the
to operator doesn't exist in the KTable API, so we have to convert to a stream: we're converting an update stream (the KTable) to an event stream (a KStream) and writing it to an output topic.
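To make the semantics concrete, here is a plain-Java sketch (no Kafka involved) of what reduce does with the summing Reducer above: for each key, successive values are combined two at a time into a running result, much as the state store holds one running value per key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class ReduceSketch {
    public static void main(String[] args) {
        // Same shape as Reducer<Long>: combine two values of one type into one.
        BinaryOperator<Long> reducer = (a, b) -> a + b;

        // Simulated keyed records: two updates for "alice", one for "bob".
        List<Map.Entry<String, Long>> records = List.of(
                Map.entry("alice", 5L), Map.entry("bob", 3L), Map.entry("alice", 7L));

        // Per-key running result, standing in for the state store.
        Map<String, Long> store = new LinkedHashMap<>();
        for (Map.Entry<String, Long> record : records) {
            store.merge(record.getKey(), record.getValue(), reducer);
        }
        System.out.println(store); // {alice=12, bob=3}
    }
}
```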
Aggregate is a form of
reduce, but with
aggregate, you can return a different type:
```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> myStream = builder.stream("topic-A");

Aggregator<String, String, Long> characterCountAgg =
    (key, value, charCount) -> value.length() + charCount;

myStream.groupByKey()
        .aggregate(() -> 0L, characterCountAgg, Materialized.with(Serdes.String(), Serdes.Long()))
        .toStream()
        .to("output-topic");
```
In this example, our inputs are strings: a string key and a string value. With
aggregate in Kafka Streams, you provide an
initializer and an Aggregator. The initializer in this case supplies a starting value of zero as a long, and then we have
characterCountAgg, which takes the key, the value, and the previous count. It adds the length of the current string value to the previous count, building up a running character count. Here, it's critical that we provide the SerDes for the state store, because we're changing the type. (The type of our stream is string/string for the key/value, but for our store, it's going to be string/long.) Then we call
.toStream() and send to an output topic.
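Again stripping away the Kafka API, the initializer/aggregator pair behaves like a fold that can change type (string values in, a long count out). A minimal plain-Java sketch of that behavior, with a hypothetical TriAggregator interface standing in for Kafka's Aggregator:

```java
import java.util.List;
import java.util.function.Supplier;

public class AggregateSketch {
    // Same shape as Aggregator<String, String, Long>: (key, value, aggregate) -> aggregate.
    interface TriAggregator { Long apply(String key, String value, Long agg); }

    public static void main(String[] args) {
        Supplier<Long> initializer = () -> 0L; // like Initializer<Long>
        TriAggregator charCountAgg = (key, value, agg) -> value.length() + agg;

        // Three string values arriving for the same key:
        List<String> values = List.of("hello", "kafka", "streams!");
        Long running = initializer.get();
        for (String value : values) {
            running = charCountAgg.apply("some-key", value, running);
        }
        System.out.println(running); // 18 characters: 5 + 5 + 8
    }
}
```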
Stateful operations don't emit results immediately. Kafka Streams has an internal buffering mechanism that caches results. Two factors control when the cache emits records: records are emitted when the cache is full (the default cache size is 10 MB per instance, divided equally among the state stores), and by default, Kafka Streams commits every 30 seconds (you don't call commit yourself). At either point, you would see an update. To see every update that comes through your aggregation, you can set your cache size to zero (which is also useful for debugging).
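A sketch of the relevant configuration, using the configuration names as string literals (note that in recent Kafka Streams versions `cache.max.bytes.buffering` has been superseded by `statestore.cache.max.bytes`; check the version you're running):

```java
import java.util.Properties;

public class CacheConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Disable record caching so every update is forwarded downstream
        // (useful for debugging; expect much more output traffic).
        props.setProperty("cache.max.bytes.buffering", "0");
        // The default commit interval is 30 seconds; lowering it also
        // makes updates visible sooner.
        props.setProperty("commit.interval.ms", "1000");
        System.out.println(props);
    }
}
```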
Even with caching, you will get multiple intermediate results, so for a single, final stateful result, you should use suppression (the KTable suppress operator).