Get Started Free
Screen Shot 2021-07-23 at 4.13.50 PM

Sophie Blee-Goldman

Senior Software Engineer (Presenter)

bill-bejeck

Bill Bejeck

Integration Architect (Author)

Hands On: Error Handling

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/errors/StreamsErrorHandling.java.

In this exercise, you'll essentially take the Basic Operations exercise and add error handling code to it.

  1. Begin with the following code, adding to it as necessary. (Note that the static Boolean throwErrorNow exists for simulation purposes only, and the streamWithErrorHandling filter's mapValues is set up to throw an exception the first time it encounters a record for transient error simulation purposes only.)

    public class StreamsErrorHandling {
        //This is for learning purposes only!
        static boolean throwErrorNow = true;
    
        public static void main(String[] args) throws IOException {
            final Properties streamsProps = StreamsUtils.loadProperties();
            streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-error-handling");
    
            StreamsBuilder builder = new StreamsBuilder();
            final String inputTopic = streamsProps.getProperty("error.input.topic");
            final String outputTopic = streamsProps.getProperty("error.output.topic");
    
            final String orderNumberStart = "orderNumber-";
            KStream<String, String> streamWithErrorHandling =
                builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                    .peek((key, value) -> System.out.println("Incoming record - key " +key +" value " + value));
    
            streamWithErrorHandling.filter((key, value) -> value.contains(orderNumberStart))
                .mapValues(value -> {
                    if (throwErrorNow) {
                        throwErrorNow = false;
                        throw new IllegalStateException("Retryable transient error");
                    }
                    return value.substring(value.indexOf("-") + 1);
                })
                .filter((key, value) -> Long.parseLong(value) > 1000)
                .peek((key, value) -> System.out.println("Outgoing record - key " +key +" value " + value))
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
    
          }
    }
  2. Begin by creating a DeserializationExceptionHandler above your main method to handle any serialization errors. (We’ll discuss errorCounter in the next step.)

    public static class StreamsDeserializationErrorHandler implements DeserializationExceptionHandler {
        int errorCounter = 0;

    Underneath that, implement a handle method. (Note that each of the various error handling interfaces provides a handle method.)

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                             ConsumerRecord<byte[], byte[]> record,
                             Exception exception) {
        if (errorCounter++ < 25) {
            return DeserializationHandlerResponse.CONTINUE;
        }
        return DeserializationHandlerResponse.FAIL;
    }

    The errorCounter variable counts errors, and if there are fewer than 25, your program will continue processing. Once the counter exceeds 25, the program will stop processing.

    Complete the implementation by adding a no-op configure method:

    @Override
        public void configure(Map<String, ?> configs) { }
    }   
  3. Under the first error handler, create an instance of the ProductionExceptionHandler interface. As with the other error handler, you'll add a handle method with the condition for continued processing: If it's a RecordTooLargeException, then the application will continue processing, but if it's any other exception, you'll shut down the application.

    public static class StreamsRecordProducerErrorHandler implements ProductionExceptionHandler {
        @Override
        public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
            if (exception instanceof RecordTooLargeException ) {
                return ProductionExceptionHandlerResponse.CONTINUE;
            }
            return ProductionExceptionHandlerResponse.FAIL;
        }

    Add a no-op configure method, as before:

    @Override
    public void configure(Map<String, ?> configs) { }
    }
  4. Next, add a StreamsUncaughtExceptionHandler. As with the other error handlers, you'll implement the handle method, checking whether the exception is an instance of StreamsException, which basically means that it's a wrapped user code exception. If it is, you'll extract the underlying exception.

    public static class StreamsCustomUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
        @Override
        public StreamThreadExceptionResponse handle(Throwable exception)

    You also need code to evaluate the exception; you're looking for a specific exception that is transient in nature and that justifies replacing the stream thread.

    {
            if (exception instanceof StreamsException) {
                Throwable originalException = exception.getCause();
                if (originalException.getMessage().equals("Retryable transient error")) {
                    return StreamThreadExceptionResponse.REPLACE_THREAD;
                }
            }
            return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        }
    }

    If that specific error is returned, we return a REPLACE_THREAD response. Otherwise, we shut down the instance with a SHUTDOWN_CLIENT response.

  5. Now add the code to wire up the Kafka Streams application with your error handlers.

    First, add the DeserializationExceptionHandler implementation class to the configurations:

    streamsProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, StreamsDeserializationErrorHandler.class);

    Then add the ProductionExceptionHandler implementation class to the configurations:

    streamsProps.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, StreamsRecordProducerErrorHandler.class);
  6. Under the streamWithErrorHandling filter, create your KafkaStreams instance:

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
  7. Then set the StreamsUncaughtExceptionHandler instance:

    kafkaStreams.setUncaughtExceptionHandler(new StreamsCustomUncaughtExceptionHandler());

    Note that this is set directly on the KafkaStreams object and not in the configurations.

  8. As with the other exercises, create the utility class and the code to run your application:

    TopicLoader.runProducer();
    kafkaStreams.start();
  9. Start your application with this command

     ./gradlew runStreams -Pargs=errors

    Then you'll quickly see a big, ugly stack trace. But the StreamsUncaughtExceptionHandler will do its job, and you'll see the application recover and print some processing output on the console.


  1. This is the final exercise in the course, so make sure to delete your Confluent Cloud cluster. To do this, go to Cluster settings on the left-hand side menu, then click Delete cluster. Enter your cluster name, then select Continue.

Use the promo code STREAMS101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.