Handling uncaught exceptions

Question:

How can you handle uncaught exceptions?

Example use case:

You have an event streaming application, and you want to make sure that it's robust in the face of unexpected errors. Depending on the situation, you'll want the application to either continue running or shut down. In this tutorial, you'll learn how to use the `StreamsUncaughtExceptionHandler` to provide this functionality.

Hands-on code example:

New to Confluent Cloud? Get started here.

Short Answer

To handle uncaught exceptions, use the KafkaStreams.setUncaughtExceptionHandler method.

KafkaStreams kafkaStreams = new KafkaStreams(topologyBuilder.build(), properties);

// Using a lambda, take a static approach to errors regardless of the exception
kafkaStreams.setUncaughtExceptionHandler((exception) -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);

// Using a concrete implementation
kafkaStreams.setUncaughtExceptionHandler(new MyExceptionHandler());

The StreamsUncaughtExceptionHandler interface gives you an opportunity to respond to exceptions not handled by Kafka Streams. It has one method, handle, which returns an enum of type StreamThreadExceptionResponse that instructs Kafka Streams how to respond to the exception. There are three possible values: REPLACE_THREAD, SHUTDOWN_CLIENT, or SHUTDOWN_APPLICATION.
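For reference, here is a minimal sketch of what a concrete implementation such as the MyExceptionHandler referenced above might look like (the class name and the logging are assumptions for illustration, not part of Kafka Streams):

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

// Minimal sketch: log the error and ask Kafka Streams to replace the failed thread.
public class MyExceptionHandler implements StreamsUncaughtExceptionHandler {

    @Override
    public StreamThreadExceptionResponse handle(final Throwable throwable) {
        System.err.println("Uncaught exception in stream thread: " + throwable.getMessage());
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}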

Run it

Initialize the project

1

To get started, make a new directory anywhere you’d like for this project:

mkdir error-handling && cd error-handling

Next, create a directory for configuration data:

mkdir configuration

Provision your Kafka cluster

2

This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started free is on Confluent Cloud, which provides Kafka as a fully managed service.

  1. After you log in to Confluent Cloud, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  2. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.

  3. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.

Write the cluster information into a local file

3

From the Confluent Cloud Console, navigate to your Kafka cluster and then select Clients in the lefthand navigation. From the Clients view, create a new client and click Java to get the connection information customized to your cluster.

Create new credentials for your Kafka cluster and Schema Registry, writing in appropriate descriptions so that the keys are easy to find and delete later. The Confluent Cloud Console will show a configuration similar to below with your new credentials automatically populated (make sure Show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Do not directly copy and paste the above configuration. You must copy it from the Confluent Cloud Console so that it includes your Confluent Cloud information and credentials.

Download and set up the Confluent CLI

4

This tutorial has some steps for Kafka topic management and producing and consuming events, for which you can use the Confluent Cloud Console or the Confluent CLI. Follow the instructions here to install the Confluent CLI, and then follow these steps to connect the CLI to your Confluent Cloud cluster.

Configure the project

5

Create the following Gradle build file, named build.gradle, for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"
    id "idea"
    id "eclipse"
    id "com.github.davidmc24.gradle.plugin.avro" version "1.7.0"
}

sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.apache.avro:avro:1.11.1"
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
    implementation ('org.apache.kafka:kafka-clients') {
        version {
            strictly '3.4.0'
        }
    }

    testImplementation "org.apache.kafka:kafka-streams-test-utils:3.4.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.StreamsUncaughtExceptionHandling"
    )
  }
}

shadowJar {
    archiveBaseName = "error-handling-standalone"
    archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Then create a development file at configuration/dev.properties:

application.id=error-handling
replication.factor=3

input.topic.name=input-topic
input.topic.partitions=6
input.topic.replication.factor=3

output.topic.name=output-topic
output.topic.partitions=6
output.topic.replication.factor=3

max.failures=3
max.time.millis=3600000

Update the properties file with Confluent Cloud information

6

Using the command below, append the contents of configuration/ccloud.properties (with your Confluent Cloud configuration) to configuration/dev.properties (with the application properties).

cat configuration/ccloud.properties >> configuration/dev.properties

Create an exception handler implementation

7

First, create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Before you create the Kafka Streams application you’ll need to create an instance of a StreamsUncaughtExceptionHandler. For more information you can read KIP-671 which introduced the new functionality.

Before we dive into the code, let’s briefly cover a few points about the StreamsUncaughtExceptionHandler.

Keep in mind that the exception handler does not apply to all exceptions, only to those not directly handled by Kafka Streams. An example of an exception that Kafka Streams handles itself is the ProducerFencedException. Any exceptions related to your business logic, however, are not dealt with and bubble all the way up to the StreamThread, leaving the application no choice but to shut down. The StreamsUncaughtExceptionHandler gives you a mechanism to take a different action when such an exception is thrown.

The StreamsUncaughtExceptionHandler has one method, handle, which returns an enum of type StreamThreadExceptionResponse that instructs Kafka Streams how to respond to the exception. The possible return values are:

  • REPLACE_THREAD - Replaces the thread receiving the exception and processing continues with the same number of configured threads. (Note: this can result in duplicate records depending on the application’s processing mode determined by the PROCESSING_GUARANTEE_CONFIG value)

  • SHUTDOWN_CLIENT - Shut down the individual instance of the Kafka Streams application experiencing the exception. (This is the previous behavior and the current default behavior if you don’t provide a StreamsUncaughtExceptionHandler)

  • SHUTDOWN_APPLICATION - Shut down all instances of a Kafka Streams application with the same application-id. Kafka Streams uses a rebalance to instruct all application instances to shut down, so even those running on another machine will receive the signal and exit.

Your implementation of the StreamsUncaughtExceptionHandler will keep track of the number of errors that occur within a given time frame. If the number of errors exceeds the threshold within that timeframe, the entire application shuts down. While you could put the exception-handling code in a lambda, a separate concrete implementation is easier to test.

Here’s the constructor where you provide the max number of failures and the timeframe:

public MaxFailuresUncaughtExceptionHandler(final int maxFailures, final long maxTimeIntervalMillis) {
    this.maxFailures = maxFailures;   (1)
    this.maxTimeIntervalMillis = maxTimeIntervalMillis;  (2)
}
1 The max number of failures your application will tolerate within a given timeframe
2 The max total time allowed for observing the failures

This is probably best understood by taking a look at the core logic:

 if (currentFailureCount >= maxFailures) {  (1)
    if (millisBetweenFailure <= maxTimeIntervalMillis) { (2)
        return SHUTDOWN_APPLICATION;
    } else {
        currentFailureCount = 0;  (3)
        previousErrorTime = null;
    }
}
return REPLACE_THREAD;  (4)
1 Checking if the current number of failures equals or exceeds the maximum
2 Checking whether the max number of failures occurred within the given time window; if so, shut down.
3 If you’ve reached the max number of failures, but the errors are spread out, reset the count and timestamp
4 The default behavior here is to replace the thread

The idea here is that a couple of errors spread out over time are OK, so processing continues. But a burst of errors within a small window of time could indicate a bigger issue, so it’s better to shut down. This code doesn’t inspect the type of the exception, but branching on the exception type is another valid approach (sketched after the next paragraph).

The above code is just an example of what you could do and has not been tested in a production setting. The main point is that while it’s a good idea to keep processing through a small number of errors, it’s not a good idea to continually replace the thread under sustained errors. It’s better to have some "guard rails" in place to make sure your application is robust but won’t keep running when it shouldn’t.
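For illustration only, a handler that branches on the exception type rather than counting failures might look something like this sketch (it assumes the same static imports of the StreamThreadExceptionResponse constants as the handler file below; RetryableException is a hypothetical application-defined exception, not part of Kafka Streams):

@Override
public StreamThreadExceptionResponse handle(final Throwable throwable) {
    // Sketch only: decide the response based on the type of the exception.
    if (throwable instanceof RetryableException) {        // hypothetical transient error
        return REPLACE_THREAD;                             // keep processing
    } else if (throwable instanceof NullPointerException) {
        return SHUTDOWN_CLIENT;                            // likely a local bug, stop this instance
    }
    return SHUTDOWN_APPLICATION;                           // anything else: stop every instance
}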

Now create the following file at src/main/java/io/confluent/developer/MaxFailuresUncaughtExceptionHandler.java

package io.confluent.developer;

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.*;


public class MaxFailuresUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

    final int maxFailures;
    final long maxTimeIntervalMillis;
    private Instant previousErrorTime;
    private int currentFailureCount;


    public MaxFailuresUncaughtExceptionHandler(final int maxFailures, final long maxTimeIntervalMillis) {
        this.maxFailures = maxFailures;
        this.maxTimeIntervalMillis = maxTimeIntervalMillis;
    }

    @Override
    public StreamThreadExceptionResponse handle(final Throwable throwable) {
        currentFailureCount++;
        Instant currentErrorTime = Instant.now();

        if (previousErrorTime == null) {
            previousErrorTime = currentErrorTime;
        }

        long millisBetweenFailure = ChronoUnit.MILLIS.between(previousErrorTime, currentErrorTime);

        if (currentFailureCount >= maxFailures) {
            if (millisBetweenFailure <= maxTimeIntervalMillis) {
                return SHUTDOWN_APPLICATION;
            } else {
                currentFailureCount = 0;
                previousErrorTime = null;
            }
        }
        return REPLACE_THREAD;
    }
}

You’ll add the StreamsUncaughtExceptionHandler to your Kafka Streams application in the next step.

There’s one more step you’ll need to take: create a custom exception class, ProcessorException, that your exception handler will process. Create this simple Java class at src/main/java/io/confluent/developer/ProcessorException.java

package io.confluent.developer;

public class ProcessorException extends RuntimeException {

    public ProcessorException(String message) {
        super(message);
    }
}
There is an older, deprecated version of KafkaStreams.setUncaughtExceptionHandler that takes an instance of java.lang.Thread.UncaughtExceptionHandler. Users are advised to migrate to the newer method.
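As a hedged side-by-side sketch (reusing the kafkaStreams instance from the short answer above), the difference looks like this: the deprecated overload can only observe the failure, while the newer one lets you choose how Kafka Streams reacts.

// Deprecated: takes a java.lang.Thread.UncaughtExceptionHandler; the stream thread still dies.
kafkaStreams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
        System.err.println("Stream thread " + t.getName() + " died: " + e));

// Preferred: takes a StreamsUncaughtExceptionHandler and lets you pick the response.
kafkaStreams.setUncaughtExceptionHandler(exception ->
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);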

Create the Kafka Streams topology

8

Here is the code we’ll use to drive our tutorial:

   builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(value -> {
                    counter++;
                    if (counter == 2 || counter == 8 || counter == 15) { (1)
                        throw new IllegalStateException("It works on my box!!!");
                    }
                    return value.toUpperCase();
                })
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

    // Details left out for clarity
    .......

    // In the main method

    final KafkaStreams streams = new KafkaStreams(topology, streamProps);
    final MaxFailuresUncaughtExceptionHandler exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeInterval); (2)
    streams.setUncaughtExceptionHandler(exceptionHandler); (3)
1 Simulating an error depending on the value of a counter (which gets incremented with every record)
2 Instantiating the exception handler; the maxFailures (3) and maxTimeInterval (3600000 ms == 1 hour) values come from the configuration file
3 Adding the handler to Kafka Streams

With these settings, the simulated errors (3 within a 1-hour window) meet the criteria for shutting down the application.

Now create the following file at src/main/java/io/confluent/developer/StreamsUncaughtExceptionHandling.java

package io.confluent.developer;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class StreamsUncaughtExceptionHandling {

    int counter = 0;

    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = allProps.getProperty("input.topic.name");
        final String outputTopic = allProps.getProperty("output.topic.name");

        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(value -> {
                    counter++;
                    if (counter == 2 || counter == 8 || counter == 15) {
                        throw new ProcessorException("It works on my box!!!");
                    }
                    return value.toUpperCase();
                })
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public void createTopics(Properties allProps) {
        try (AdminClient client = AdminClient.create(allProps)) {
            List<NewTopic> topicList = new ArrayList<>();

            NewTopic sessionInput = new NewTopic(allProps.getProperty("input.topic.name"),
                    Integer.parseInt(allProps.getProperty("input.topic.partitions")),
                    Short.parseShort(allProps.getProperty("input.topic.replication.factor")));
            topicList.add(sessionInput);

            NewTopic counts = new NewTopic(allProps.getProperty("output.topic.name"),
                    Integer.parseInt(allProps.getProperty("output.topic.partitions")),
                    Short.parseShort(allProps.getProperty("output.topic.replication.factor")));

            topicList.add(counts);
            client.createTopics(topicList);
        }
    }

    public Properties loadEnvProperties(String fileName) throws IOException {
        Properties allProps = new Properties();
        FileInputStream input = new FileInputStream(fileName);
        allProps.load(input);
        input.close();

        return allProps;
    }

    public static void main(String[] args) throws Exception {

        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }

        StreamsUncaughtExceptionHandling tw = new StreamsUncaughtExceptionHandling();
        Properties allProps = tw.loadEnvProperties(args[0]);
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Change this to StreamsConfig.EXACTLY_ONCE to eliminate duplicates
        allProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
        Topology topology = tw.buildTopology(allProps);

        tw.createTopics(allProps);
        TutorialDataGenerator dataGenerator = new TutorialDataGenerator(allProps);
        dataGenerator.generate();

        final int maxFailures = Integer.parseInt(allProps.getProperty("max.failures"));
        final long maxTimeInterval = Long.parseLong(allProps.getProperty("max.time.millis"));
        final KafkaStreams streams = new KafkaStreams(topology, allProps);
        final MaxFailuresUncaughtExceptionHandler exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeInterval);
        streams.setUncaughtExceptionHandler(exceptionHandler);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
            }
        });

        try {
            streams.cleanUp();
            streams.start();
        } catch (Throwable e) {
            System.exit(1);
        }
    }

    static class TutorialDataGenerator {
        final Properties properties;


        public TutorialDataGenerator(final Properties properties) {
            this.properties = properties;
        }

        public void generate() {
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
                String topic = properties.getProperty("input.topic.name");
                List<String> messages = Arrays.asList("All", "streams", "lead", "to", "Confluent", "Go", "to", "Kafka", "Summit");


                messages.forEach(message -> producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace(System.out);
                        } else {
                            System.out.printf("Produced record (%s) at offset %d to topic %s %n", message, metadata.offset(), metadata.topic());
                        }
                }));
            }
        }
    }
}

Compile and run the Kafka Streams program

9

Now that we have data generation working, let’s build your application by running:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally.

The application for this tutorial includes a record generator to populate the topic data. Here is the list of records produced:

"All", "streams", "lead", "to", "Confluent", "Go", "to", "Kafka", "Summit"

Since we force some exceptions at different intervals while the streams application runs, you should see some stack traces in the console indicating an error, but the application will continue running. However, once the number of errors meets the max-error threshold, the application will shut down.

Now run the following program, but watch the logs in the console and let the application run for a few seconds.

java -jar build/libs/error-handling-standalone-0.0.1.jar configuration/dev.properties

You should observe it shutting down and see something similar to this in the console:

INFO org.apache.kafka.streams.KafkaStreams - stream-client [error-handling-5c246409-ae84-4bbd-af85-c4e8d1d556d9] State transition from PENDING_ERROR to ERROR
INFO org.apache.kafka.streams.KafkaStreams - stream-client [error-handling-5c246409-ae84-4bbd-af85-c4e8d1d556d9] Streams client stopped to ERROR completely

Consume data from the output topic

10

Now that you’ve run the Kafka Streams application, it should have shut itself down due to reaching the max-error threshold.

Let’s now run the Confluent CLI to confirm the output:

confluent kafka topic consume output-topic --from-beginning

Your results should look something like this:


ALL
ALL
STREAMS
LEAD
TO
CONFLUENT
ALL
STREAMS
LEAD
TO
CONFLUENT
GO

You’ll notice there are some duplicated values in the output. This duplication is to be expected, as the streams application is running with the default processing mode of AT_LEAST_ONCE. Duplicate values are one thing to consider when using REPLACE_THREAD with the StreamsUncaughtExceptionHandler, since this is analogous to using retries with the KafkaProducer. If you don’t want duplicate values, consider running with the EXACTLY_ONCE processing mode, as sketched below.
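A minimal sketch of that change in the application’s main method (using the allProps variable from the code above) would be:

// Sketch: switch from at-least-once to exactly-once processing to avoid duplicates.
// In Kafka Streams 3.x, StreamsConfig.EXACTLY_ONCE_V2 is the newer, recommended setting
// (it requires brokers at version 2.5 or later).
allProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);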

Enter Ctrl-C to exit.

Teardown Confluent Cloud resources

11

You may try another tutorial, but if you don’t plan on doing other tutorials, use the Confluent Cloud Console or CLI to destroy all of the resources you created. Verify they are destroyed to avoid unexpected charges.

Test it

Create a test configuration file

1

First, create a test file at configuration/test.properties:

application.id=error-handling-test
bootstrap.servers=localhost:29092

input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

max.failures=3
max.time.millis=120000

Write a test

2

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Testing a Kafka Streams application requires a bit of test harness code, but happily the org.apache.kafka.streams.TopologyTestDriver class makes this much more pleasant than it would otherwise be.

The test for our streams application is simple, but we have two scenarios to cover. The first is when the data is not in the expected format, so we expect that the topology will throw an exception. The second case is the happy path where the data is as we expect:

@Test
public void shouldThrowException() {
    assertThrows(org.apache.kafka.streams.errors.StreamsException.class, () -> inputTopic.pipeValueList(List.of("foo", "bar"))); (1)
}

@Test
public void shouldProcessValues() {  (2)
    var validMessages =  Collections.singletonList("foo");
    var expectedMessages = validMessages.stream().map(String::toUpperCase).collect(Collectors.toList());
    inputTopic.pipeValueList(validMessages);
    var actualResults = outputTopic.readValuesToList();
    assertEquals(expectedMessages, actualResults);
}
1 Test verifying unexpected format throws exception
2 Test validating the expected processing

We also have logic in the MaxFailuresUncaughtExceptionHandler that needs testing. Just like the streams application test, we have two scenarios to verify:

  • The case when errors are spread out so the exception handler should return REPLACE_THREAD

  • The case when the errors occur within our window and we expect the handler to return a SHUTDOWN_APPLICATION

@Test
public void shouldReplaceThreadWhenErrorsNotWithinMaxTime() throws Exception {  (1)
    for (int i = 0; i < 10; i++) {
        assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
        Thread.sleep(200);
    }
}
@Test
public void shouldShutdownApplicationWhenErrorsOccurWithinMaxTime() throws Exception { (2)
    assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
    Thread.sleep(50);
    assertEquals(SHUTDOWN_APPLICATION, exceptionHandler.handle(worksOnMyBoxException));
}
1 Test validating that errors spread out over time result in replacing the thread
2 Test validating that a burst of errors within a small timeframe results in a shutdown

With the brief testing discussion done, let’s create our two test files.

First create the topology test file at src/test/java/io/confluent/developer/StreamsUncaughtExceptionHandlingTest.java.

package io.confluent.developer;


import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertThrows;


public class StreamsUncaughtExceptionHandlingTest {

    private final static String TEST_CONFIG_FILE = "configuration/test.properties";

    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;
    private TopologyTestDriver testDriver;


    @Before
    public void setUp() throws IOException {
        final StreamsUncaughtExceptionHandling instance = new StreamsUncaughtExceptionHandling();
        final Properties allProps = instance.loadEnvProperties(TEST_CONFIG_FILE);
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final String sessionDataInputTopic = allProps.getProperty("input.topic.name");
        final String outputTopicName = allProps.getProperty("output.topic.name");

        final Topology topology = instance.buildTopology(allProps);
        testDriver = new TopologyTestDriver(topology, allProps);
        final Serializer<String> keySerializer = Serdes.String().serializer();
        final Serializer<String> exampleSerializer = Serdes.String().serializer();
        final Deserializer<String> valueDeserializer = Serdes.String().deserializer();
        final Deserializer<String> keyDeserializer = Serdes.String().deserializer();

        inputTopic = testDriver.createInputTopic(sessionDataInputTopic, keySerializer, exampleSerializer);
        outputTopic = testDriver.createOutputTopic(outputTopicName, keyDeserializer, valueDeserializer);
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void shouldThrowException() {
        assertThrows(org.apache.kafka.streams.errors.StreamsException.class, () -> inputTopic.pipeValueList(Arrays.asList("foo", "bar")));
    }

    @Test
    public void shouldProcessValues() {
        List<String> validMessages =  Collections.singletonList("foo");
        List<String> expectedMessages = validMessages.stream().map(String::toUpperCase).collect(Collectors.toList());
        inputTopic.pipeValueList(validMessages);
        List<String> actualResults = outputTopic.readValuesToList();
        assertEquals(expectedMessages, actualResults);
    }

}

Then create the handler test file at src/test/java/io/confluent/developer/MaxFailuresUncaughtExceptionHandlerTest.java.

package io.confluent.developer;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.*;

public class MaxFailuresUncaughtExceptionHandlerTest {

    private MaxFailuresUncaughtExceptionHandler exceptionHandler;
    private final IllegalStateException worksOnMyBoxException = new IllegalStateException("Strange, It worked on my box");

    @Before
    public void setUp() {
        long maxTimeMillis = 100;
        int maxFailures = 2;
        exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeMillis);
    }

    @Test
    public void shouldReplaceThreadWhenErrorsNotWithinMaxTime() throws Exception {
        for (int i = 0; i < 10; i++) {
            assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
            Thread.sleep(200);
        }
    }

    @Test
    public void shouldShutdownApplicationWhenErrorsOccurWithinMaxTime() throws Exception {
        assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
        Thread.sleep(50);
        assertEquals(SHUTDOWN_APPLICATION, exceptionHandler.handle(worksOnMyBoxException));
    }
}

Invoke the tests

3

Now run the test, which is as simple as:

./gradlew test