Senior Software Engineer (Presenter)
Integration Architect (Author)
If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/errors/StreamsErrorHandling.java.
In this exercise, you'll essentially take the Basic Operations exercise and add error handling code to it.
Begin with the following code, adding to it as necessary. (Note that the static boolean throwErrorNow exists only for simulation purposes, and the mapValues call that follows the streamWithErrorHandling filter is set up to throw an exception the first time it encounters a record, purely to simulate a transient error.)
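package io.confluent.developer.errors;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
// Some of these Kafka imports are used only by the handlers added in the following steps.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.ProcessorContext;

public class StreamsErrorHandling {
    // This is for learning purposes only!
    static boolean throwErrorNow = true;
    public static void main(String[] args) throws IOException {
        // StreamsUtils is the course repository's helper for loading the application properties.
        final Properties streamsProps = StreamsUtils.loadProperties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-error-handling");
        StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = streamsProps.getProperty("error.input.topic");
        final String outputTopic = streamsProps.getProperty("error.output.topic");
        final String orderNumberStart = "orderNumber-";

        KStream<String, String> streamWithErrorHandling =
                builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                        .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));
        streamWithErrorHandling.filter((key, value) -> value.contains(orderNumberStart))
                .mapValues(value -> {
                    // Simulated transient error: throw once, for the first record only.
                    if (throwErrorNow) {
                        throwErrorNow = false;
                        throw new IllegalStateException("Retryable transient error");
                    }
                    return value.substring(value.indexOf("-") + 1);
                })
                .filter((key, value) -> Long.parseLong(value) > 1000)
                .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
    }
}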
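To see what the starting topology does to each value before any error handling is involved, here is a dependency-free sketch that replays the same three steps (the contains filter, the substring in mapValues, and the Long.parseLong filter) on plain strings. TopologyValueLogicSketch and its process method are hypothetical names used only for illustration, and the throwErrorNow simulation is deliberately left out. One thing the sketch makes visible: a value such as orderNumber-abc passes the first filter and then makes Long.parseLong throw a NumberFormatException during processing, after deserialization has already succeeded, so that kind of failure is not something a DeserializationExceptionHandler would see.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, dependency-free model of the starting topology's value logic:
// filter(contains "orderNumber-") -> mapValues(text after the first '-') -> filter(number > 1000).
// The throwErrorNow simulation is deliberately omitted.
public class TopologyValueLogicSketch {

    static final String ORDER_NUMBER_START = "orderNumber-";

    // Returns the value that would reach the output topic, or empty if a filter drops it.
    // Like the real topology, a non-numeric suffix makes Long.parseLong throw NumberFormatException.
    static Optional<String> process(String value) {
        if (!value.contains(ORDER_NUMBER_START)) {
            return Optional.empty(); // dropped by the first filter
        }
        String number = value.substring(value.indexOf("-") + 1); // the mapValues step
        if (Long.parseLong(number) > 1000) {
            return Optional.of(number); // kept by the second filter
        }
        return Optional.empty(); // dropped by the second filter
    }

    public static void main(String[] args) {
        for (String value : List.of("orderNumber-1001", "orderNumber-999", "bogus-5000")) {
            System.out.println(value + " -> " + process(value));
        }
    }
}
```

Running main prints orderNumber-1001 -> Optional[1001], orderNumber-999 -> Optional.empty and bogus-5000 -> Optional.empty.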
First, create a DeserializationExceptionHandler implementation above your main method to handle any deserialization errors. (We'll discuss errorCounter in the next step.)
public static class StreamsDeserializationErrorHandler implements DeserializationExceptionHandler {
    int errorCounter = 0;
Below the field, implement the handle method. (Each of the error handling interfaces declares a handle method.)
    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        // Skip the first 25 records that fail deserialization; fail on the 26th.
        if (errorCounter++ < 25) {
            return DeserializationHandlerResponse.CONTINUE;
        }
        return DeserializationHandlerResponse.FAIL;
    }
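The errorCounter variable counts deserialization errors. Because the comparison uses the counter's value before it is incremented, the handler returns CONTINUE for the first 25 errors, so those records are skipped and processing continues. On the 26th error it returns FAIL, and the application stops processing.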
Complete the implementation by adding a no-op configure method:
    @Override
    public void configure(Map<String, ?> configs) { }
}
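Because the comparison uses the value of errorCounter before it is incremented, the exact cut-off is easy to misread. The following dependency-free sketch reuses the same expression in a hypothetical ErrorBudgetSketch class, with a local Response enum standing in for DeserializationHandlerResponse, so you can confirm that errors 1 through 25 get CONTINUE and error 26 gets FAIL.

```java
// Hypothetical, dependency-free model of the errorCounter logic in StreamsDeserializationErrorHandler.
// Response stands in for DeserializationHandlerResponse's CONTINUE and FAIL values.
public class ErrorBudgetSketch {

    enum Response { CONTINUE, FAIL }

    private int errorCounter = 0;

    // Same expression as the handler: the pre-increment value is compared with 25.
    Response onError() {
        if (errorCounter++ < 25) {
            return Response.CONTINUE;
        }
        return Response.FAIL;
    }

    public static void main(String[] args) {
        ErrorBudgetSketch budget = new ErrorBudgetSketch();
        for (int error = 1; error <= 26; error++) {
            Response response = budget.onError();
            if (error >= 25) { // print only the boundary
                System.out.println("error #" + error + " -> " + response);
            }
        }
    }
}
```

Running main prints error #25 -> CONTINUE followed by error #26 -> FAIL.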
Below the first error handler, create an implementation of the ProductionExceptionHandler interface. As with the first handler, you'll add a handle method containing the condition for continued processing: if the exception is a RecordTooLargeException, the application continues processing; for any other exception, the handler returns FAIL and you'll shut down the application.
public static class StreamsRecordProducerErrorHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }
Add a no-op configure method, as before:
    @Override
    public void configure(Map<String, ?> configs) { }
}
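The production handler decides by exception type rather than by message text. This dependency-free sketch models that decision; RecordTooLargeStandIn is a hypothetical stand-in for Kafka's RecordTooLargeException, and the local Response enum stands in for ProductionExceptionHandlerResponse. Note that instanceof also matches subclasses of the checked type.

```java
// Hypothetical, dependency-free model of the decision in StreamsRecordProducerErrorHandler.
// RecordTooLargeStandIn stands in for Kafka's RecordTooLargeException;
// Response stands in for ProductionExceptionHandlerResponse's CONTINUE and FAIL values.
public class ProductionDecisionSketch {

    enum Response { CONTINUE, FAIL }

    static class RecordTooLargeStandIn extends RuntimeException {
        RecordTooLargeStandIn(String message) { super(message); }
    }

    // Decide by type: the message text plays no part in the decision.
    static Response decide(Exception exception) {
        if (exception instanceof RecordTooLargeStandIn) {
            return Response.CONTINUE; // skip the oversized record and keep going
        }
        return Response.FAIL; // any other send failure stops processing
    }

    public static void main(String[] args) {
        System.out.println(decide(new RecordTooLargeStandIn("record too large")));
        System.out.println(decide(new IllegalStateException("some other failure")));
    }
}
```

Running main prints CONTINUE and then FAIL.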
Next, add a StreamsUncaughtExceptionHandler. As with the other error handlers, you'll implement the handle method. In it, check whether the exception is an instance of StreamsException, which here essentially means that it wraps an exception thrown by your own processing code; if it is, extract the underlying exception with getCause(). Then evaluate that underlying exception: you're looking for a specific exception that is transient in nature and that justifies replacing the stream thread.
public static class StreamsCustomUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        if (exception instanceof StreamsException) {
            Throwable originalException = exception.getCause();
            // Null-safe check: the cause, or its message, may be null.
            if (originalException != null
                    && "Retryable transient error".equals(originalException.getMessage())) {
                return StreamThreadExceptionResponse.REPLACE_THREAD;
            }
        }
        return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    }
}
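If the underlying exception is that specific transient error, the handler returns a REPLACE_THREAD response, so Kafka Streams replaces the failed stream thread with a new one. Otherwise, it shuts down the Kafka Streams client with a SHUTDOWN_CLIENT response.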
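Here is a dependency-free sketch of the same unwrap-and-compare decision, useful for checking the edge cases: a wrapper whose cause is null, or an exception that was never wrapped at all. WrappedStandIn is a hypothetical stand-in for StreamsException, and the local Response enum stands in for StreamThreadExceptionResponse. The sketch compares messages null-safely, because getCause() and getMessage() can both return null.

```java
// Hypothetical, dependency-free model of StreamsCustomUncaughtExceptionHandler's decision.
// WrappedStandIn stands in for StreamsException; Response stands in for
// StreamThreadExceptionResponse's REPLACE_THREAD and SHUTDOWN_CLIENT values.
public class UncaughtDecisionSketch {

    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT }

    static class WrappedStandIn extends RuntimeException {
        WrappedStandIn(String message, Throwable cause) { super(message, cause); }
    }

    static Response decide(Throwable exception) {
        if (exception instanceof WrappedStandIn) {
            Throwable original = exception.getCause();
            // Calling equals on the constant avoids a NullPointerException for a null message.
            if (original != null && "Retryable transient error".equals(original.getMessage())) {
                return Response.REPLACE_THREAD;
            }
        }
        return Response.SHUTDOWN_CLIENT;
    }

    public static void main(String[] args) {
        System.out.println(decide(new WrappedStandIn("task failed",
                new IllegalStateException("Retryable transient error"))));
        System.out.println(decide(new WrappedStandIn("task failed", null)));
        System.out.println(decide(new IllegalStateException("Retryable transient error")));
    }
}
```

Running main prints REPLACE_THREAD, SHUTDOWN_CLIENT and SHUTDOWN_CLIENT: only the wrapped transient error leads to a thread replacement.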
Now add the code to wire up the Kafka Streams application with your error handlers.
First, add the DeserializationExceptionHandler implementation class to the configurations:
streamsProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, StreamsDeserializationErrorHandler.class);
Then add the ProductionExceptionHandler implementation class to the configurations:
streamsProps.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, StreamsRecordProducerErrorHandler.class);
Below the streamWithErrorHandling topology code, create your KafkaStreams instance:
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
Then set the StreamsUncaughtExceptionHandler instance:
kafkaStreams.setUncaughtExceptionHandler(new StreamsCustomUncaughtExceptionHandler());
Note that this is set directly on the KafkaStreams object and not in the configurations.
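The two handlers above are registered by passing their Class objects. If you ever configure them by name instead, for example from a properties file, the value should be the class's fully qualified binary name, and nested classes like the ones in this exercise use a $ separator (for example, io.confluent.developer.errors.StreamsErrorHandling$StreamsDeserializationErrorHandler). The sketch below uses hypothetical stand-in classes to show the string keys behind the two StreamsConfig constants (default.deserialization.exception.handler and default.production.exception.handler) and what Class.getName() returns for a nested class. It only builds a Properties object and does not depend on Kafka.

```java
import java.util.Properties;

// Hypothetical sketch: the string keys behind the StreamsConfig constants used in this exercise,
// and the '$'-separated binary names of nested classes. No Kafka dependency.
public class ConfigKeysSketch {

    // Stand-ins for nested handler classes such as StreamsDeserializationErrorHandler.
    public static class DeserializationHandlerStandIn { }
    public static class ProductionHandlerStandIn { }

    static Properties handlerProps() {
        Properties props = new Properties();
        // String value of StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
        props.put("default.deserialization.exception.handler",
                DeserializationHandlerStandIn.class.getName());
        // String value of StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
        props.put("default.production.exception.handler",
                ProductionHandlerStandIn.class.getName());
        return props;
    }

    public static void main(String[] args) {
        Properties props = handlerProps();
        // Nested class names contain '$', e.g. ConfigKeysSketch$DeserializationHandlerStandIn.
        System.out.println(props.getProperty("default.deserialization.exception.handler"));
        System.out.println(props.getProperty("default.production.exception.handler"));
    }
}
```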
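As with the other exercises, add the code that runs the TopicLoader utility class (a convenience for this course that you most likely wouldn't use in development or production) and then starts your application: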
TopicLoader.runProducer();
kafkaStreams.start();
Start your application with this command:
./gradlew runStreams -Pargs=errors
You'll quickly see a big, ugly stack trace from the simulated error. But the StreamsUncaughtExceptionHandler does its job: it replaces the failed stream thread, and you'll see the application recover and print some processing output on the console.
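Why does the run recover instead of failing again? The throwErrorNow flag is static and flips to false the first time it fires, so if the same records are processed again after the thread is replaced (an assumption in this sketch), the mapValues step succeeds. The following single-threaded toy simulation illustrates only that idea: a hypothetical worker processes a batch, the first attempt throws the simulated transient error, a decision modeled on the uncaught exception handler allows a replacement, and the retried batch succeeds. It is not how Kafka Streams replaces threads internally, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, single-threaded simulation with hypothetical names; NOT Kafka Streams internals.
// Assumption: after a replacement, the same batch of records is processed again from the start.
public class TransientRecoverySketch {

    static boolean throwErrorNow = true; // one-shot flag, as in the exercise

    static String mapValue(String value) {
        if (throwErrorNow) {
            throwErrorNow = false;
            throw new IllegalStateException("Retryable transient error");
        }
        return value.substring(value.indexOf("-") + 1);
    }

    // A "worker" that stops at the first exception, like a failed stream thread.
    static List<String> runWorker(List<String> records) {
        List<String> output = new ArrayList<>();
        for (String record : records) {
            output.add(mapValue(record));
        }
        return output;
    }

    // Retries with a fresh worker only for the expected transient error, up to maxReplacements times.
    static List<String> runWithReplacement(List<String> records, int maxReplacements) {
        int replacements = 0;
        while (true) {
            try {
                return runWorker(records);
            } catch (IllegalStateException e) {
                boolean transientError = "Retryable transient error".equals(e.getMessage());
                if (!transientError || replacements >= maxReplacements) {
                    throw e; // comparable to SHUTDOWN_CLIENT: give up
                }
                replacements++; // comparable to REPLACE_THREAD
                System.out.println("Replacing worker after: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithReplacement(List.of("orderNumber-1001", "orderNumber-1002"), 1));
    }
}
```

Running main prints Replacing worker after: Retryable transient error, then [1001, 1002]. With maxReplacements set to 0, the same call rethrows the exception instead, which corresponds to shutting the client down.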
Let's take the first Kafka Streams application and add error handling to it. All the topology code is in place, so you'll just add the parts needed to complete the error handling exercise. The use of the static boolean throwErrorNow variable will be clear in a moment. Please note that this variable exists for simulation purposes only. First, to handle any deserialization errors, create the DeserializationExceptionHandler. We'll talk about the errorCounter instance variable in a minute. Next, you'll implement the handle method. Take note that all the different error handling interfaces provide a handle method. Now we'll see the errorCounter variable in action. If there are fewer than 25 errors so far, you'll continue processing. Otherwise, once the count exceeds 25, you'll stop processing. Finally, you'll complete the implementation by adding a no-op configure method. Next on the error handling list, you'll create an implementation of the ProductionExceptionHandler interface. As with the previous error handler, you'll implement the handle method, and here you'll add the condition for continued processing. If it's a RecordTooLargeException, then the application will continue processing. And if it's not that exact exception, you'll shut down the application. Finally, as before, you'll complete the implementation by adding a no-op configure method. Next in our error handling exercise, let's add a StreamsUncaughtExceptionHandler. As we've seen with the other error handlers, you'll implement the handle method. In it, you'll check whether the exception is an instance of StreamsException, which basically means it's a wrapped user code exception. And if it is, you'll extract the underlying exception. Next, add a condition for evaluating the exception. You're looking for a specific exception that you know can occur, that is transient in nature, and that makes it worth replacing the stream thread.
If it is the transient exception that you expect, return a REPLACE_THREAD response. If it's not that specific exception, you'll want to shut the Kafka Streams instance down by returning a SHUTDOWN_CLIENT response. Now let's wire up the Kafka Streams application with the error handlers. First, add the DeserializationExceptionHandler implementation class to the configs. Then add the ProductionExceptionHandler implementation class to the configs. Okay, now we should mention a small tweak that we've made to the topology for demonstration purposes. Here, we've created a mapValues that will throw an exception the first time it encounters a record. Note that this is for transient error simulation purposes only. Now create the KafkaStreams instance. Then set the StreamsUncaughtExceptionHandler instance. Take note: you set this error handler directly on the KafkaStreams object and not in the configs. Now add the TopicLoader utility class. As a reminder, we're using this class for ease of learning, and you most likely will never use a utility like this in development or actual production. Then add the line that starts the Kafka Streams application, and actually start it. You'll quickly see a big, ugly stack trace, but take heart: the StreamsUncaughtExceptionHandler does its job, and you'll see the application recover and print the processing output on the console.