How can you change the topology of an existing Kafka Streams application while retaining compatibility with its existing state stores and internal topics?
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it.
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl.
• Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line.
To get started, make a new directory anywhere you’d like for this project:
mkdir naming-changelog-repartition-topics && cd naming-changelog-repartition-topics
Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):
version: '2'
services:
broker:
image: confluentinc/cp-kafka:7.4.1
hostname: broker
container_name: broker
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
schema-registry:
image: confluentinc/cp-schema-registry:7.3.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- 8081:8081
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN
And launch it by running:
docker compose up -d
Create the following Gradle build file, named build.gradle, for the project:
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
}
}
plugins {
id "java"
id "idea"
id "eclipse"
id "com.github.davidmc24.gradle.plugin.avro" version "1.7.0"
}
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"
repositories {
mavenCentral()
gradlePluginPortal()
maven {
url "https://packages.confluent.io/maven"
}
}
apply plugin: "com.github.johnrengelman.shadow"
dependencies {
implementation "org.apache.avro:avro:1.11.1"
implementation "org.slf4j:slf4j-simple:2.0.7"
implementation 'org.apache.kafka:kafka-streams:3.4.0'
implementation ('org.apache.kafka:kafka-clients') {
version {
strictly '3.4.0'
}
}
implementation "io.confluent:kafka-streams-avro-serde:7.3.0"
testImplementation "org.apache.kafka:kafka-streams-test-utils:3.4.0"
testImplementation "junit:junit:4.13.2"
testImplementation 'org.hamcrest:hamcrest:2.2'
}
test {
testLogging {
outputs.upToDateWhen { false }
showStandardStreams = true
exceptionFormat = "full"
}
}
jar {
manifest {
attributes(
"Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
"Main-Class": "io.confluent.developer.NamingChangelogAndRepartitionTopics"
)
}
}
shadowJar {
archiveBaseName = "naming-changelog-repartition-topics-standalone"
archiveClassifier = ''
}
And be sure to run the following command to obtain the Gradle wrapper:
gradle wrapper
Next, create a directory for configuration data:
mkdir configuration
Then create a development file at configuration/dev.properties:
application.id=naming-changelog-repartition-topics
bootstrap.servers=localhost:29092
schema.registry.url=http://localhost:8081
input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1
output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1
join.topic.name=join-topic
join.topic.partitions=1
join.topic.replication.factor=1
Create a directory for the Java files in this project:
mkdir -p src/main/java/io/confluent/developer
The point of this tutorial is to discuss the importance of naming state stores (and hence changelog topics) and repartition topics. In addition to giving you a more readable topology description, explicit names make your Kafka Streams application more robust to topology changes.
Let’s look at the core logic of the Kafka Streams application:
KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                           .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)));
KStream<Long, Long> countStream = inputStream.groupByKey().count().toStream();
KStream<Long, String> joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                                                      JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
                                                      StreamJoined.with(longSerde, stringSerde, longSerde));
In the inputStream there is a selectKey() operation, changing the key of the incoming stream. As a result, executing the inputStream.groupByKey() operation forces a repartition to make sure the modified keys end up on the correct partition. Additionally, count() is an aggregation, so Kafka Streams creates a state store plus a changelog topic for fault tolerance of the state store.
There are additional state stores and another repartition topic in this topology, but we’ll focus on the countStream to keep things simple. The same principles apply to any state store, changelog topic, and repartition topic.
When using the DSL, Kafka Streams generates the names for each processor, state store, and any required internal topics. To view a textual representation of your topology, you can run Topology.describe().
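For instance, a quick way to inspect the generated names is to print the topology description from a small driver class. This is only a sketch: DescribeTopology is a hypothetical helper, and it assumes the NamingChangelogAndRepartitionTopics class and configuration/dev.properties file you create in this tutorial.
package io.confluent.developer;

import java.util.Properties;
import org.apache.kafka.streams.Topology;

// Hypothetical helper (not part of the tutorial's source): prints the topology description
// so you can see the generated processor, state store, and internal topic names.
public class DescribeTopology {

  public static void main(String[] args) throws Exception {
    NamingChangelogAndRepartitionTopics app = new NamingChangelogAndRepartitionTopics();
    Properties envProps = app.loadEnvProperties("configuration/dev.properties");

    Topology topology = app.buildTopology(envProps);

    // The description lists sources, processors, sinks, and the stores attached to each processor.
    System.out.println(topology.describe());
  }
}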
We won’t show the full output here, but describing this topology yields the following names for the state store, changelog topic, and repartition topic:
state store - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002
changelog topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
repartition topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition
Here’s an image of the relevant part of the topology (view a full image of the topology).
(Note that changelog topics don’t show up in Topology.describe().)
You’ll notice the number 0000000002 at the end of the names. Kafka Streams appends an incrementing number to the name of each part of the topology. Here the state store, changelog topic, and repartition topic all share the same number because, by default, the changelog and repartition topics reuse the name of the corresponding state store.
Now go ahead and create the following file at src/main/java/io/confluent/developer/NamingChangelogAndRepartitionTopics.java.
package io.confluent.developer;
import io.confluent.common.utils.TestUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class NamingChangelogAndRepartitionTopics {
public Properties buildStreamsProperties(Properties envProps) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
return props;
}
public Topology buildTopology(Properties envProps) {
final StreamsBuilder builder = new StreamsBuilder();
final String inputTopic = envProps.getProperty("input.topic.name");
final String outputTopic = envProps.getProperty("output.topic.name");
final String joinTopic = envProps.getProperty("join.topic.name");
final Serde<Long> longSerde = Serdes.Long();
final Serde<String> stringSerde = Serdes.String();
final boolean addFilter = Boolean.parseBoolean(envProps.getProperty("add.filter"));
final boolean addNames = Boolean.parseBoolean(envProps.getProperty("add.names"));
KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
.selectKey((k, v) -> Long.parseLong(v.substring(0, 1)));
if (addFilter) {
inputStream = inputStream.filter((k, v) -> k != 100L);
}
final KStream<Long, String> joinedStream;
final KStream<Long, Long> countStream;
if (!addNames) {
countStream = inputStream.groupByKey(Grouped.with(longSerde, stringSerde))
.count()
.toStream();
joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
StreamJoined.with(longSerde, stringSerde, longSerde));
} else {
countStream = inputStream.groupByKey(Grouped.with("count", longSerde, stringSerde))
.count(Materialized.as("the-counting-store"))
.toStream();
joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
StreamJoined.with(longSerde, stringSerde, longSerde)
.withName("join").withStoreName("the-join-store"));
}
joinedStream.to(joinTopic, Produced.with(longSerde, stringSerde));
countStream.map((k,v) -> KeyValue.pair(k.toString(), v.toString())).to(outputTopic, Produced.with(stringSerde, stringSerde));
return builder.build();
}
public void createTopics(final Properties envProps) {
final Map<String, Object> config = new HashMap<>();
config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
try (final AdminClient client = AdminClient.create(config)) {
final List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(
envProps.getProperty("input.topic.name"),
Integer.parseInt(envProps.getProperty("input.topic.partitions")),
Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));
topics.add(new NewTopic(
envProps.getProperty("output.topic.name"),
Integer.parseInt(envProps.getProperty("output.topic.partitions")),
Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));
topics.add(new NewTopic(
envProps.getProperty("join.topic.name"),
Integer.parseInt(envProps.getProperty("join.topic.partitions")),
Short.parseShort(envProps.getProperty("join.topic.replication.factor"))));
client.createTopics(topics);
}
}
public Properties loadEnvProperties(String fileName) throws IOException {
final Properties envProps = new Properties();
final FileInputStream input = new FileInputStream(fileName);
envProps.load(input);
input.close();
return envProps;
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException(
"This program takes one argument: the path to an environment configuration file.");
}
final NamingChangelogAndRepartitionTopics instance = new NamingChangelogAndRepartitionTopics();
final Properties envProps = instance.loadEnvProperties(args[0]);
if (args.length > 1 ) {
final String namesAndFilter = args[1];
if (namesAndFilter.contains("filter")) {
envProps.put("add.filter", "true");
}
if (namesAndFilter.contains("names")) {
envProps.put("add.names", "true");
}
}
final CountDownLatch latch = new CountDownLatch(1);
final Properties streamProps = instance.buildStreamsProperties(envProps);
final Topology topology = instance.buildTopology(envProps);
instance.createTopics(envProps);
final KafkaStreams streams = new KafkaStreams(topology, streamProps);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close(Duration.ofSeconds(5));
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
In your terminal, run:
./gradlew shadowJar
Now that you have an uberjar for the Kafka Streams application, you can launch it locally. In most tutorials, the Kafka Streams application runs until you shut it down, but in this tutorial you’ll make changes to the topology, which requires you to restart the streams application.
java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties
Open a new terminal and start the console-producer:
docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --bootstrap-server broker:9092
Then copy-paste the following records to send. The data is formatted this way because the Kafka Streams application will create a key from the first character.
1foo
1bar
1baz
After you’ve sent the records, stop the console-producer with Ctrl-C and start the console-consumer. Take note that you’re running the consumer with the --from-beginning option, so you’ll get all messages sent to the output-topic topic.
docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
--from-beginning \
--property print.key=true \
--property key.separator="-" \
--max-messages 12
From your first run you should see the following output:
1-1
1-2
1-3
Note that even though this is the output of an aggregation operation, the tutorial configured the streams application with StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG set to 0, so you’ll see every update from the count() operation.
Since the streams application uses the first character of the value as the key, the output of 1-1, 1-2, and 1-3 (key-count) is expected.
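If you’re curious how that cache setting changes what lands in the output topic, here is a small sketch of the relevant lines from buildStreamsProperties(); the 10 MB alternative is only an illustration, not part of the tutorial’s code.
// With the cache disabled, every update from count() is forwarded downstream,
// which is why each input record shows up as a new count in output-topic.
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);

// With a non-zero cache (for example 10 MB), updates to the same key can be coalesced
// before they are emitted, so you would typically see fewer records in output-topic.
// props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);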
You can close this consumer for now with Ctrl-C.
As you may have guessed, adding or subtracting a processor in your topology will change the name of all processors downstream of your change.
With that in mind, let’s add a filter() processor to the inputStream:
KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                           .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)))
                                           .filter((k, v) -> k != 100L);
To make things a little easier, the code for the tutorial is configured to use feature flags. You’ll use the feature flags by passing different parameters in the command to start the Kafka Streams application.
Since you’ve added the filter() before the aggregation, the names of the state store, changelog topic, and repartition topic will change. In the new names, the number suffix will go from 0000000002 to 0000000003, like so:
state store - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000003
changelog topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog
repartition topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition
Here’s an image of the relevant part of the updated topology (view a full image of the topology).
(Note that changelog topics don’t show up in Topology.describe().)
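One way to catch an incompatible rename like this before you deploy is to pin the topology description in a test. Here is a rough sketch using the JUnit dependency already in build.gradle; TopologyCompatibilityTest is a hypothetical test class, and the expected string is a placeholder you would capture once from a known-good build.
package io.confluent.developer;

import static org.junit.Assert.assertEquals;

import java.util.Properties;
import org.junit.Test;

public class TopologyCompatibilityTest {

  @Test
  public void topologyDescriptionShouldNotChange() throws Exception {
    NamingChangelogAndRepartitionTopics app = new NamingChangelogAndRepartitionTopics();
    Properties envProps = app.loadEnvProperties("configuration/dev.properties");

    // Capture this once from a known-good build and keep it under version control.
    String expectedDescription = "..."; // placeholder for the pinned Topology#describe() output

    String actualDescription = app.buildTopology(envProps).describe().toString();

    // If a new operator shifts the generated names of state stores or internal topics,
    // this assertion fails before the change reaches production.
    assertEquals(expectedDescription, actualDescription);
  }
}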
Now, in the terminal running the streams application, use Ctrl-C, then restart the app with the following command:
java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties filter-only
Go back to the producer/consumer terminal and start the console-producer again.
docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --bootstrap-server broker:9092
We’ll send the exact same data from before, as we want to update the counts for existing records. Copy and paste the following into the prompt and press enter:
1foo
1bar
1baz
After you’ve sent the records, you can shut down the console-producer with Ctrl-C.
Now, restart the console-consumer. Remember the consumer is running with the --from-beginning option, so you’ll get all messages sent to the output-topic topic.
docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
--from-beginning \
--property print.key=true \
--property key.separator="-" \
--max-messages 12
In this second run, you should see this output:
1-1
1-2
1-3
1-1
1-2
1-3
Even though the Kafka Streams application counts by key and you sent the same keys, the output is repeated. The application produced the first three records in the previous run. So why is the output 1-1, 1-2, 1-3 instead of 1-4, 1-5, 1-6? Adding the new operation incremented the counter used to generate the names of every processor, state store, and internal topic downstream of the new operator.
This renaming means the streams application’s count() processor now uses a brand-new state store rather than the one created when you first started the application. The situation is the same if you use an in-memory store, because the name of the changelog topic changes as well; once the name changes, there is nothing for Kafka Streams to restore into the newly built in-memory store.
Your original data is still there, but Kafka Streams isn’t using the previously created state store and changelog topic.
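You can verify this by listing the topics the application has created so far. The following is only a sketch (ListInternalTopics is a hypothetical helper) built on the same AdminClient the tutorial already uses; with the bootstrap server from dev.properties, you should see both the old -0000000002 and the new -0000000003 changelog and repartition topics.
package io.confluent.developer;

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

// Hypothetical helper: lists the topics whose names start with the application.id,
// which includes the changelog and repartition topics Kafka Streams has created.
public class ListInternalTopics {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:29092");

    try (AdminClient client = AdminClient.create(props)) {
      String prefix = "naming-changelog-repartition-topics"; // the application.id from dev.properties
      client.listTopics().names().get().stream()
            .filter(topic -> topic.startsWith(prefix))
            .sorted()
            .forEach(System.out::println);
    }
  }
}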
You can close this consumer for now with Ctrl-C.
Now let’s start over. This time you’ll update the topology and provide names to all stateful operators.
countStream = inputStream.groupByKey(Grouped.with("count", longSerde, stringSerde))
.count(Materialized.as("the-counting-store"))
.toStream();
joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
StreamJoined.with(longSerde, stringSerde, longSerde)
.withName("join").withStoreName("the-join-store"));
Here’s an image of the relevant part of the topology, now with names (view a full image of the topology):
(Note that changelog topics don’t show up in Topology.describe().)
Just like you’ve done throughout the tutorial, the changes are made using feature flags, which you enable with parameters passed when starting the application.
In the terminal running the streams application, use Ctrl-C, then restart the streams application with this command:
java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties names-only
Back in your producer/consumer terminal, start the console-producer again.
docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --bootstrap-server broker:9092
Then copy-paste the following records to send. You’ll use data resulting in different keys this time.
2foo
2bar
2baz
After you’ve sent the records, you can shut down the console-producer with Ctrl-C.
Now let’s start up the console consumer again:
docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
--from-beginning \
--property print.key=true \
--property key.separator="-" \
--max-messages 12
For this run you should see all six records plus three new records:
1-1
1-2
1-3
1-1
1-2
1-3
2-1
2-2
2-3
Since you used data resulting in new keys, the output of 2-1, 2-2, and 2-3 looks correct.
You can close this consumer for now with Ctrl-C.
Now let’s add a new operator (filter()) to the named topology:
KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                           .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)))
                                           .filter((k, v) -> k != 100L);
But this time, adding a new processor won’t change the name of the stateful parts of your application, as you’ve explicitly named them in the previous step.
Here’s an image of the relevant part of the updated topology with the stateful operators named (view a full image of the topology):
(Note that changelog topics don’t show up in Topology.describe().)
You’ll notice the other processor names have shifted, but since those processors are stateless, that’s OK and it won’t break topology compatibility.
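If you would also like stable, readable names for the stateless processors, purely for the topology description, most DSL operations accept a Named parameter. Here is a small sketch; it is not part of the tutorial’s feature-flag code, and the processor name is just an example.
// Requires: import org.apache.kafka.streams.kstream.Named;
// Naming a stateless processor only improves the readability of Topology#describe();
// it has no effect on state store or internal topic compatibility.
inputStream = inputStream.filter((k, v) -> k != 100L, Named.as("non-hundred-key-filter"));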
In the terminal running the streams application, use Ctrl-C, then restart the streams application with this command:
java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties filter-with-names
One last time, from your producer/consumer terminal, start a console-producer:
docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --bootstrap-server broker:9092
Then copy-paste the following records to send. Again you’re sending updates with the same keys.
2foo
2bar
2baz
After you’ve sent the records, you can shut down the console-producer with Ctrl-C.
Finally, start a console-consumer.
docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
--from-beginning \
--property print.key=true \
--property key.separator="-" \
--max-messages 12
You should see the following output:
1-1
1-2
1-3
1-1
1-2
1-3
2-1
2-2
2-3
2-4
2-5
2-6
The last three records, 2-4, 2-5, and 2-6, show the correct output, as you produced six records with the same key.
You have updated your topology and reused the existing state stores and internal topics!
Always name stateful operators (the sketch after this list pulls the naming APIs together)
• If you haven’t named your stateful operators and you need to update your topology, use the Application Reset Tool to reprocess records.
Aggregation repartition topics (if needed)
• Use the Grouped.as() method (or Grouped.with(), which also lets you provide the repartition Serdes if required)
• Kafka Streams appends -repartition to the provided name
• If no name is provided, the state store name is used with -repartition appended
Joins
• Use the StreamJoined configuration object
• StreamJoined.withName() names the join processors and provides the base name of the repartition topics (if needed)
• StreamJoined.withStoreName() names the state stores associated with the join
• If you need to provide Serdes for the join, you’ll use StreamJoined for that as well
State stores
• Use the Materialized.as() method
• Use Materialized to provide the state store Serdes if needed
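Here is a sketch that applies all three mechanisms to the count-and-join logic from this tutorial; the names themselves are only examples.
// Name the repartition topic, the state store (and its changelog), and the join internals.
final KStream<Long, Long> countStream =
    inputStream.groupByKey(Grouped.with("count", longSerde, stringSerde))   // repartition topic: <application.id>-count-repartition
               .count(Materialized.as("the-counting-store"))                // store and changelog: <application.id>-the-counting-store-changelog
               .toStream();

final KStream<Long, String> joinedStream =
    inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                     JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
                     StreamJoined.with(longSerde, stringSerde, longSerde)   // join Serdes, if you need to override the defaults
                                 .withName("join")                          // names the join processors and any join repartition topics
                                 .withStoreName("the-join-store"));         // names the join state stores and their changelogs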
Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service. To get started, sign up for Confluent Cloud.
After you log in to Confluent Cloud Console, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.
From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details). To avoid having to enter a credit card, add the additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.
Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.
Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.
In the case of this tutorial, add the following properties to the client application’s input properties file, substituting the values in curly braces with your Confluent Cloud values.
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for Kafka producer to prevent data loss
acks=all
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
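If you prefer to keep the cloud settings in their own file rather than editing dev.properties directly, a minimal sketch of loading both files looks like this; the configuration/ccloud.properties file name is an assumption, not something the tutorial creates.
// Sketch: merge Confluent Cloud connection settings into the tutorial's configuration.
Properties envProps = new Properties();
try (FileInputStream devInput = new FileInputStream("configuration/dev.properties");
     FileInputStream cloudInput = new FileInputStream("configuration/ccloud.properties")) {
  envProps.load(devInput);
  envProps.load(cloudInput); // cloud values override local ones such as bootstrap.servers
}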
Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.