Senior Software Engineer (Presenter)
Integration Architect (Author)
If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/errors/StreamsErrorHandling.java.
In this exercise, you'll essentially take the Basic Operations exercise and add error handling code to it.
Begin with the following code, adding to it as necessary. (Note that the static boolean throwErrorNow exists for simulation purposes only: the mapValues step that follows the streamWithErrorHandling filter is set up to throw an exception the first time it encounters a record, purely to simulate a transient error.)
public class StreamsErrorHandling {
    //This is for learning purposes only!
    static boolean throwErrorNow = true;

    public static void main(String[] args) throws IOException {
        final Properties streamsProps = StreamsUtils.loadProperties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-error-handling");
        StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = streamsProps.getProperty("error.input.topic");
        final String outputTopic = streamsProps.getProperty("error.output.topic");
        final String orderNumberStart = "orderNumber-";
        KStream<String, String> streamWithErrorHandling =
            builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));
        streamWithErrorHandling.filter((key, value) -> value.contains(orderNumberStart))
            .mapValues(value -> {
                if (throwErrorNow) {
                    throwErrorNow = false;
                    throw new IllegalStateException("Retryable transient error");
                }
                return value.substring(value.indexOf("-") + 1);
            })
            .filter((key, value) -> Long.parseLong(value) > 1000)
            .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
    }
}
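To see what the topology above does to each value, here is a minimal plain-Java sketch of the same filter, mapValues, and filter steps applied to a list instead of a stream. The class name OrderNumberSketch and the sample values are made up for illustration, and the simulated exception is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mimics the per-record logic of the topology without Kafka.
final class OrderNumberSketch {
    static final String ORDER_NUMBER_START = "orderNumber-";

    static List<String> process(List<String> values) {
        List<String> out = new ArrayList<>();
        for (String value : values) {
            // First filter: keep only values that contain the order-number prefix.
            if (!value.contains(ORDER_NUMBER_START)) {
                continue;
            }
            // mapValues: keep the text after the first "-".
            String number = value.substring(value.indexOf("-") + 1);
            // Second filter: keep only order numbers greater than 1000.
            if (Long.parseLong(number) > 1000) {
                out.add(number);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample values, not taken from the course data.
        System.out.println(process(List.of("orderNumber-1001", "orderNumber-500", "bogus-9999")));
        // prints [1001]
    }
}
```

Only the first sample value survives: the second is not greater than 1000, and the third never passes the first filter.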
Begin by creating a DeserializationExceptionHandler implementation above your main method to handle any deserialization errors. (We’ll discuss errorCounter in the next step.)
public static class StreamsDeserializationErrorHandler implements DeserializationExceptionHandler {
    int errorCounter = 0;
Underneath that, implement a handle method. (Note that each of the various error handling interfaces provides a handle method.)
    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        if (errorCounter++ < 25) {
            return DeserializationHandlerResponse.CONTINUE;
        }
        return DeserializationHandlerResponse.FAIL;
    }
The errorCounter variable counts errors. Your program continues processing through the first 25 errors; on the 26th, the handler returns FAIL and the program stops processing.
Complete the implementation by adding a no-op configure method:
    @Override
    public void configure(Map<String, ?> configs) { }
}
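The counter's boundary is easy to misread, so here is a small plain-Java sketch of just the counting logic. ErrorBudgetSketch and its Response enum are hypothetical stand-ins for the handler and for DeserializationHandlerResponse; no Kafka classes are involved.

```java
// Illustrative only: isolates the errorCounter logic from the handler above.
final class ErrorBudgetSketch {
    // Stand-in for DeserializationHandlerResponse.
    enum Response { CONTINUE, FAIL }

    private int errorCounter = 0;

    Response handle() {
        // Post-increment: the comparison sees 0..24 for the first 25 calls.
        if (errorCounter++ < 25) {
            return Response.CONTINUE;
        }
        return Response.FAIL;
    }

    public static void main(String[] args) {
        ErrorBudgetSketch handler = new ErrorBudgetSketch();
        for (int i = 1; i <= 26; i++) {
            System.out.println("error " + i + " -> " + handler.handle());
        }
        // Errors 1 through 25 print CONTINUE; error 26 prints FAIL.
    }
}
```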
Under the first error handler, create an implementation of the ProductionExceptionHandler interface. As with the other error handler, you'll add a handle method with the condition for continued processing: If it's a RecordTooLargeException, then the application will continue processing, but if it's any other exception, you'll shut down the application.
public static class StreamsRecordProducerErrorHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }
Add a no-op configure method, as before:
    @Override
    public void configure(Map<String, ?> configs) { }
}
Next, add a StreamsUncaughtExceptionHandler. As with the other error handlers, you'll implement the handle method, checking whether the exception is an instance of StreamsException, which generally means that it wraps an exception thrown from user code. If it is, you'll extract the underlying exception.
public static class StreamsCustomUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
    @Override
    public StreamThreadExceptionResponse handle(Throwable exception)
You also need code to evaluate the exception; you're looking for a specific exception that is transient in nature and that justifies replacing the stream thread.
    {
        if (exception instanceof StreamsException) {
            Throwable originalException = exception.getCause();
            // Null-safe comparison: a missing cause or message falls through to shutdown.
            if (originalException != null
                    && "Retryable transient error".equals(originalException.getMessage())) {
                return StreamThreadExceptionResponse.REPLACE_THREAD;
            }
        }
        return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    }
}
If the underlying exception is that specific transient error, the handler returns a REPLACE_THREAD response. Otherwise, it shuts down the instance with a SHUTDOWN_CLIENT response.
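The unwrap-and-decide logic can be tried out without a running Kafka Streams application. The sketch below is a plain-Java approximation: WrappedStandIn is a hypothetical stand-in for StreamsException, and the Response enum stands in for StreamThreadExceptionResponse. It compares the message null-safely so that a missing cause or message falls through to shutdown.

```java
// Illustrative only: models the decision made in the uncaught exception handler.
final class UncaughtDecisionSketch {
    // Stand-in for StreamThreadExceptionResponse.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT }

    // Hypothetical stand-in for StreamsException: wraps an exception from user code.
    static class WrappedStandIn extends RuntimeException {
        WrappedStandIn(Throwable cause) {
            super(cause);
        }
    }

    static Response handle(Throwable exception) {
        if (exception instanceof WrappedStandIn) {
            Throwable original = exception.getCause();
            // Calling equals on the literal avoids a NullPointerException
            // when the original exception carries no message.
            if (original != null && "Retryable transient error".equals(original.getMessage())) {
                return Response.REPLACE_THREAD;
            }
        }
        return Response.SHUTDOWN_CLIENT;
    }

    public static void main(String[] args) {
        Throwable transientError = new WrappedStandIn(new IllegalStateException("Retryable transient error"));
        Throwable otherError = new WrappedStandIn(new IllegalStateException("something else"));
        System.out.println(handle(transientError)); // prints REPLACE_THREAD
        System.out.println(handle(otherError));     // prints SHUTDOWN_CLIENT
    }
}
```

Matching on the message text is fine for a simulation like this one; a dedicated exception type would be a sturdier signal in real code.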
Now add the code to wire up the Kafka Streams application with your error handlers.
First, add the DeserializationExceptionHandler implementation class to the configurations:
streamsProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, StreamsDeserializationErrorHandler.class);
Then add the ProductionExceptionHandler implementation class to the configurations:
streamsProps.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, StreamsRecordProducerErrorHandler.class);
Under the streamWithErrorHandling filter, create your KafkaStreams instance:
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
Then set the StreamsUncaughtExceptionHandler instance:
kafkaStreams.setUncaughtExceptionHandler(new StreamsCustomUncaughtExceptionHandler());
Note that this is set directly on the KafkaStreams object and not in the configurations.
As with the other exercises, call the TopicLoader utility class to produce the sample records, then add the code to start your application:
TopicLoader.runProducer();
kafkaStreams.start();
Start your application with this command:
./gradlew runStreams -Pargs=errors
You'll quickly see a big, ugly stack trace, but the StreamsUncaughtExceptionHandler will do its job, and you'll see the application recover and print some processing output on the console.
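As a rough analogy for that recovery, the following plain-Java sketch fails once on a worker thread and lets an uncaught exception handler start a replacement. It uses only java.lang.Thread and is not how Kafka Streams implements REPLACE_THREAD internally; the class name, thread names, and event strings are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Illustrative only: a fail-once worker that is replaced by a fresh thread.
final class ReplaceThreadSketch {
    // Mirrors the exercise's simulation flag: fail on the first attempt only.
    static volatile boolean throwErrorNow = true;

    static List<String> runWithReplacement() {
        throwErrorNow = true;
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        Runnable work = () -> {
            if (throwErrorNow) {
                throwErrorNow = false;
                throw new IllegalStateException("Retryable transient error");
            }
            events.add("processed");
            done.countDown();
        };

        Thread first = new Thread(work, "worker-1");
        // When the first worker dies, record the failure and start a replacement.
        first.setUncaughtExceptionHandler((thread, error) -> {
            events.add("replaced after: " + error.getMessage());
            new Thread(work, "worker-2").start();
        });
        first.start();

        try {
            done.await(); // wait until the replacement has finished its work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runWithReplacement());
        // prints [replaced after: Retryable transient error, processed]
    }
}
```

Because the flag is cleared before the exception is thrown, the replacement thread succeeds, which is the same reason the exercise's application recovers after the first record.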