In this tutorial, you’ll learn how to implement and plug in Kafka Streams exception handlers to deal with errors that occur during the different phases of a Kafka Streams application: deserialization of input records, processing, and production of output records (including serialization). To follow along, clone the tutorials repository:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
The KafkaStreamsBuggyApplicationTest.testExceptionHandlers test included in this tutorial demonstrates how exceptions are triggered and handled, as described in the sections below.
Run the test:
./gradlew kafka-streams-exception-handlers:kstreams:test
The handlers in this test simply log the type of exception triggered and then continue processing. You’ll see output like:
...
ProcessingExceptionHandler triggered
...
ProductionExceptionHandler.handleSerializationException triggered
...
DeserializationExceptionHandler triggered
...
Kafka Streams applications must deserialize events as they enter the processing topology, typically using the Consumed.with method when instantiating a KStream, KTable, or GlobalKTable.
To specify a built-in or custom handler for exceptions that occur during deserialization, use the deserialization.exception.handler configuration. Kafka ships two built-in implementations, LogAndContinueExceptionHandler and LogAndFailExceptionHandler, or you may supply a custom handler by implementing the DeserializationExceptionHandler interface. For example, the ContinuingDeserializationExceptionHandler in this tutorial logs a message and returns DeserializationHandlerResponse.CONTINUE, allowing the application to continue processing records:
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;

public class ContinuingDeserializationExceptionHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // log that the handler fired, then skip the bad record and keep processing
        System.out.println("DeserializationExceptionHandler triggered");
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
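To plug the handler into the application, set it in the Streams configuration. A minimal sketch, assuming a typical Properties-based setup (the props variable and the surrounding settings are illustrative, not part of the tutorial's code):

Properties props = new Properties();
// application.id, bootstrap.servers, and other required settings go here
props.put("deserialization.exception.handler",
        ContinuingDeserializationExceptionHandler.class.getName());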
Deserialization exceptions occur when data in the input topic wasn’t serialized in a way that matches how the Kafka Streams application attempts to deserialize it.
For example, the buggy application consumes data from the input topic assuming an integer key and string value:
builder.stream("input-topic", Consumed.with(Serdes.Integer(), Serdes.String()))
The test class triggers a deserialization exception by serializing the key as a string:
// topic incorrectly serialized the key as a String, which will trigger a deserialization exception in the app
TestInputTopic<String, String> badInputTopic = driver.createInputTopic("input-topic",
        stringSerde.serializer(), stringSerde.serializer());
badInputTopic.pipeInput("1", "foo");
The application's IntegerDeserializer rejects this key because the string-encoded bytes don't match the fixed four-byte integer encoding, so the handler is invoked.
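For contrast, a record whose key actually is serialized as an integer matches the application's serdes and flows through without invoking the handler. A hypothetical variant of the same test setup:

TestInputTopic<Integer, String> goodInputTopic = driver.createInputTopic("input-topic",
        Serdes.Integer().serializer(), stringSerde.serializer());
goodInputTopic.pipeInput(1, "foo");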
KIP-1033 introduced a plugin-based mechanism to handle exceptions during message processing, similar to deserialization exception handling.
To configure a built-in or custom handler, use the processing.exception.handler configuration. Kafka ships two built-in implementations, LogAndContinueProcessingExceptionHandler and LogAndFailProcessingExceptionHandler, or you may supply a custom handler by implementing the ProcessingExceptionHandler interface. The ContinuingProcessingExceptionHandler in this tutorial closely mirrors LogAndContinueProcessingExceptionHandler: it logs a message to show that the handler was triggered and returns ProcessingHandlerResponse.CONTINUE so that the application continues processing records:
import java.util.Map;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

public class ContinuingProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        // log that the handler fired, then drop the failed record and keep processing
        System.out.println("ProcessingExceptionHandler triggered");
        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
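Registration follows the same pattern as the deserialization handler, via the processing.exception.handler key (again assuming the illustrative props setup from above):

props.put("processing.exception.handler",
        ContinuingProcessingExceptionHandler.class.getName());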
One way to trigger a processing exception is to throw an exception from a Processor that is invoked by calling KStream.process. This tutorial's application includes a Processor that fails randomly; otherwise it is a no-op that forwards the record unchanged:
@Override
public void process(Record<Integer, String> record) {
    // fail roughly half the time; otherwise forward the record downstream unchanged
    if (Math.random() < 0.5) {
        throw new RuntimeException("fail!!");
    }
    // context is the ProcessorContext saved in init()
    context.forward(record);
}
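For reference, a processor like this is attached to the topology with KStream.process. A sketch, assuming a processor class named RandomlyFailingProcessor (the name is illustrative) that implements Processor<Integer, String, Integer, String>:

builder.stream("input-topic", Consumed.with(Serdes.Integer(), Serdes.String()))
        .process(RandomlyFailingProcessor::new)  // each stream task gets its own processor instance
        .to("output-topic", Produced.with(Serdes.Integer(), Serdes.String()));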
KIP-210 introduced a mechanism for handling exceptions that occur while producing records back to Kafka. This was extended in KIP-399 to include serialization exceptions.
To configure a handler, use the production.exception.handler configuration. The class must implement the ProductionExceptionHandler interface, which has two methods: handle, invoked for failures while sending records, and handleSerializationException, invoked for failures while serializing them. Here’s an example from this tutorial:
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class ContinuingProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // non-serialization send failures land here
        System.out.println("ProductionExceptionHandler.handle triggered");
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
                                                                           final ProducerRecord record,
                                                                           final Exception exception,
                                                                           final SerializationExceptionOrigin origin) {
        // key or value serialization failures land here; origin indicates which side failed
        System.out.println("ProductionExceptionHandler.handleSerializationException triggered");
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
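As with the other handlers, register it in the Streams configuration, here under the production.exception.handler key:

props.put("production.exception.handler",
        ContinuingProductionExceptionHandler.class.getName());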
Production exceptions can occur for various reasons, including records that exceed the broker's maximum message size, failures while serializing keys or values, and authorization or broker-availability errors.
In this tutorial, a production exception is triggered using a custom serializer that randomly fails:
Serde<String> randomlyFailingStringSerde =
        Serdes.serdeFrom(new RandomlyFailingSerializer(), Serdes.String().deserializer());

builder.stream("input-topic", Consumed.with(Serdes.Integer(), Serdes.String()))
        ...
        .to("output-topic", Produced.with(Serdes.Integer(), randomlyFailingStringSerde));