How can you handle uncaught exceptions?
To handle uncaught exceptions, use the KafkaStreams.setUncaughtExceptionHandler method.
KafkaStreams kafkaStreams = new KafkaStreams(topologyBuilder.build(), properties);
// Using a lambda, take a static approach to errors regardless of the exception
kafkaStreams.setUncaughtExceptionHandler((exception) -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
// Using a concrete implementation
kafkaStreams.setUncaughtExceptionHandler(new MyExceptionHandler());
The StreamsUncaughtExceptionHandler interface gives you an opportunity to respond to exceptions not handled by Kafka Streams. It has one method, handle, and it returns an enum of type StreamThreadExceptionResponse which provides you the opportunity to instruct Kafka Streams how to respond to the exception. There are three possible values: REPLACE_THREAD, SHUTDOWN_CLIENT, or SHUTDOWN_APPLICATION.
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it.
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl.
• Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line.
To get started, make a new directory anywhere you’d like for this project:
mkdir error-handling && cd error-handling
Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):
version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
And launch it by running:
docker compose up -d
Create the following Gradle build file, named build.gradle, for the project:
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
}
}
plugins {
id "java"
id "idea"
id "eclipse"
id "com.github.davidmc24.gradle.plugin.avro" version "1.7.0"
}
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"
repositories {
mavenCentral()
maven {
url "https://packages.confluent.io/maven"
}
}
apply plugin: "com.github.johnrengelman.shadow"
dependencies {
implementation "org.apache.avro:avro:1.11.1"
implementation "org.slf4j:slf4j-simple:2.0.7"
implementation 'org.apache.kafka:kafka-streams:3.4.0'
implementation ('org.apache.kafka:kafka-clients') {
version {
strictly '3.4.0'
}
}
testImplementation "org.apache.kafka:kafka-streams-test-utils:3.4.0"
testImplementation "junit:junit:4.13.2"
testImplementation 'org.hamcrest:hamcrest:2.2'
}
test {
testLogging {
outputs.upToDateWhen { false }
showStandardStreams = true
exceptionFormat = "full"
}
}
jar {
manifest {
attributes(
"Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
"Main-Class": "io.confluent.developer.StreamsUncaughtExceptionHandling"
)
}
}
shadowJar {
archiveBaseName = "error-handling-standalone"
archiveClassifier = ''
}
And be sure to run the following command to obtain the Gradle wrapper:
gradle wrapper
Next, create a directory for configuration data:
mkdir configuration
Then create a development file at configuration/dev.properties:
application.id=error-handling
bootstrap.servers=localhost:29092
input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1
output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1
max.failures=3
max.time.millis=3600000
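The max.failures and max.time.millis entries are the two values the exception handler needs. As a minimal sketch of reading them, the snippet below parses both keys and fails fast if either is missing. The HandlerSettings class and its method names are hypothetical and not part of the tutorial code, and the properties are loaded from a string here only to keep the example self-contained (the tutorial application loads them from a file).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper (not part of the tutorial code): holds the two handler settings.
final class HandlerSettings {
    final int maxFailures;
    final long maxTimeIntervalMillis;

    HandlerSettings(final int maxFailures, final long maxTimeIntervalMillis) {
        this.maxFailures = maxFailures;
        this.maxTimeIntervalMillis = maxTimeIntervalMillis;
    }

    // Reads max.failures and max.time.millis; a missing key fails with a clear message.
    static HandlerSettings from(final Properties props) {
        return new HandlerSettings(
                Integer.parseInt(required(props, "max.failures")),
                Long.parseLong(required(props, "max.time.millis")));
    }

    private static String required(final Properties props, final String key) {
        final String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value.trim();
    }

    // Parses properties from text; the tutorial application reads a file instead.
    static Properties parse(final String text) {
        final Properties props = new Properties();
        try (StringReader reader = new StringReader(text)) {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(final String[] args) {
        final HandlerSettings settings =
                from(parse("max.failures=3\nmax.time.millis=3600000\n"));
        System.out.println(settings.maxFailures + " failures within "
                + settings.maxTimeIntervalMillis + " ms");
    }
}
```

With the values from dev.properties, this yields a threshold of 3 failures within 3600000 ms (one hour).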
First, create a directory for the Java files in this project:
mkdir -p src/main/java/io/confluent/developer
Before you create the Kafka Streams application, you’ll need to create an instance of a StreamsUncaughtExceptionHandler. For more information, you can read KIP-671, which introduced this functionality.
Before we dive into the code, let’s briefly cover a few points about the StreamsUncaughtExceptionHandler.
Keep in mind that the exception handler will not work for all exceptions, just those not directly handled by Kafka Streams. An example of an exception that Kafka Streams handles is the ProducerFencedException. But any exceptions related to your business logic are not dealt with and bubble all the way up to the StreamThread, leaving the application no choice but to shut down. The StreamsUncaughtExceptionHandler gives you a mechanism to take different actions in the case of a thrown exception.
The StreamsUncaughtExceptionHandler has one method, handle, which returns an enum of type StreamThreadExceptionResponse that tells Kafka Streams how to respond to the exception. The possible return values are:
• REPLACE_THREAD - Replaces the thread receiving the exception, and processing continues with the same number of configured threads. (Note: this can result in duplicate records depending on the application’s processing mode determined by the PROCESSING_GUARANTEE_CONFIG value.)
• SHUTDOWN_CLIENT - Shuts down the individual instance of the Kafka Streams application experiencing the exception. (This is the previous behavior and the current default behavior if you don’t provide a StreamsUncaughtExceptionHandler.)
• SHUTDOWN_APPLICATION - Shuts down all instances of a Kafka Streams application with the same application-id. Kafka Streams uses a rebalance to instruct all application instances to shut down, so even those running on another machine will receive the signal and exit.
Your implementation of the StreamsUncaughtExceptionHandler will keep track of the number of errors that occur within a given time frame. If the number of errors reaches the threshold within the provided time frame, the entire application shuts down. While you could put the exception handling code in a lambda, having a separate concrete implementation is better for testing.
Here’s the constructor where you provide the max number of failures and the timeframe:
public MaxFailuresUncaughtExceptionHandler(final int maxFailures, final long maxTimeIntervalMillis) {
this.maxFailures = maxFailures; (1)
this.maxTimeIntervalMillis = maxTimeIntervalMillis; (2)
}
1 | The max number of failures your application will tolerate within a given timeframe |
2 | The max total time allowed for observing the failures |
This is probably best understood by taking a look at the core logic:
if (currentFailureCount >= maxFailures) { (1)
if (millisBetweenFailure <= maxTimeIntervalMillis) { (2)
return SHUTDOWN_APPLICATION;
} else {
currentFailureCount = 0; (3)
previousErrorTime = null;
}
}
return REPLACE_THREAD; (4)
1 | Checking if the current number of failures equals or exceeds the maximum |
2 | Checking if the max number of failures occurred within the given time window; if yes, then shut down |
3 | If you’ve reached the max number, but the failures are spread out, reset |
4 | The default behavior here is to replace the thread |
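To make the decision concrete, here is a small sketch that restates the check as a pure function and evaluates it for three sample inputs, using the tutorial's configured values (3 failures, one hour). The FailureDecision class and the Response enum are hypothetical stand-ins so the snippet compiles without Kafka on the classpath; millisSinceFirstFailure corresponds to millisBetweenFailure in the handler, which measures the time from the first failure since the last reset. The sketch does not model the counter reset.

```java
// Hypothetical restatement of the handler's decision as a pure function.
// Response is a local stand-in for StreamThreadExceptionResponse.
final class FailureDecision {
    enum Response { REPLACE_THREAD, SHUTDOWN_APPLICATION }

    static Response decide(final int failureCount,
                           final long millisSinceFirstFailure,
                           final int maxFailures,
                           final long maxTimeIntervalMillis) {
        if (failureCount >= maxFailures && millisSinceFirstFailure <= maxTimeIntervalMillis) {
            return Response.SHUTDOWN_APPLICATION;
        }
        // Either the count is below the threshold, or the failures were spread out
        // (the real handler also resets its counter in the latter case).
        return Response.REPLACE_THREAD;
    }

    public static void main(final String[] args) {
        final int maxFailures = 3;
        final long oneHourMillis = 3_600_000L;
        // Second failure, one second after the first: below the threshold.
        System.out.println(decide(2, 1_000L, maxFailures, oneHourMillis));
        // Third failure, five seconds after the first: threshold reached inside the window.
        System.out.println(decide(3, 5_000L, maxFailures, oneHourMillis));
        // Third failure, more than an hour after the first: spread out, keep going.
        System.out.println(decide(3, 4_000_000L, maxFailures, oneHourMillis));
    }
}
```

Running main prints REPLACE_THREAD, SHUTDOWN_APPLICATION, and REPLACE_THREAD, in that order.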
The idea here is that a couple of errors spread out are OK, so processing continues. But a bunch of errors within a small window of time could indicate a bigger issue, so it’s better to shut down. While the code doesn’t inspect the type of the exception, that’s another valid approach as well.
The above code is just an example of what you could do and definitely not tested in a production setting. The main point here is while it’s a good idea to keep processing with a small number of errors, it’s not a good idea to continually replace the thread with sustained errors. It’s better to have some "guard rails" in place to make sure your application is robust, but won’t continue on when it shouldn’t.
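As a rough illustration of the alternative mentioned above, inspecting the exception type, the sketch below picks a response based on what it finds in the cause chain. Everything here is an assumption made for illustration: the ExceptionClassifier class, the local Response enum (a stand-in for StreamThreadExceptionResponse so the snippet compiles without Kafka), and the policy itself. The cause chain is walked because the tutorial's topology test expects a StreamsException, which suggests the original exception may arrive wrapped.

```java
// Hypothetical sketch: choose a response from the exception type.
// The policy below is illustrative only, not a recommendation.
final class ExceptionClassifier {
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    static Response classify(final Throwable throwable) {
        // Walk the cause chain so a wrapped exception is still recognized.
        for (Throwable current = throwable; current != null; current = current.getCause()) {
            if (current instanceof Error) {
                // JVM-level problems: assume only this instance is affected.
                return Response.SHUTDOWN_CLIENT;
            }
            if (current instanceof IllegalStateException) {
                // Assumed here to signal a problem every instance would hit.
                return Response.SHUTDOWN_APPLICATION;
            }
        }
        // Anything else is treated as transient.
        return Response.REPLACE_THREAD;
    }

    public static void main(final String[] args) {
        System.out.println(classify(new RuntimeException("wrapper", new IllegalStateException("bad state"))));
        System.out.println(classify(new RuntimeException("transient")));
    }
}
```

A type-based policy like this could be combined with the failure-count window, for example by counting only the exceptions classified as transient.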
Now create the following file at src/main/java/io/confluent/developer/MaxFailuresUncaughtExceptionHandler.java
package io.confluent.developer;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.*;
public class MaxFailuresUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
final int maxFailures;
final long maxTimeIntervalMillis;
private Instant previousErrorTime;
private int currentFailureCount;
public MaxFailuresUncaughtExceptionHandler(final int maxFailures, final long maxTimeIntervalMillis) {
this.maxFailures = maxFailures;
this.maxTimeIntervalMillis = maxTimeIntervalMillis;
}
@Override
public StreamThreadExceptionResponse handle(final Throwable throwable) {
currentFailureCount++;
Instant currentErrorTime = Instant.now();
if (previousErrorTime == null) {
previousErrorTime = currentErrorTime;
}
long millisBetweenFailure = ChronoUnit.MILLIS.between(previousErrorTime, currentErrorTime);
if (currentFailureCount >= maxFailures) {
if (millisBetweenFailure <= maxTimeIntervalMillis) {
return SHUTDOWN_APPLICATION;
} else {
currentFailureCount = 0;
previousErrorTime = null;
}
}
return REPLACE_THREAD;
}
}
You’ll add the StreamsUncaughtExceptionHandler to your Kafka Streams application in the next step.
There’s one more step you’ll need to take: creating a custom exception class, ProcessorException, that your exception handler will process. Create this simple Java class at src/main/java/io/confluent/developer/ProcessorException.java:
package io.confluent.developer;
public class ProcessorException extends RuntimeException {
public ProcessorException(String message) {
super(message);
}
}
There is an older, deprecated version of KafkaStreams.setUncaughtExceptionHandler that takes an instance of java.lang.Thread.UncaughtExceptionHandler. You are advised to migrate to the newer method.
Here is the code we’ll use to drive our tutorial
builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(value -> {
counter++;
if (counter == 2 || counter == 8 || counter == 15) { (1)
throw new ProcessorException("It works on my box!!!");
}
return value.toUpperCase();
})
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
// Details left out for clarity
.......
// In the main method
final KafkaStreams streams = new KafkaStreams(topology, streamProps);
final MaxFailuresUncaughtExceptionHandler exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeInterval); (2)
streams.setUncaughtExceptionHandler(exceptionHandler); (3)
1 | Simulating an error depending on the value of a counter (which gets incremented with every record) |
2 | Instantiating the exception handler, the maxFailures (3) and maxTimeInterval (3600000 ms == 1 hour) variables get their values from the configuration files |
3 | Adding the handler to Kafka Streams |
This code ensures that the rate of errors (3 within a 1 hour window) meets the criteria for shutting down the application.
Now create the following file at src/main/java/io/confluent/developer/StreamsUncaughtExceptionHandling.java
package io.confluent.developer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class StreamsUncaughtExceptionHandling {
int counter = 0;
public Topology buildTopology(Properties allProps) {
final StreamsBuilder builder = new StreamsBuilder();
final String inputTopic = allProps.getProperty("input.topic.name");
final String outputTopic = allProps.getProperty("output.topic.name");
builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(value -> {
counter++;
if (counter == 2 || counter == 8 || counter == 15) {
throw new ProcessorException("It works on my box!!!");
}
return value.toUpperCase();
})
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return builder.build();
}
public void createTopics(Properties allProps) {
try (AdminClient client = AdminClient.create(allProps)) {
List<NewTopic> topicList = new ArrayList<>();
NewTopic sessionInput = new NewTopic(allProps.getProperty("input.topic.name"),
Integer.parseInt(allProps.getProperty("input.topic.partitions")),
Short.parseShort(allProps.getProperty("input.topic.replication.factor")));
topicList.add(sessionInput);
NewTopic counts = new NewTopic(allProps.getProperty("output.topic.name"),
Integer.parseInt(allProps.getProperty("output.topic.partitions")),
Short.parseShort(allProps.getProperty("output.topic.replication.factor")));
topicList.add(counts);
client.createTopics(topicList);
}
}
public Properties loadEnvProperties(String fileName) throws IOException {
Properties allProps = new Properties();
FileInputStream input = new FileInputStream(fileName);
allProps.load(input);
input.close();
return allProps;
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
}
StreamsUncaughtExceptionHandling tw = new StreamsUncaughtExceptionHandling();
Properties allProps = tw.loadEnvProperties(args[0]);
allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Change this to StreamsConfig.EXACTLY_ONCE_V2 to eliminate duplicates (StreamsConfig.EXACTLY_ONCE is deprecated as of Kafka Streams 3.0)
allProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
Topology topology = tw.buildTopology(allProps);
tw.createTopics(allProps);
TutorialDataGenerator dataGenerator = new TutorialDataGenerator(allProps);
dataGenerator.generate();
final int maxFailures = Integer.parseInt(allProps.getProperty("max.failures"));
final long maxTimeInterval = Long.parseLong(allProps.getProperty("max.time.millis"));
final KafkaStreams streams = new KafkaStreams(topology, allProps);
final MaxFailuresUncaughtExceptionHandler exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeInterval);
streams.setUncaughtExceptionHandler(exceptionHandler);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close(Duration.ofSeconds(5));
}
});
try {
streams.cleanUp();
streams.start();
} catch (Throwable e) {
System.exit(1);
}
}
static class TutorialDataGenerator {
final Properties properties;
public TutorialDataGenerator(final Properties properties) {
this.properties = properties;
}
public void generate() {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
String topic = properties.getProperty("input.topic.name");
List<String> messages = Arrays.asList("All", "streams", "lead", "to", "Confluent", "Go", "to", "Kafka", "Summit");
messages.forEach(message -> producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace(System.out);
} else {
System.out.printf("Produced record (%s) at offset %d to topic %s %n", message, metadata.offset(), metadata.topic());
}
}));
}
}
}
}
Now that the application includes data generation, build it by running:
./gradlew shadowJar
Now that you have an uberjar for the Kafka Streams application, you can launch it locally.
The application for this tutorial includes a record generator to populate the topic data. Here is the list of records produced:
"All", "streams", "lead", "to", "Confluent", "Go", "to", "Kafka", "Summit"
Since we force some exceptions at different intervals while the streams application runs, you should see some stack traces in the console indicating an error, but the application will continue running. However, when the application encounters an error that meets the threshold for max errors, it will shut down.
Now run the following program, but watch the logs in the console and let the application run for a few seconds.
java -jar build/libs/error-handling-standalone-0.0.1.jar configuration/dev.properties
You should observe it shutting down and see something similar to this in the console
INFO org.apache.kafka.streams.KafkaStreams - stream-client [error-handling-5c246409-ae84-4bbd-af85-c4e8d1d556d9] State transition from PENDING_ERROR to ERROR
INFO org.apache.kafka.streams.KafkaStreams - stream-client [error-handling-5c246409-ae84-4bbd-af85-c4e8d1d556d9] Streams client stopped to ERROR completely
Now that you’ve run the Kafka Streams application, it should have shut itself down due to reaching the max-error threshold.
Let’s now run the kafka-console-consumer to confirm the output:
docker exec -t broker kafka-console-consumer \
--bootstrap-server broker:9092 \
--topic output-topic \
--from-beginning \
--max-messages 12
Your results should look something like this:
ALL
ALL
STREAMS
LEAD
TO
CONFLUENT
ALL
STREAMS
LEAD
TO
CONFLUENT
GO
Processed a total of 12 messages
You’ll notice there are some duplicated values in the output. This duplication is to be expected, as the streams application is running with the default processing mode of AT_LEAST_ONCE. Duplicate values are one thing to consider when using REPLACE_THREAD with the StreamsUncaughtExceptionHandler, since this is analogous to using retries with the KafkaProducer. If you don’t want duplicate values, you should consider running with the processing mode of EXACTLY_ONCE (EXACTLY_ONCE_V2 in recent Kafka Streams releases, where EXACTLY_ONCE is deprecated).
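To see where the duplicates come from, the following sketch simulates the run without Kafka. It assumes that no offsets are committed before each failure, so every replacement thread starts again from the first record, and that the counter keeps its value across thread replacements (it is a field of the application class). The ReplaySimulation class is hypothetical; the record list, the failing counter values (2, 8, 15), and the limit of 3 failures come from the tutorial.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical simulation of at-least-once reprocessing after each thread replacement.
final class ReplaySimulation {
    static final List<String> RECORDS = Arrays.asList(
            "All", "streams", "lead", "to", "Confluent", "Go", "to", "Kafka", "Summit");

    static List<String> run(final List<String> records) {
        final List<String> output = new ArrayList<>();
        final int maxFailures = 3;  // max.failures from the configuration
        int counter = 0;            // survives thread replacement, as in the tutorial
        int failures = 0;
        while (failures < maxFailures) {
            boolean failed = false;
            // Assumption: nothing was committed, so each attempt restarts at offset 0.
            for (final String record : records) {
                counter++;
                if (counter == 2 || counter == 8 || counter == 15) {
                    failures++;     // simulated ProcessorException
                    failed = true;
                    break;
                }
                output.add(record.toUpperCase(Locale.ROOT));
            }
            if (!failed) {
                break;              // all records processed without an error
            }
        }
        return output;
    }

    public static void main(final String[] args) {
        final List<String> output = run(RECORDS);
        output.forEach(System.out::println);
        System.out.println("Total: " + output.size());
    }
}
```

Under these assumptions the simulation emits 12 values in the same order as the consumer output above: the first attempt writes ALL before failing, the second writes ALL through CONFLUENT, and the third writes ALL through GO before the third failure triggers the shutdown.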
First, create a test file at configuration/test.properties:
application.id=error-handling-test
bootstrap.servers=localhost:29092
input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1
output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1
max.failures=3
max.time.millis=120000
Create a directory for the tests to live in:
mkdir -p src/test/java/io/confluent/developer
Testing a Kafka Streams application requires a bit of test harness code, but happily the org.apache.kafka.streams.TopologyTestDriver class makes this much more pleasant than it would otherwise be.
The test for our streams application is simple, but we have two scenarios to cover. The first pipes two records through the topology so that the second one hits the simulated error (the counter reaches 2), and we expect the topology to throw an exception. The second case is the happy path, where a single record is processed as expected:
@Test
public void shouldThrowException() {
assertThrows(org.apache.kafka.streams.errors.StreamsException.class, () -> inputTopic.pipeValueList(List.of("foo", "bar"))); (1)
}
@Test
public void shouldProcessValues() { (2)
var validMessages = Collections.singletonList("foo");
var expectedMessages = validMessages.stream().map(String::toUpperCase).collect(Collectors.toList());
inputTopic.pipeValueList(validMessages);
var actualResults = outputTopic.readValuesToList();
assertEquals(expectedMessages, actualResults);
}
1 | Test verifying that the simulated error on the second record throws an exception |
2 | Test validating the expected processing |
We also have logic in the MaxFailuresUncaughtExceptionHandler that needs testing. Just like the streams application test, we have two scenarios to verify:
• The case when errors are spread out, so the exception handler should return REPLACE_THREAD
• The case when the errors occur within our window, and we expect the handler to return SHUTDOWN_APPLICATION
@Test
public void shouldReplaceThreadWhenErrorsNotWithinMaxTime() throws Exception { (1)
for (int i = 0; i < 10; i++) {
assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
Thread.sleep(200);
}
}
@Test
public void shouldShutdownApplicationWhenErrorsOccurWithinMaxTime() throws Exception { (2)
assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
Thread.sleep(50);
assertEquals(SHUTDOWN_APPLICATION, exceptionHandler.handle(worksOnMyBoxException));
}
1 | Test validating errors spread out result in replacing the thread |
2 | This test validates that a bunch of errors in a small timeframe result in a shutdown |
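The tests above rely on Thread.sleep to move time forward. One possible alternative, sketched here, is to read the time from a java.time.Clock so a test can advance it explicitly. This is not how the tutorial's handler is written: ClockedFailureTracker, ManualClock, and the local Response enum are hypothetical names introduced for this sketch, and the enum stands in for StreamThreadExceptionResponse so the snippet compiles without Kafka. The decision logic mirrors the handler's.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical variant of the handler logic that takes its time from an injected Clock.
final class ClockedFailureTracker {
    enum Response { REPLACE_THREAD, SHUTDOWN_APPLICATION }

    private final int maxFailures;
    private final long maxTimeIntervalMillis;
    private final Clock clock;
    private Instant previousErrorTime;
    private int currentFailureCount;

    ClockedFailureTracker(final int maxFailures, final long maxTimeIntervalMillis, final Clock clock) {
        this.maxFailures = maxFailures;
        this.maxTimeIntervalMillis = maxTimeIntervalMillis;
        this.clock = clock;
    }

    Response handle(final Throwable throwable) {
        currentFailureCount++;
        final Instant now = clock.instant();
        if (previousErrorTime == null) {
            previousErrorTime = now;
        }
        final long millisBetweenFailure = Duration.between(previousErrorTime, now).toMillis();
        if (currentFailureCount >= maxFailures) {
            if (millisBetweenFailure <= maxTimeIntervalMillis) {
                return Response.SHUTDOWN_APPLICATION;
            }
            // Failures were spread out: start a fresh window.
            currentFailureCount = 0;
            previousErrorTime = null;
        }
        return Response.REPLACE_THREAD;
    }
}

// Hypothetical test clock that only moves when told to.
final class ManualClock extends Clock {
    private Instant now;

    ManualClock(final Instant start) {
        this.now = start;
    }

    void advanceMillis(final long millis) {
        now = now.plusMillis(millis);
    }

    @Override
    public ZoneId getZone() {
        return ZoneOffset.UTC;
    }

    @Override
    public Clock withZone(final ZoneId zone) {
        return this; // the zone is irrelevant for Instant-based measurements
    }

    @Override
    public Instant instant() {
        return now;
    }
}
```

With a clock like this, the "errors within the window" scenario becomes: call handle, advance the clock by 50 ms, call handle again, and expect SHUTDOWN_APPLICATION, with no real waiting involved. In production code, Clock.systemUTC() could be passed instead.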
With the brief testing discussion done, let’s create our two test files.
First, create the topology test file at src/test/java/io/confluent/developer/StreamsUncaughtExceptionHandlingTest.java.
package io.confluent.developer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
public class StreamsUncaughtExceptionHandlingTest {
private final static String TEST_CONFIG_FILE = "configuration/test.properties";
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, String> outputTopic;
private TopologyTestDriver testDriver;
@Before
public void setUp() throws IOException {
final StreamsUncaughtExceptionHandling instance = new StreamsUncaughtExceptionHandling();
final Properties allProps = instance.loadEnvProperties(TEST_CONFIG_FILE);
allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final String sessionDataInputTopic = allProps.getProperty("input.topic.name");
final String outputTopicName = allProps.getProperty("output.topic.name");
final Topology topology = instance.buildTopology(allProps);
testDriver = new TopologyTestDriver(topology, allProps);
final Serializer<String> keySerializer = Serdes.String().serializer();
final Serializer<String> exampleSerializer = Serdes.String().serializer();
final Deserializer<String> valueDeserializer = Serdes.String().deserializer();
final Deserializer<String> keyDeserializer = Serdes.String().deserializer();
inputTopic = testDriver.createInputTopic(sessionDataInputTopic, keySerializer, exampleSerializer);
outputTopic = testDriver.createOutputTopic(outputTopicName, keyDeserializer, valueDeserializer);
}
@After
public void tearDown() {
testDriver.close();
}
@Test
public void shouldThrowException() {
assertThrows(org.apache.kafka.streams.errors.StreamsException.class, () -> inputTopic.pipeValueList(Arrays.asList("foo", "bar")));
}
@Test
public void shouldProcessValues() {
List<String> validMessages = Collections.singletonList("foo");
List<String> expectedMessages = validMessages.stream().map(String::toUpperCase).collect(Collectors.toList());
inputTopic.pipeValueList(validMessages);
List<String> actualResults = outputTopic.readValuesToList();
assertEquals(expectedMessages, actualResults);
}
}
Then create the handler test file at src/test/java/io/confluent/developer/MaxFailuresUncaughtExceptionHandlerTest.java.
package io.confluent.developer;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.*;
public class MaxFailuresUncaughtExceptionHandlerTest {
private MaxFailuresUncaughtExceptionHandler exceptionHandler;
private final IllegalStateException worksOnMyBoxException = new IllegalStateException("Strange, It worked on my box");
@Before
public void setUp() {
long maxTimeMillis = 100;
int maxFailures = 2;
exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeMillis);
}
@Test
public void shouldReplaceThreadWhenErrorsNotWithinMaxTime() throws Exception {
for (int i = 0; i < 10; i++) {
assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
Thread.sleep(200);
}
}
@Test
public void shouldShutdownApplicationWhenErrorsOccurWithinMaxTime() throws Exception {
assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
Thread.sleep(50);
assertEquals(SHUTDOWN_APPLICATION, exceptionHandler.handle(worksOnMyBoxException));
}
}
Now run the test, which is as simple as:
./gradlew test
Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.
Sign up for Confluent Cloud.
After you log in to Confluent Cloud Console, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.
From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud. To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.
Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.
Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.
In the case of this tutorial, add the following properties to the client application’s input properties file, substituting all curly braces with your Confluent Cloud values.
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for Kafka producer to prevent data loss
acks=all
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.