How to change a Kafka Streams topology and maintain compatibility

One of the primary conveniences of Kafka Streams is that it manages the internal topics and state stores required to repartition data (e.g., from calling KStream.selectKey()) and to implement stateful operations (e.g., from calling KGroupedStream.aggregate()).

Under the hood, these internal topics and state stores have names that, unless you provide them, are generated by Kafka Streams. Because the generated names include a numeric index based on the operator's position in the topology, changes to the topology can break compatibility: previously created internal topics and state stores may be orphaned after the change.

Topic and state store generated names

Let's consider a concrete example:

    KStream<Long, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(longSerde, stringSerde))
            .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)));

    KStream<Long, Long> countStream = inputStream.groupByKey().count().toStream();

Because there is a selectKey() operation that changes the input stream's key, the ensuing groupByKey() operation forces a repartition to an internal topic to make sure the modified keys end up on the correct partition.

Then, the count() aggregation causes Kafka Streams to create a state store and changelog topic (for fault tolerance).

If you were to call Topology.describe() on this topology, the output would show the following names for the state store, changelog topic, and repartition topic:

  • state store: <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002
  • changelog topic: <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
  • repartition topic: <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition

The 0000000002 substring in each name is the operation index mentioned above.
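
You can reproduce this output yourself; a minimal sketch, assuming the builder from the snippet above:

    Topology topology = builder.build();

    // describe() returns a TopologyDescription whose toString() lists every
    // sub-topology, processor, and state store, including the generated names above
    System.out.println(topology.describe());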

Now, let's update the topology by adding a filter:

    KStream<Long, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(longSerde, stringSerde))
            .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)))
            .filter((k, v) -> k != 100L);

    KStream<Long, Long> countStream = inputStream.groupByKey().count().toStream();

Since the filter comes before the count() aggregation, the index in the state store, changelog topic, and repartition topic names shifts from 0000000002 to 0000000003.

The upshot is that, when we run the application with the updated topology, it no longer finds the old state store and internal topics under the new names, so the counts start over from zero rather than picking up where they left off.

Naming state stores and internal topics

How can we keep reusing state stores and internal topics in the face of topology changes? There are a few places in the Kafka Streams DSL where we can control the names of the internal topics and state stores:

  1. When grouping, use the Grouped.with method that lets you provide the repartition topic name.
  2. When aggregating, use the Materialized.as method that lets you provide the store name, and call the aggregation overload that accepts such a Materialized instance (e.g., count(Materialized)); see the sketch after this list.
  3. When joining, opt for a join method that accepts a StreamJoined configuration object. Provide the state store and repartition topic names by calling both withName() and withStoreName() when instantiating this object.
  4. Many Kafka Streams APIs accept a Named argument, e.g., this filter() method takes a Named object that is used to name the processor in the topology.
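
Applied to the example topology, a minimal sketch of options 1, 2, and 4 might look like this (the names count, limit-filter, and the-counting-store are illustrative, not the tutorial application's actual names):

    KStream<Long, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(longSerde, stringSerde))
            .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)))
            // Named.as() names the filter processor in the topology
            .filter((k, v) -> k != 100L, Named.as("limit-filter"));

    KStream<Long, Long> countStream = inputStream
            // Grouped.as() fixes the repartition topic name: <application.id>-count-repartition
            .groupByKey(Grouped.as("count"))
            // Materialized.as() fixes the state store name and, by extension, its changelog
            // topic: <application.id>-the-counting-store-changelog
            .count(Materialized.as("the-counting-store"))
            .toStream();

With these names in place, adding or removing upstream operators no longer shifts the store and topic names, so the existing state store and internal topics continue to be reused.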

Running the example

The Kafka Streams example application included in this tutorial expects String input records. The selectKey() operation extracts the first character of each String and uses it as the record key. The application accepts flags that toggle whether to add a filter() operation and whether to name state stores and internal topics. The following steps demonstrate broken compatibility when you don't name state stores and internal topics versus correct results when you do.
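
How the flags translate into topology changes might look roughly like the following sketch; addFilter and addNames stand in for the add.filter and add.names flags and are assumptions, not the application's actual code:

    // Hypothetical booleans parsed from the add.filter and add.names flags
    KStream<Long, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(longSerde, stringSerde))
            .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)));

    if (addFilter) {
        // Adding this operator shifts every downstream generated index by one
        inputStream = inputStream.filter((k, v) -> k != 100L);
    }

    KStream<Long, Long> countStream = addNames
            // Explicit names keep the repartition topic and state store stable
            ? inputStream.groupByKey(Grouped.as("count")).count(Materialized.as("the-counting-store")).toStream()
            // Generated names depend on the operator index and change when the filter is added
            : inputStream.groupByKey().count().toStream();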

  1. Start a consumer on the output-topic and let it run. Print the keys in the output by specifying --property print.key=true and --property key.separator="-".
  2. Run the application with add.filter=false and add.names=false.
  3. Produce records 1foo, 1bar, and 1baz to the input topic input-topic.
  4. Observe the expected counts by key:
    1-1
    1-2
    1-3
  5. Restart the application with add.filter=true and add.names=false.
  6. Again, produce records 1foo, 1bar, and 1baz to the input topic input-topic.
  7. Observe that, because new state stores and internal topics are used, our consumer shows the same counts as before instead of 4, 5, and 6:
    1-1
    1-2
    1-3

Now let's run through the same steps, except this time we will name state stores and internal topics. With the same consumer from the previous steps already running:

  1. Restart the application with add.filter=false and add.names=true.
  2. Produce records 2foo, 2bar, and 2baz to the input topic input-topic.
  3. Observe the expected counts by key:
    2-1
    2-2
    2-3
  4. Restart the application with add.filter=true and add.names=true.
  5. Again, produce records 2foo, 2bar, and 2baz to the input topic input-topic.
  6. Observe that, because we reuse the state stores and internal topics populated in the previous run, the counts pick up where they left off:
    2-4
    2-5
    2-6