One of the primary conveniences of Kafka Streams is that it manages the internal topics and state stores required to repartition data (e.g., after calling KStream.selectKey()) and to implement stateful operations (e.g., KGroupedStream.aggregate()).
Under the hood, these internal topics and state stores have names that, unless you provide them, are generated by Kafka Streams. Because the generated names include a numeric index based on the operation's position in the topology, changes to the topology can break compatibility: any previously created internal topics and state stores may be orphaned after the change, since the application now looks for them under different names.
Let's consider a concrete example:
KStream<Long, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(longSerde, stringSerde))
.selectKey((k, v) -> Long.parseLong(v.substring(0, 1)));
KStream<Long, Long> countStream = inputStream.groupByKey().count().toStream();
Because there is a selectKey() operation that changes the input stream's key, the ensuing groupByKey() operation forces a repartition to an internal topic to make sure the modified keys end up on the correct partition.
Then, the count() aggregation causes Kafka Streams to create a state store and changelog topic (for fault tolerance).
If you were to call Topology.describe() on this topology, the output would include the generated names for the state store, changelog topic, and repartition topic.
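For the topology above, those generated names typically look like the following (representative of Kafka Streams' generated naming; the exact index depends on the operations in the topology):

KSTREAM-AGGREGATE-STATE-STORE-0000000002              (state store)
KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog    (changelog topic)
KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition  (repartition topic)

On the brokers, the changelog and repartition topics are additionally prefixed with the application.id.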
The 0000000002 substring in each name is the operation index mentioned above.
Now, let's update the topology by adding a filter:
KStream<Long, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(longSerde, stringSerde))
.selectKey((k, v) -> Long.parseLong(v.substring(0, 1)))
.filter((k, v) -> k != 100L);
KStream<Long, Long> countStream = inputStream.groupByKey().count().toStream();
Since the filter() comes before the count() aggregation, the numeric index in the state store, changelog topic, and repartition topic names shifts from 0000000002 to 0000000003.
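Concretely, the names from the earlier example would now look something like this:

KSTREAM-AGGREGATE-STATE-STORE-0000000003              (state store)
KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog    (changelog topic)
KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition  (repartition topic)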
The upshot is that, when we run the application with the updated topology, the counts start over from zero rather than continuing from where they left off: the application creates new internal topics and state stores under the new names, and the old ones are orphaned.
How can we reuse state stores and internal topics in the face of topology changes? The Kafka Streams DSL gives us a few places to control the names of the internal topics and state stores ourselves.
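The two that matter for this example are Grouped.as(), which names the repartition topic created by the grouping operation, and Materialized.as(), which names the state store and, by extension, its changelog topic. Here is a minimal sketch of a named version of the topology; the names "count" and "count-store" are illustrative choices, not names required by the tutorial:

KStream<Long, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(longSerde, stringSerde))
    .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)))
    .filter((k, v) -> k != 100L);

// Grouped.as() pins the repartition topic name and Materialized.as() pins the
// state store (and changelog topic) name, so adding or removing upstream
// operations no longer changes which internal topics and stores are used.
KStream<Long, Long> countStream = inputStream
    .groupByKey(Grouped.as("count"))
    .count(Materialized.as("count-store"))
    .toStream();

With these names in place, inserting the filter() (or other stateless operations) upstream no longer shifts the names, and the existing state survives the topology change.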
The Kafka Streams example application included in this tutorial expects String input records. The selectKey() operation extracts the first character of each String value and uses it as the record key. The application accepts flags that toggle whether to add a filter() operation and whether to name the state stores and internal topics. The following steps demonstrate the broken compatibility you get when state stores and internal topics are left unnamed, and then the correct results you get when you provide names.
Now let's run through the same steps, except this time we will name the state stores and internal topics, keeping the same consumer from the previous steps running.