How to name stateful operations in Kafka Streams

Question:

How can you change the topology of an existing Kafka Streams application while retaining compatibility with the existing one?


Example use case:

You want to add or remove some operations in your Kafka Streams application. In this tutorial, we'll name the changelog and repartition topics so that the topology updates don't break compatibility.

Hands-on code example:

Run it

Prerequisites

1

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it.

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl.

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line.

Initialize the project

2

To get started, make a new directory anywhere you’d like for this project:

mkdir naming-changelog-repartition-topics && cd naming-changelog-repartition-topics

Get Confluent Platform

3

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
    - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
    - broker
    ports:
    - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN

And launch it by running:

docker compose up -d

Configure the project

4

Create the following Gradle build file, named build.gradle for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"
    id "idea"
    id "eclipse"
    id "com.github.davidmc24.gradle.plugin.avro" version "1.7.0"
}

sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"

repositories {
    mavenCentral()
    gradlePluginPortal()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.apache.avro:avro:1.11.1"
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
    implementation ('org.apache.kafka:kafka-clients') {
       version {
           strictly '3.4.0'
        }
      }
    implementation "io.confluent:kafka-streams-avro-serde:7.3.0"
    testImplementation "org.apache.kafka:kafka-streams-test-utils:3.4.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.NamingChangelogAndRepartitionTopics"
    )
  }
}

shadowJar {
  archiveBaseName = "naming-changelog-repartition-topics-standalone"
  archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration

Then create a development file at configuration/dev.properties:

application.id=naming-changelog-repartition-topics
bootstrap.servers=localhost:29092
schema.registry.url=http://localhost:8081

input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

join.topic.name=join-topic
join.topic.partitions=1
join.topic.replication.factor=1

Create an initial Kafka Streams topology

5

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

The point of this tutorial is to demonstrate the importance of naming state stores (and hence their changelog topics) and repartition topics. Beyond giving you a more readable topology description, naming these makes your Kafka Streams application more robust to topology changes.

Let’s look at the core logic of the Kafka Streams application:


  KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                                 .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)));

  KStream<Long, Long> countStream = inputStream.groupByKey(Grouped.with(longSerde, stringSerde)).count().toStream();

  KStream<Long, String> joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                                                              JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
                                                              StreamJoined.with(longSerde, stringSerde, longSerde));

In the inputStream there is a selectKey() operation, changing the key of the incoming stream.

As a result, executing the inputStream.groupByKey() operation forces a repartition to make sure the modified keys end up on the correct partition.

Additionally, count() is an aggregation, so Kafka Streams creates a state store plus a changelog topic for fault-tolerance of the state store.

There are additional state stores and another repartition topic in this topology, but we’ll focus on the countStream to keep things simple. The same principles apply to any state store, changelog topic, or repartition topic.

When using the DSL, Kafka Streams generates the names for each processor, state store, and any required internal topics. To view a textual representation of your topology, you can run Topology.describe().
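For example, you can build the topology and print its description. Here’s a small sketch (not part of the tutorial’s main class) that assumes the NamingChangelogAndRepartitionTopics class and the configuration/dev.properties file shown in this tutorial:

  NamingChangelogAndRepartitionTopics instance = new NamingChangelogAndRepartitionTopics();
  Properties envProps = instance.loadEnvProperties("configuration/dev.properties");
  Topology topology = instance.buildTopology(envProps);

  // Prints the processors, the state stores attached to them, and the internal
  // repartition topics; changelog topics themselves are not listed.
  System.out.println(topology.describe());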

We won’t show the full output here, but describing this topology yields the following names for the state store, changelog topic, and repartition topic:

  • state store - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002

  • changelog topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog

  • repartition topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition

Here’s an image of the relevant part of the topology (view a full image of the topology)


Kafka Streams Sub-topology

(Note that changelog topics don’t show up in Topology.describe())

You’ll notice the number 0000000002 at the end of the names. Kafka Streams appends an incrementing number to the name of each part of the topology as it builds it. Here the state store, changelog topic, and repartition topic share the same number because, by default, the changelog and repartition topics reuse the name of the corresponding state store.

Now go ahead and create the following file at src/main/java/io/confluent/developer/NamingChangelogAndRepartitionTopics.java.

package io.confluent.developer;


import io.confluent.common.utils.TestUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class NamingChangelogAndRepartitionTopics {


  public Properties buildStreamsProperties(Properties envProps) {
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
    props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);

    return props;
  }

  public Topology buildTopology(Properties envProps) {
    final StreamsBuilder builder = new StreamsBuilder();
    final String inputTopic = envProps.getProperty("input.topic.name");
    final String outputTopic = envProps.getProperty("output.topic.name");
    final String joinTopic = envProps.getProperty("join.topic.name");

    final Serde<Long> longSerde = Serdes.Long();
    final Serde<String> stringSerde = Serdes.String();

    final boolean addFilter = Boolean.parseBoolean(envProps.getProperty("add.filter"));
    final boolean addNames = Boolean.parseBoolean(envProps.getProperty("add.names"));

    KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                                  .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)));
    if (addFilter) {
      inputStream = inputStream.filter((k, v) -> k != 100L);
    }

    final KStream<Long, String> joinedStream;
    final KStream<Long, Long> countStream;

    if (!addNames) {
         countStream = inputStream.groupByKey(Grouped.with(longSerde, stringSerde))
                                    .count()
                                    .toStream();

        joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                                                              JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
                                                              StreamJoined.with(longSerde, stringSerde, longSerde));
    } else {
        countStream = inputStream.groupByKey(Grouped.with("count", longSerde, stringSerde))
                                   .count(Materialized.as("the-counting-store"))
                                   .toStream();

        joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                                                              JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
                                                              StreamJoined.with(longSerde, stringSerde, longSerde)
                                                                          .withName("join").withStoreName("the-join-store"));
    }

    joinedStream.to(joinTopic, Produced.with(longSerde, stringSerde));
    countStream.map((k,v) -> KeyValue.pair(k.toString(), v.toString())).to(outputTopic, Produced.with(stringSerde, stringSerde));


    return builder.build();
  }

  public void createTopics(final Properties envProps) {
    final Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
    try (final AdminClient client = AdminClient.create(config)) {

      final List<NewTopic> topics = new ArrayList<>();

      topics.add(new NewTopic(
          envProps.getProperty("input.topic.name"),
          Integer.parseInt(envProps.getProperty("input.topic.partitions")),
          Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));

      topics.add(new NewTopic(
          envProps.getProperty("output.topic.name"),
          Integer.parseInt(envProps.getProperty("output.topic.partitions")),
          Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));

      topics.add(new NewTopic(
          envProps.getProperty("join.topic.name"),
          Integer.parseInt(envProps.getProperty("join.topic.partitions")),
          Short.parseShort(envProps.getProperty("join.topic.replication.factor"))));

      client.createTopics(topics);
    }
  }

  public Properties loadEnvProperties(String fileName) throws IOException {
    final Properties envProps = new Properties();
    // Use try-with-resources so the file handle is closed even if load() throws.
    try (final FileInputStream input = new FileInputStream(fileName)) {
      envProps.load(input);
    }
    return envProps;
  }

  public static void main(String[] args) throws Exception {

    if (args.length < 1) {
      throw new IllegalArgumentException(
          "This program takes one argument: the path to an environment configuration file.");
    }

    final NamingChangelogAndRepartitionTopics instance = new NamingChangelogAndRepartitionTopics();
    final Properties envProps = instance.loadEnvProperties(args[0]);
    if (args.length > 1 ) {
      final String namesAndFilter = args[1];

      if (namesAndFilter.contains("filter")) {
        envProps.put("add.filter", "true");
      }

      if (namesAndFilter.contains("names")) {
        envProps.put("add.names", "true");
      }
    }

    final CountDownLatch latch = new CountDownLatch(1);
    final Properties streamProps = instance.buildStreamsProperties(envProps);
    final Topology topology = instance.buildTopology(envProps);

    instance.createTopics(envProps);

    final KafkaStreams streams = new KafkaStreams(topology, streamProps);

    // Attach shutdown handler to catch Control-C.
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
      @Override
      public void run() {
        streams.close(Duration.ofSeconds(5));
        latch.countDown();
      }
    });

    try {
      streams.start();
      latch.await();
    } catch (Throwable e) {
      System.exit(1);
    }
    System.exit(0);
  }

}

Compile and run the Kafka Streams program

6

In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally. In most tutorials, the Kafka Streams application runs until you shut it down. In this tutorial, however, you’ll make changes to the topology, which requires restarting the streams application between steps.


java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties

Produce sample data to the input topic

7

Open a new terminal and start the console-producer

docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --bootstrap-server broker:9092

Then copy-paste the following records to send. The data is formatted this way because the Kafka Streams application will create a key from the first character.

1foo
1bar
1baz

After you’ve sent the records, you can shut down the console-producer with Ctrl-C.

Consume data from the output topic

8

Now start the console-consumer. Note that you’re running the consumer with the --from-beginning option, so you’ll get all messages sent to the output topic.

docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
 --from-beginning \
 --property print.key=true \
 --property key.separator="-" \
 --max-messages 12

From your first run you should see the following output:

1-1
1-2
1-3

Note that even though this is the output of an aggregation operation, this tutorial configures the streams application with StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG set to 0, so you’ll see every update from the count() operation.
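As a sketch of the alternative (not part of the tutorial’s code, and the sizes below are just example values), you could enable the record cache so that downstream consumers see only the latest count per key per commit interval instead of every single update:

  // Give the state store cache some memory and flush roughly once per second.
  props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);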

Since the streams application takes the first character to use as the key, the output of 1-1, 1-2, and 1-3 (key-count) is expected.

You can close this consumer for now with Ctrl-C.

Add an operation to the topology

9

As you may have guessed, adding or removing a processor in your topology shifts the generated names of every processor, state store, and internal topic downstream of the change.

With that in mind, let’s add a filter() processor to the inputStream:


  KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                                 .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)))
                                                 .filter((k, v) -> k != 100L);

To make things a little easier, the code for the tutorial is configured to use feature flags. You’ll use the feature flags by passing different parameters in the command to start the Kafka Streams application.

Since you’ve added the filter() before the aggregation, the names of the state store, changelog topic, and repartition topic will change. (The filter itself passes every record in this tutorial, because the keys are 1 or 2, so any difference you observe comes from the renaming alone.)

In the new names, the number suffix will go from 0000000002 to 0000000003 like so:

  • state store - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000003

  • changelog topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog

  • repartition topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition

Here’s an image of the relevant part of the updated topology (view a full image of the topology)


Kafka Streams Sub-topology

(Note that changelog topics don’t show up in Topology.describe())

Now, in the terminal running the streams application, use Ctrl-C, then restart the app with the following command:

java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties filter-only

Produce some records to the updated topology

10

Go back to the producer/consumer terminal and start the console-producer again.

docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --bootstrap-server broker:9092

We’ll send the exact same data as before, since we want to update the counts for the existing records. Copy and paste the following into the prompt and press enter:

1foo
1bar
1baz

After you’ve sent the records, you can shut down the console-producer with Ctrl-C.

Consume the updated records

11

Now, restart the console-consumer. Remember, the consumer is running with the --from-beginning option, so you’ll get all messages sent to the output topic.

docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
 --from-beginning \
 --property print.key=true \
 --property key.separator="-" \
 --max-messages 12

In this second run, you should see this output:

1-1
1-2
1-3
1-1
1-2
1-3

Even though the Kafka Streams application counts by key and you sent the same keys, the output is repeated. The application produced the first 3 records in the previous run. So why is the output 1-1, 1-2, 1-3 instead of 1-4, 1-5, 1-6? Adding the new operation incremented the counter used to generate the names of every processor, state store, and internal topic downstream of the new operator.

This renaming means the streams application’s count() processor now uses a new state store, rather than the one created when you first started the application. The situation is the same if you use an in-memory store, because the changelog topic name changes as well; when the name changes, there is nothing to restore once Kafka Streams builds the in-memory store.

Your original data is still there, but Kafka Streams isn’t using the previously created state store and changelog topic.
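As an aside, the same naming advice applies if you prefer an in-memory store: you can name it explicitly through a store supplier. Here’s a sketch assuming the tutorial’s inputStream and serdes (Stores comes from org.apache.kafka.streams.state.Stores):

  // The store and its changelog topic keep the name "the-counting-store"
  // even if upstream processors are added or removed.
  KStream<Long, Long> countStream = inputStream
      .groupByKey(Grouped.with("count", longSerde, stringSerde))
      .count(Materialized.<Long, Long>as(Stores.inMemoryKeyValueStore("the-counting-store"))
                         .withKeySerde(longSerde)
                         .withValueSerde(Serdes.Long()))
      .toStream();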

You can close this consumer for now with Ctrl-C.

Add names to the stateful operators of the topology

12

Now let’s start over. This time you’ll update the topology and provide names to all stateful operators.


 countStream = inputStream.groupByKey(Grouped.with("count", longSerde, stringSerde))
                          .count(Materialized.as("the-counting-store"))
                          .toStream();

 joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                                 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
                                 StreamJoined.with(longSerde, stringSerde, longSerde)
                                             .withName("join").withStoreName("the-join-store"));

Here’s an image of the relevant part of the topology now with names (view a full image of the topology):


Kafka Streams Sub-topology

(Note that changelog topics don’t show up in Topology.describe())

As throughout this tutorial, you make the changes via feature flags, enabled by the parameters you pass when starting the application.

In the terminal running the streams application, use Ctrl-C, then restart the streams application with this command:

java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties names-only

Produce records to the named topology

13

Back in your producer/consumer terminal, start the console-producer again.

docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --bootstrap-server broker:9092

Then copy-paste the following records to send. You’ll use data resulting in different keys this time.

2foo
2bar
2baz

After you’ve sent the records, you can shut down the console-producer with Ctrl-C.

Consume records from the named topology

14

Now let’s start up the console consumer again:

docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
 --from-beginning \
 --property print.key=true \
 --property key.separator="-" \
 --max-messages 12

For this run you should see all six records plus three new records:

1-1
1-2
1-3
1-1
1-2
1-3
2-1
2-2
2-3

Since you used data resulting in new keys, 2-1, 2-2, 2-3 looks correct.

You can close this consumer for now with Ctrl-C.

Update the named topology

15

Now let’s add a new operator (filter()) to the named topology:


  KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                                 .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)))
                                                 .filter((k, v) -> k != 100L);

But this time, adding a new processor won’t change the name of the stateful parts of your application, as you’ve explicitly named them in the previous step.

Here’s an image of the relevant part of the updated topology with the stateful operators named (view a full image of the topology):


Kafka Streams Sub-topology

(Note that changelog topics don’t show up in Topology.describe())

You’ll notice the other processor names have shifted, but since those processors are stateless, that’s OK and won’t break topology compatibility.
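If you also want the stateless processor names to stay put (purely for a readable Topology.describe(); it isn’t required for compatibility), most DSL operations accept a Named parameter. Here’s a sketch with illustrative processor names, assuming the same stream as above (Named comes from org.apache.kafka.streams.kstream.Named):

  // Optional: name stateless processors too, so the textual topology stays easy
  // to follow as it evolves. These processor names are examples, not from the tutorial.
  KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
      .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)), Named.as("select-first-char-key"))
      .filter((k, v) -> k != 100L, Named.as("drop-sentinel-key"));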

In the terminal running the streams application, use Ctrl-C, then restart the streams application with this command:


java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties filter-with-names

Produce records to the updated, named topology

16

One last time, from your producer/consumer terminal, start a console-producer

docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --bootstrap-server broker:9092

Then copy-paste the following records to send. Again you’re sending updates with the same keys.

2foo
2bar
2baz

After you’ve sent the records, you can shut down the console-producer with Ctrl-C.

Consume latest updates

17

Finally, start a console-consumer.

docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
 --from-beginning \
 --property print.key=true \
 --property key.separator="-" \
 --max-messages 12

You should see the following output:

1-1
1-2
1-3
1-1
1-2
1-3
2-1
2-2
2-3
2-4
2-5
2-6

The last three records, 2-4, 2-5, 2-6, show the correct output, as you produced six records with the same key.

You have updated your topology and reused the existing state stores and internal topics!



Some points to remember

  1. Always name stateful operators (see the combined code sketch after this list).

  2. If you haven’t named your stateful operators and you need to update your topology, use the Application Reset Tool to reprocess records.

  3. Aggregation repartition topics (if needed):

    • Use the Grouped.as() method (or the Grouped.with() overload that accepts a name).

    • Use Grouped to provide the repartition Serdes as well, if required.

    • Kafka Streams appends -repartition to the provided name.

    • If no name is provided, the state store name is used with -repartition appended.

  4. Joins:

    • Use the StreamJoined configuration object.

    • StreamJoined.withName() names the join processors and provides the base name for the repartition topics (if needed).

    • StreamJoined.withStoreName() names the state stores associated with the join.

    • If you need to provide Serdes for the join, you’ll use StreamJoined as well.

  5. State stores:

    • Use the Materialized.as() method.

    • Use Materialized to provide state store Serdes, if needed.
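Putting these naming points together, here’s a compact sketch based on the named topology used in this tutorial (it assumes the same builder, inputStream, topic names, and serdes as the code above):

  // Repartition topic named via Grouped: <application.id>-count-repartition.
  // State store and changelog topic named via Materialized: the-counting-store.
  KStream<Long, Long> countStream = inputStream
      .groupByKey(Grouped.with("count", longSerde, stringSerde))
      .count(Materialized.as("the-counting-store"))
      .toStream();

  // Join processors (and any join repartition topics) named via withName("join");
  // the join state stores are named via withStoreName("the-join-store").
  KStream<Long, String> joinedStream = inputStream
      .join(countStream, (v1, v2) -> v1 + v2.toString(),
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
            StreamJoined.with(longSerde, stringSerde, longSerde)
                        .withName("join")
                        .withStoreName("the-join-store"));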

Deploy on Confluent Cloud

Run your app with Confluent Cloud

1

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.

  1. Sign up for Confluent Cloud, a fully managed Apache Kafka service.

  2. After you log in to Confluent Cloud Console, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.


Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application. In the case of this tutorial, add the following properties to the client application’s input properties file, substituting all curly braces with your Confluent Cloud values.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.