How to build your first Apache Kafka Streams application

Question:

How do you get started building your first Kafka Streams application?

Edit this page

Example use case:

You'd like to get started with Kafka Streams, but you're not sure where to start. In this tutorial, you'll build a small stream processing application and produce some sample data to test it. After you complete this tutorial, you can go more in depth in the Kafka Streams 101 course.

Hands-on code example:

New to Confluent Cloud? Get started here.

Short Answer

Using the Apache Kafka Streams DSL, create a stream processing topology to define your business logic. The example below reads events from the input topic using the stream function, processes events using the mapValues transformation, allows for debugging with peek, and writes the transformed events to an output topic using to.

static Topology buildTopology(String inputTopic, String outputTopic) {
    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();
    builder
        .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
        .peek((k,v) -> logger.info("Observed event: {}", v))
        .mapValues(s -> s.toUpperCase())
        .peek((k,v) -> logger.info("Transformed event: {}", v))
        .to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}

Run it

Provision your Kafka cluster

1

This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started free is on Confluent Cloud, which provides Kafka as a fully managed service.

Take me to Confluent Cloud
  1. After you log in to Confluent Cloud, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  2. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.

  3. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.

Confluent Cloud

Initialize the project

2

To get started, make a new directory anywhere you’d like for this project:

mkdir creating-first-apache-kafka-streams-application && cd creating-first-apache-kafka-streams-application

Next, create a directory for configuration data:

mkdir configuration

Save cloud configuration values to a local file

3

From the Confluent Cloud Console, navigate to your Kafka cluster and then select Clients in the lefthand navigation. From the Clients view, create a new client and click Java to get the connection information customized to your cluster.

Create new credentials for your Kafka cluster and Schema Registry, writing in appropriate descriptions so that the keys are easy to find and delete later. The Confluent Cloud Console will show a configuration similar to below with your new credentials automatically populated (make sure Show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Do not directly copy and paste the above configuration. You must copy it from the Confluent Cloud Console so that it includes your Confluent Cloud information and credentials.

Download and set up the Confluent CLI

4

This tutorial has some steps for Kafka topic management and producing and consuming events, for which you can use the Confluent Cloud Console or the Confluent CLI. Follow the instructions here to install the Confluent CLI, and then follow these steps connect the CLI to your Confluent Cloud cluster.

Configure the project

5

Create the following Gradle build file, named build.gradle for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"
    id "idea"
    id "eclipse"
    id "application"}

version = "0.0.1"
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
application {
  mainClass.set("io.confluent.developer.KafkaStreamsApplication")
}

repositories {
    mavenCentral()


    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
    implementation ('org.apache.kafka:kafka-clients') {
       version {
           strictly '3.4.0'
        }
      }
    implementation 'com.github.javafaker:javafaker:1.0.2'
    testImplementation "org.apache.kafka:kafka-streams-test-utils:3.4.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.KafkaStreamsApplication"
    )
  }
}

