Handling uncaught exceptions

Question:

How can you handle uncaught exceptions?

Example use case:

You have an event streaming application, and you want to make sure that it's robust in the face of unexpected errors. Depending on the situation, you'll want the application to either continue running or shut down. In this tutorial, you'll learn how to use the `StreamsUncaughtExceptionHandler` to provide this functionality.

Hands-on code example:

Short Answer

To handle uncaught exceptions, use the KafkaStreams.setUncaughtExceptionHandler method.

KafkaStreams kafkaStreams = new KafkaStreams(topologyBuilder.build(), properties);

// Using a lambda, take a static approach to errors regardless of the exception
kafkaStreams.setUncaughtExceptionHandler((exception) -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);

// Using a concrete implementation
kafkaStreams.setUncaughtExceptionHandler(new MyExceptionHandler());

The StreamsUncaughtExceptionHandler interface gives you a chance to respond to exceptions not handled by Kafka Streams. It has one method, handle, which returns an enum of type StreamThreadExceptionResponse that instructs Kafka Streams how to respond to the exception. There are three possible values: REPLACE_THREAD, SHUTDOWN_CLIENT, or SHUTDOWN_APPLICATION.
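
The MyExceptionHandler referenced above is only a placeholder name. As a minimal sketch of what such a concrete implementation might look like (not part of this tutorial’s code), the class below logs the error and always asks Kafka Streams to replace the failed thread; the threshold-based handler built later in this tutorial is a more robust option.

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public class MyExceptionHandler implements StreamsUncaughtExceptionHandler {

    @Override
    public StreamThreadExceptionResponse handle(final Throwable throwable) {
        // Log the uncaught exception, then ask Kafka Streams to start a replacement thread
        System.err.println("Uncaught exception in stream thread: " + throwable);
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}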

Run it

Prerequisites

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line

Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir error-handling && cd error-handling

Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
    - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

And launch it by running:

docker compose up -d

Configure the project

Create the following Gradle build file, named build.gradle, for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"
    id "idea"
    id "eclipse"
    id "com.github.davidmc24.gradle.plugin.avro" version "1.7.0"
}

sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.apache.avro:avro:1.11.1"
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
    implementation ('org.apache.kafka:kafka-clients') {
        version {
            strictly '3.4.0'
        }
    }

    testImplementation "org.apache.kafka:kafka-streams-test-utils:3.4.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.StreamsUncaughtExceptionHandling"
    )
  }
}

shadowJar {
    archiveBaseName = "error-handling-standalone"
    archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration

Then create a development file at configuration/dev.properties:

application.id=error-handling
bootstrap.servers=localhost:29092

input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

max.failures=3
max.time.millis=3600000

Create an exception handler implementation

First, create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Before you create the Kafka Streams application, you’ll need to create an instance of a StreamsUncaughtExceptionHandler. For more information, you can read KIP-671, which introduced this functionality.

Before we dive into the code, let’s briefly cover a few points about the StreamsUncaughtExceptionHandler.

Keep in mind that the exception handler does not apply to all exceptions, just those not directly handled by Kafka Streams. An example of an exception that Kafka Streams handles itself is the ProducerFencedException. But any exceptions related to your business logic are not dealt with and bubble all the way up to the StreamThread, which previously left the application no choice but to shut down. The StreamsUncaughtExceptionHandler gives you a mechanism to take different actions when such an exception is thrown.

The StreamsUncaughtExceptionHandler interface has one method, handle, which returns an enum of type StreamThreadExceptionResponse that instructs Kafka Streams how to respond to the exception. The possible return values are:

  • REPLACE_THREAD - Replaces the thread receiving the exception and processing continues with the same number of configured threads. (Note: this can result in duplicate records depending on the application’s processing mode determined by the PROCESSING_GUARANTEE_CONFIG value)

  • SHUTDOWN_CLIENT - Shut down the individual instance of the Kafka Streams application experiencing the exception. (This is the previous behavior and the current default behavior if you don’t provide a StreamsUncaughtExceptionHandler)

  • SHUTDOWN_APPLICATION - Shut down all instances of a Kafka Streams application with the same application-id. Kafka Streams uses a rebalance to instruct all application instances to shut down, so even those running on another machine will receive the signal and exit.

Your implementation of the StreamsUncaughtExceptionHandler will keep track of the number of errors that occur within a given time frame. If the number of errors exceeds the threshold within that time frame, the entire application shuts down. While you could put the exception-handling code in a lambda, having a separate concrete implementation is better for testing.

Here’s the constructor where you provide the max number of failures and the timeframe:

public MaxFailuresUncaughtExceptionHandler(final int maxFailures, final long maxTimeIntervalMillis) {
    this.maxFailures = maxFailures;   (1)
    this.maxTimeIntervalMillis = maxTimeIntervalMillis;  (2)
}
1 The max number of failures your application will tolerate within a given timeframe
2 The max total time allowed for observing the failures

This is probably best understood by taking a look at the core logic:

 if (currentFailureCount >= maxFailures) {  (1)
    if (millisBetweenFailure <= maxTimeIntervalMillis) { (2)
        return SHUTDOWN_APPLICATION;
    } else {
        currentFailureCount = 0;  (3)
        previousErrorTime = null;
    }
}
return REPLACE_THREAD;  (4)
1 Checking if the current number of failures equals or exceeds the maximum
2 Checking if the max number of failures occurred within the given time window; if so, shut down
3 If you’ve reached the max number of failures, but they are spread out, reset
4 The default behavior here is to replace the thread

The idea here is that a couple of errors spread out over time are OK, so processing continues. But a burst of errors within a small window of time could indicate a bigger issue, so it’s better to shut down. This code doesn’t inspect the type of the exception, but branching on the exception type is another valid approach, as sketched below.

The above code is just an example of what you could do and is definitely not tested in a production setting. The main point is that while it’s fine to keep processing through a small number of errors, it’s not a good idea to continually replace the thread under sustained errors. It’s better to have some "guard rails" in place to make sure your application is robust but won’t keep running when it shouldn’t.
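
As a sketch only (this class is not part of the tutorial’s code, and the name TypeBasedUncaughtExceptionHandler is purely illustrative), a handler that branches on the exception type could look like the following, treating the ProcessorException created later in this tutorial as the recoverable case:

package io.confluent.developer;

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public class TypeBasedUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

    @Override
    public StreamThreadExceptionResponse handle(final Throwable throwable) {
        // Kafka Streams may deliver application errors wrapped in another exception,
        // so check the cause as well. ProcessorException is created later in this tutorial.
        if (throwable instanceof ProcessorException || throwable.getCause() instanceof ProcessorException) {
            // A known application error: keep processing on a fresh thread
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        }
        // Anything unexpected: stop every instance sharing this application.id
        return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
    }
}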

Now create the following file at src/main/java/io/confluent/developer/MaxFailuresUncaughtExceptionHandler.java

package io.confluent.developer;

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.*;


public class MaxFailuresUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

    final int maxFailures;
    final long maxTimeIntervalMillis;
    private Instant previousErrorTime;
    private int currentFailureCount;


    public MaxFailuresUncaughtExceptionHandler(final int maxFailures, final long maxTimeIntervalMillis) {
        this.maxFailures = maxFailures;
        this.maxTimeIntervalMillis = maxTimeIntervalMillis;
    }

    @Override
    public StreamThreadExceptionResponse handle(final Throwable throwable) {
        currentFailureCount++;
        Instant currentErrorTime = Instant.now();

        if (previousErrorTime == null) {
            previousErrorTime = currentErrorTime;
        }

        long millisBetweenFailure = ChronoUnit.MILLIS.between(previousErrorTime, currentErrorTime);

        if (currentFailureCount >= maxFailures) {
            if (millisBetweenFailure <= maxTimeIntervalMillis) {
                return SHUTDOWN_APPLICATION;
            } else {
                currentFailureCount = 0;
                previousErrorTime = null;
            }
        }
        return REPLACE_THREAD;
    }
}

You’ll add the StreamsUncaughtExceptionHandler to your Kafka Streams application in the next step.

There’s one more step you’ll need to take: creating a custom exception class, ProcessorException, that your topology will throw and your exception handler will process. Create this simple Java class at src/main/java/io/confluent/developer/ProcessorException.java

package io.confluent.developer;

public class ProcessorException extends RuntimeException {

    public ProcessorException(String message) {
        super(message);
    }
}
There is an older, deprecated overload of KafkaStreams.setUncaughtExceptionHandler that takes an instance of java.lang.Thread.UncaughtExceptionHandler. Users are advised to migrate to the newer method.
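
For comparison, here is a minimal sketch of both registration styles (assuming streams is your KafkaStreams instance); the deprecated overload can only observe the failure, while the newer one lets you tell Kafka Streams how to react:

// Old, deprecated overload: a java.lang.Thread.UncaughtExceptionHandler can only
// observe the error; it cannot influence how Kafka Streams reacts.
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) ->
        System.err.println("Stream thread " + thread.getName() + " died: " + throwable));

// Newer overload: a StreamsUncaughtExceptionHandler returns a response that tells
// Kafka Streams what to do next.
streams.setUncaughtExceptionHandler(exception ->
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);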

Create the Kafka Streams topology

Here is the code we’ll use to drive our tutorial:

   builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(value -> {
                    counter++;
                    if (counter == 2 || counter == 8 || counter == 15) { (1)
                        throw new ProcessorException("It works on my box!!!");
                    }
                    return value.toUpperCase();
                })
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

    // Details left out for clarity
    .......

    // In the main method

    final KafkaStreams streams = new KafkaStreams(topology, streamProps);
    final MaxFailuresUncaughtExceptionHandler exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeInterval); (2)
    streams.setUncaughtExceptionHandler(exceptionHandler); (3)
1 Simulating an error depending on the value of a counter (which gets incremented with every record)
2 Instantiating the exception handler; the maxFailures (3) and maxTimeInterval (3600000 ms == 1 hour) variables get their values from the configuration file
3 Adding the handler to Kafka Streams

This setup ensures that the rate of simulated errors (3 within a 1-hour window) meets the criteria for shutting down the application.

Now create the following file at src/main/java/io/confluent/developer/StreamsUncaughtExceptionHandling.java

package io.confluent.developer;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class StreamsUncaughtExceptionHandling {

    int counter = 0;

    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = allProps.getProperty("input.topic.name");
        final String outputTopic = allProps.getProperty("output.topic.name");

        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(value -> {
                    counter++;
                    if (counter == 2 || counter == 8 || counter == 15) {
                        throw new ProcessorException("It works on my box!!!");
                    }
                    return value.toUpperCase();
                })
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public void createTopics(Properties allProps) {
        try (AdminClient client = AdminClient.create(allProps)) {
            List<NewTopic> topicList = new ArrayList<>();

            NewTopic sessionInput = new NewTopic(allProps.getProperty("input.topic.name"),
                    Integer.parseInt(allProps.getProperty("input.topic.partitions")),
                    Short.parseShort(allProps.getProperty("input.topic.replication.factor")));
            topicList.add(sessionInput);

            NewTopic counts = new NewTopic(allProps.getProperty("output.topic.name"),
                    Integer.parseInt(allProps.getProperty("output.topic.partitions")),
                    Short.parseShort(allProps.getProperty("output.topic.replication.factor")));

            topicList.add(counts);
            client.createTopics(topicList);
        }
    }

    public Properties loadEnvProperties(String fileName) throws IOException {
        Properties allProps = new Properties();
        FileInputStream input = new FileInputStream(fileName);
        allProps.load(input);
        input.close();

        return allProps;
    }

    public static void main(String[] args) throws Exception {

        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }

        StreamsUncaughtExceptionHandling tw = new StreamsUncaughtExceptionHandling();
        Properties allProps = tw.loadEnvProperties(args[0]);
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Change this to StreamsConfig.EXACTLY_ONCE to eliminate duplicates
        allProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
        Topology topology = tw.buildTopology(allProps);

        tw.createTopics(allProps);
        TutorialDataGenerator dataGenerator = new TutorialDataGenerator(allProps);
        dataGenerator.generate();

        final int maxFailures = Integer.parseInt(allProps.getProperty("max.failures"));
        final long maxTimeInterval = Long.parseLong(allProps.getProperty("max.time.millis"));
        final KafkaStreams streams = new KafkaStreams(topology, allProps);
        final MaxFailuresUncaughtExceptionHandler exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeInterval);
        streams.setUncaughtExceptionHandler(exceptionHandler);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
            }
        });

        try {
            streams.cleanUp();
            streams.start();
        } catch (Throwable e) {
            System.exit(1);
        }
    }

    static class TutorialDataGenerator {
        final Properties properties;


        public TutorialDataGenerator(final Properties properties) {
            this.properties = properties;
        }

        public void generate() {
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
                String topic = properties.getProperty("input.topic.name");
                List<String> messages = Arrays.asList("All", "streams", "lead", "to", "Confluent", "Go", "to", "Kafka", "Summit");


                messages.forEach(message -> producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace(System.out);
                        } else {
                            System.out.printf("Produced record (%s) at offset %d to topic %s %n", message, metadata.offset(), metadata.topic());
                        }
                }));
            }
        }
    }
}

Compile and run the Kafka Streams program

Now that the application (including its data generator) is in place, build it by running:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally.

The application for this tutorial includes a record generator to populate the topic data. Here is the list of records produced:

"All", "streams", "lead", "to", "Confluent", "Go", "to", "Kafka", "Summit"

Since we force some exceptions at different intervals while the streams application runs, you should see some stack traces in the console indicating an error, but the application will continue running. However, once the errors reach the max-failures threshold within the configured window, the application will shut down.

Now run the following program, but watch the logs in the console and let the application run for a few seconds.

java -jar build/libs/error-handling-standalone-0.0.1.jar configuration/dev.properties

You should observe it shutting down and see something similar to this in the console:

INFO org.apache.kafka.streams.KafkaStreams - stream-client [error-handling-5c246409-ae84-4bbd-af85-c4e8d1d556d9] State transition from PENDING_ERROR to ERROR
INFO org.apache.kafka.streams.KafkaStreams - stream-client [error-handling-5c246409-ae84-4bbd-af85-c4e8d1d556d9] Streams client stopped to ERROR completely

Consume data from the output topic

Now that you’ve run the Kafka Streams application, it should have shut itself down after reaching the max-error threshold.

Let’s now run the kafka-console-consumer to confirm the output:

docker exec -t broker kafka-console-consumer \
 --bootstrap-server broker:9092 \
 --topic output-topic \
 --from-beginning \
 --max-messages 12

Your results should look something like this:


ALL
ALL
STREAMS
LEAD
TO
CONFLUENT
ALL
STREAMS
LEAD
TO
CONFLUENT
GO
Processed a total of 12 messages

You’ll notice there are some duplicated values in the output. This duplication is to be expected, as the streams application is running with the default processing mode of AT_LEAST_ONCE. Duplicate values are one thing to consider when using REPLACE_THREAD with the StreamsUncaughtExceptionHandler, since this is analogous to using retries with the KafkaProducer. If you don’t want duplicate values, you should consider running with a processing mode of EXACTLY_ONCE, as shown below.
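
For example, switching the processing guarantee is a one-line change to the configuration set in main() (a sketch; StreamsConfig.EXACTLY_ONCE_V2 is the non-deprecated setting in recent Kafka Streams releases and requires brokers at version 2.5 or newer):

// Replace the AT_LEAST_ONCE setting in main() with exactly-once processing
allProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);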

Test it

Create a test configuration file

First, create a test file at configuration/test.properties:

application.id=error-handling-test
bootstrap.servers=localhost:29092

input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

max.failures=3
max.time.millis=120000

Write a test

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Testing a Kafka Streams application requires a bit of test harness code, but happily the org.apache.kafka.streams.TopologyTestDriver class makes this much more pleasant than it would otherwise be.

The test for our streams application is simple, but we have two scenarios to cover. The first is when the data is not in the expected format, so we expect that the topology will throw an exception. The second case is the happy path where the data is as we expect:

@Test
public void shouldThrowException() {
    assertThrows(org.apache.kafka.streams.errors.StreamsException.class, () -> inputTopic.pipeValueList(List.of("foo", "bar"))); (1)
}

@Test
public void shouldProcessValues() {  (2)
    var validMessages =  Collections.singletonList("foo");
    var expectedMessages = validMessages.stream().map(String::toUpperCase).collect(Collectors.toList());
    inputTopic.pipeValueList(validMessages);
    var actualResults = outputTopic.readValuesToList();
    assertEquals(expectedMessages, actualResults);
}
1 Test verifying unexpected format throws exception
2 Test validating the expected processing

We also have logic in the MaxFailuresUncaughtExceptionHandler that needs testing. Just like the streams application test, we have two scenarios to verify:

  • The case when errors are spread out so the exception handler should return REPLACE_THREAD

  • The case when the errors occur within our window, where we expect the handler to return SHUTDOWN_APPLICATION

@Test
public void shouldReplaceThreadWhenErrorsNotWithinMaxTime() throws Exception {  (1)
    for (int i = 0; i < 10; i++) {
        assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
        Thread.sleep(200);
    }
}
@Test
public void shouldShutdownApplicationWhenErrorsOccurWithinMaxTime() throws Exception { (2)
    assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
    Thread.sleep(50);
    assertEquals(SHUTDOWN_APPLICATION, exceptionHandler.handle(worksOnMyBoxException));
}
1 Test validating that errors spread out over time result in replacing the thread
2 Test validating that a burst of errors within a small time frame results in a shutdown

With the brief testing discussion done, let’s create our two test files.

First create the topology test file at src/test/java/io/confluent/developer/StreamsUncaughtExceptionHandlingTest.java.

package io.confluent.developer;


import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertThrows;


public class StreamsUncaughtExceptionHandlingTest {

    private final static String TEST_CONFIG_FILE = "configuration/test.properties";

    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;
    private TopologyTestDriver testDriver;


    @Before
    public void setUp() throws IOException {
        final StreamsUncaughtExceptionHandling instance = new StreamsUncaughtExceptionHandling();
        final Properties allProps = instance.loadEnvProperties(TEST_CONFIG_FILE);
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final String sessionDataInputTopic = allProps.getProperty("input.topic.name");
        final String outputTopicName = allProps.getProperty("output.topic.name");

        final Topology topology = instance.buildTopology(allProps);
        testDriver = new TopologyTestDriver(topology, allProps);
        final Serializer<String> keySerializer = Serdes.String().serializer();
        final Serializer<String> exampleSerializer = Serdes.String().serializer();
        final Deserializer<String> valueDeserializer = Serdes.String().deserializer();
        final Deserializer<String> keyDeserializer = Serdes.String().deserializer();

        inputTopic = testDriver.createInputTopic(sessionDataInputTopic, keySerializer, exampleSerializer);
        outputTopic = testDriver.createOutputTopic(outputTopicName, keyDeserializer, valueDeserializer);
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void shouldThrowException() {
        assertThrows(org.apache.kafka.streams.errors.StreamsException.class, () -> inputTopic.pipeValueList(Arrays.asList("foo", "bar")));
    }

    @Test
    public void shouldProcessValues() {
        List<String> validMessages =  Collections.singletonList("foo");
        List<String> expectedMessages = validMessages.stream().map(String::toUpperCase).collect(Collectors.toList());
        inputTopic.pipeValueList(validMessages);
        List<String> actualResults = outputTopic.readValuesToList();
        assertEquals(expectedMessages, actualResults);
    }

}

Then create the handler test file at src/test/java/io/confluent/developer/MaxFailuresUncaughtExceptionHandlerTest.java.

package io.confluent.developer;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.*;

public class MaxFailuresUncaughtExceptionHandlerTest {

    private MaxFailuresUncaughtExceptionHandler exceptionHandler;
    private final IllegalStateException worksOnMyBoxException = new IllegalStateException("Strange, It worked on my box");

    @Before
    public void setUp() {
        long maxTimeMillis = 100;
        int maxFailures = 2;
        exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeMillis);
    }

    @Test
    public void shouldReplaceThreadWhenErrorsNotWithinMaxTime() throws Exception {
        for (int i = 0; i < 10; i++) {
            assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
            Thread.sleep(200);
        }
    }

    @Test
    public void shouldShutdownApplicationWhenErrorsOccurWithinMaxTime() throws Exception {
        assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
        Thread.sleep(50);
        assertEquals(SHUTDOWN_APPLICATION, exceptionHandler.handle(worksOnMyBoxException));
    }
}

Invoke the tests

Now run the tests, which is as simple as:

./gradlew test

Deploy on Confluent Cloud

Run your app with Confluent Cloud

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.

  1. Sign up for Confluent Cloud, a fully managed Apache Kafka service.

  2. After you log in to Confluent Cloud Console, click Environments in the left-hand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.

Confluent Cloud

Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations (e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials) and set the appropriate parameters in your client application. For this tutorial, add the following properties to the client application’s input properties file, substituting your Confluent Cloud values for the placeholders in curly braces.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.