shadowJar {
    archivesBaseName = "creating-first-apache-kafka-streams-application-standalone"
    archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Then create a configuration file for development at configuration/dev.properties:

application.id=kafka-streams-101

input.topic.name=random-strings
output.topic.name=tall-random-strings

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

Update the properties file with Confluent Cloud information

6

Using the command below, append the contents of configuration/ccloud.properties (with your Confluent Cloud configuration) to configuration/dev.properties (with the application properties).

cat configuration/ccloud.properties >> configuration/dev.properties

Create a Utility class

7

First, create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Now create a utility class that provides functions to support our tutorial. You may decide not to include these types of functions in the production version of your application, however, they are useful for getting started quickly. This utility class includes functions to create our Kafka topics and generate sample event data we can use to exercise our Kafka Streams topology.

Create the following file at src/main/java/io/confluent/developer/Util.java.

package io.confluent.developer;

import com.github.javafaker.Faker;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class Util implements AutoCloseable {

    private final Logger logger = LoggerFactory.getLogger(Util.class);
    private ExecutorService executorService = Executors.newFixedThreadPool(1);

    public class Randomizer implements AutoCloseable, Runnable {
        private Properties props;
        private String topic;
        private Producer<String, String> producer;
        private boolean closed;

        public Randomizer(Properties producerProps, String topic) {
            this.closed = false;
            this.topic = topic;
            this.props = producerProps;
            this.props.setProperty("client.id", "faker");
        }

        public void run() {
            try (KafkaProducer producer = new KafkaProducer<String, String>(props)) {
                Faker faker = new Faker();
                while (!closed) {
                    try {
                        Object result = producer.send(new ProducerRecord<>(
                                this.topic,
                                faker.chuckNorris().fact())).get();
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                    }
                }
            } catch (Exception ex) {
                logger.error(ex.toString());
            }
        }
        public void close()  {
            closed = true;
        }
    }

    public Randomizer startNewRandomizer(Properties producerProps, String topic) {
        Randomizer rv = new Randomizer(producerProps, topic);
        executorService.submit(rv);
        return rv;
    }

    public void createTopics(final Properties allProps, List<NewTopic> topics)
            throws InterruptedException, ExecutionException, TimeoutException {
        try (final AdminClient client = AdminClient.create(allProps)) {
            logger.info("Creating topics");

            client.createTopics(topics).values().forEach( (topic, future) -> {
                try {
                    future.get();
                } catch (Exception ex) {
                    logger.info(ex.toString());
                }
            });

            Collection<String> topicNames = topics
                .stream()
                .map(t -> t.name())
                .collect(Collectors.toCollection(LinkedList::new));

            logger.info("Asking cluster for topic descriptions");
            client
                .describeTopics(topicNames)
                .allTopicNames()
                .get(10, TimeUnit.SECONDS)
                .forEach((name, description) -> logger.info("Topic Description: {}", description.toString()));
        }
    }

    public void close() {
        if (executorService != null) {
            executorService.shutdownNow();
            executorService = null;
        }
    }
}

Create the Kafka Streams topology

8

Kafka Streams applications define their logic in a processor topology, which is a graph of stream processors (nodes) and streams (edges). There are two methods for defining these components in your Kafka Streams application, the Streams DSL and the Processor API. The Streams DSL provides built-in abstractions for common event stream processing concepts like streams, tables, and transformations, while the Processor API can be used for advanced cases not supported by the DSL.

The Streams DSL is recommended for most use cases and this tutorial will use it to define a basic text processing application. To get started, let’s focus on the important bits of Kafka Streams application code, highlighting the DSL usage.

static Topology buildTopology(String inputTopic, String outputTopic) {
    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();
    builder
        .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
        .peek((k,v) -> logger.info("Observed event: {}", v))
        .mapValues(s -> s.toUpperCase())
        .peek((k,v) -> logger.info("Transformed event: {}", v))
        .to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}

In the code above, the StreamsBuilder class is used to construct the design of the topology. The DSL API allows you to construct your application by chaining together the desired behaviors using a fluent API.

A typical topology follows a common pattern:

  • Consume one or more input streams using the stream function which accepts the names of the Kafka topics to consume from along with the deserializers required to decode the data.

  • Transform events by chaining together one or more transformations. In our example, we use mapValues to convert incoming String events to their upper case value.

  • Transformed events are streamed as the output of the topology using the to function specifying a destination topic as well as the serializers required to encode the data.

The peek function allows you to observe and act on events and they flow through the topology stages. In our example it is used to debug the topology by printing events as they flow through the topology.

Once the topology is defined within the builder, the buildTopology function returns an instance of the Topology created from builder.build. Separating the building of the Topology in a function is useful for testing purposes, which we will see in the Test It section of the tutorial.

Now go ahead and create the full Java source file at src/main/java/io/confluent/developer/KafkaStreamsApplication.java.

package io.confluent.developer;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class KafkaStreamsApplication {

    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApplication.class);

    static void runKafkaStreams(final KafkaStreams streams) {
        final CountDownLatch latch = new CountDownLatch(1);
        streams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.RUNNING && newState != KafkaStreams.State.RUNNING) {
                latch.countDown();
            }
        });

        streams.start();

        try {
            latch.await();
        } catch (final InterruptedException e) {
            throw new RuntimeException(e);
        }

        logger.info("Streams Closed");
    }
    static Topology buildTopology(String inputTopic, String outputTopic) {
        Serde<String> stringSerde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();

        builder
            .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
            .peek((k,v) -> logger.info("Observed event: {}", v))
            .mapValues(s -> s.toUpperCase())
            .peek((k,v) -> logger.info("Transformed event: {}", v))
            .to(outputTopic, Produced.with(stringSerde, stringSerde));

        return builder.build();
    }
    public static void main(String[] args) throws Exception {

        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to a configuration file.");
        }

        Properties props = new Properties();
        try (InputStream inputStream = new FileInputStream(args[0])) {
            props.load(inputStream);
        }

        final String inputTopic = props.getProperty("input.topic.name");
        final String outputTopic = props.getProperty("output.topic.name");

        try (Util utility = new Util()) {

            utility.createTopics(
                    props,
                    Arrays.asList(
                            new NewTopic(inputTopic, Optional.empty(), Optional.empty()),
                            new NewTopic(outputTopic, Optional.empty(), Optional.empty())));

            // Ramdomizer only used to produce sample data for this application, not typical usage
            try (Util.Randomizer rando = utility.startNewRandomizer(props, inputTopic)) {

                KafkaStreams kafkaStreams = new KafkaStreams(
                        buildTopology(inputTopic, outputTopic),
                        props);

                Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

                logger.info("Kafka Streams 101 App Started");
                runKafkaStreams(kafkaStreams);

            }
        }
    }
}

Compile and run the Kafka Streams program

9

In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.

java -jar build/libs/creating-first-apache-kafka-streams-application-*.jar configuration/dev.properties

If the Kafka Streams application has started properly, you should see the debugging log output from the peek functions.

Something similar to:

[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris does not get compiler errors, the language changes itself to accommodate Chuck Norris.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS DOES NOT GET COMPILER ERRORS, THE LANGUAGE CHANGES ITSELF TO ACCOMMODATE CHUCK NORRIS.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris can write infinite recursion functions... and have them return.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS CAN WRITE INFINITE RECURSION FUNCTIONS... AND HAVE THEM RETURN.

Stream events using a console consumer

10

Now that the Kafka Streams application is running, run a command line consumer using the ccloud CLI to view the events (your ccloud context should be set to the proper environment, cluster, and API Key (see Step 4 above and Confluent CLI Reference for additional details).

Then, in a new terminal window, run the following console consumer to view the events being generated by the data generator and produced to the random-strings topic from the Randomizer class in your Kafka Streams application. These are the events that have been streamed into the topology (.stream(inputTopic, Consumed.with(stringSerde, stringSerde)).

confluent kafka topic consume random-strings

You should see output that looks like this (notice the mixed case of the string):

Starting Kafka Consumer. Use Ctrl-C to exit.
Chuck Norris can write multi-threaded applications with a single thread.
No statement can catch the ChuckNorrisException.
Chuck Norris can divide by zero.
Chuck Norris can binary search unsorted data.

Next, look at the transformed events in the tall-random-strings topic. These are the events that have been transformed (.mapValues) and written to the output topic .to(outputTopic, Produced.with(stringSerde, stringSerde)).

confluent kafka topic consume tall-random-strings

You should see output events that are entirely upper case:

Starting Kafka Consumer. Use Ctrl-C to exit.
CHUCK NORRIS CAN WRITE MULTI-THREADED APPLICATIONS WITH A SINGLE THREAD.
NO STATEMENT CAN CATCH THE CHUCKNORRISEXCEPTION.
CHUCK NORRIS CAN DIVIDE BY ZERO.
CHUCK NORRIS CAN BINARY SEARCH UNSORTED DATA

Once you are done with observing the behavior of the application, stop the consumers and the Kafka Streams application with Ctrl-C in the appropriate terminal windows.

Teardown Confluent Cloud resources

11

You may try another tutorial, but if you don’t plan on doing other tutorials, use the Confluent Cloud Console or CLI to destroy all of the resources you created. Verify they are destroyed to avoid unexpected charges.

Test it

Create a test configuration file

1

First, create a test file at configuration/test.properties:

application.id=kafka-streams-101
bootstrap.servers=localhost:29092

input.topic.name=random-strings
output.topic.name=tall-random-strings

Write a test

2

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Now create the following file at src/test/java/io/confluent/developer/KafkaStreamsApplicationTest.java. Testing a Kafka streams application requires a bit of test harness code, but happily the org.apache.kafka.streams.TopologyTestDriver class makes this much more pleasant that it would otherwise be.

package io.confluent.developer;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public class KafkaStreamsApplicationTest {

  private final static String TEST_CONFIG_FILE = "configuration/test.properties";

  @Test
  public void topologyShouldUpperCaseInputs() throws IOException {

    final Properties props = new Properties();
    try (InputStream inputStream = new FileInputStream(TEST_CONFIG_FILE)) {
        props.load(inputStream);
    }

    final String inputTopicName = props.getProperty("input.topic.name");
    final String outputTopicName = props.getProperty("output.topic.name");

    final Topology topology = KafkaStreamsApplication.buildTopology(inputTopicName, outputTopicName);

    try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
      Serde<String> stringSerde = Serdes.String();

      final TestInputTopic<String, String> inputTopic = testDriver
              .createInputTopic(inputTopicName, stringSerde.serializer(), stringSerde.serializer());
      final TestOutputTopic<String, String> outputTopic = testDriver
              .createOutputTopic(outputTopicName, stringSerde.deserializer(), stringSerde.deserializer());

      List<String> inputs = Arrays.asList(
        "Chuck Norris can write multi-threaded applications with a single thread.",
        "No statement can catch the ChuckNorrisException.",
        "Chuck Norris can divide by zero.",
        "Chuck Norris can binary search unsorted data."
      );
      List<String> expectedOutputs = inputs.stream()
        .map(String::toUpperCase).collect(Collectors.toList());

      inputs.forEach(inputTopic::pipeInput);
      final List<String> actualOutputs = outputTopic.readValuesToList();

      assertThat(expectedOutputs, equalTo(actualOutputs));

    }

  }
}

Invoke the tests

3

Now run the test, which is as simple as:

./gradlew test