Distributed systems will always encounter errors, but not every error is a stop-the-world situation. Network-partition errors, for example, usually resolve quickly. Or there could be a change in partition ownership: each broker in Apache Kafka® is responsible for a set of partitions, and if you make a produce or fetch request during an ownership change, the request fails with an error. But that is considered a recoverable error, which means that the request will be retried.
Generally, you need to provide a graceful mechanism for recovering from errors. You want recovery to happen automatically if possible, and you want to shut down only if the situation is truly unrecoverable.
Kafka Streams has three broad categories of errors: entry (consumer) errors, processing (user logic) errors, and exit (producer) errors.
Entry errors happen when records are coming in and are usually network or deserialization errors.
These errors are handled by an implementation of the DeserializationExceptionHandler interface. The default is LogAndFailExceptionHandler, which logs the exception and then fails. The other built-in option is LogAndContinueExceptionHandler, which logs the error but keeps the application running. You can also provide a custom implementation and specify its class name via a Kafka Streams configuration.
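The snippet below is a minimal sketch of switching from the default to LogAndContinueExceptionHandler. It assumes a Kafka Streams version that exposes the setting as `StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG` (the `default.deserialization.exception.handler` property); newer releases may rename it, so check the `StreamsConfig` of your version. The application ID and bootstrap servers are placeholders.

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

// Builds Kafka Streams properties that log and skip records which cannot be
// deserialized, instead of using the default LogAndFailExceptionHandler.
class DeserializationHandlerConfig {

    static Properties build() {
        Properties props = new Properties();
        // Placeholder values: replace with your own application ID and brokers.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "error-handling-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Log the bad record and keep processing instead of failing.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }
}
```

These properties would then be passed to `new KafkaStreams(topology, props)`. A custom handler is registered the same way, with its own class in place of `LogAndContinueExceptionHandler.class`.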
Processing errors come from the logic that you provide, for example a ClassCastException caused by mismatched types. Generally, any such exception will eventually bubble up and shut down the application. Kafka Streams provides the StreamsUncaughtExceptionHandler to deal with exceptions that Kafka Streams itself does not handle (an example would be the ProducerFencedException).
The StreamsUncaughtExceptionHandler returns a StreamThreadExceptionResponse enum value, which gives you three options: replace the StreamThread (REPLACE_THREAD), shut down the individual Kafka Streams instance (SHUTDOWN_CLIENT), or shut down all Kafka Streams instances (SHUTDOWN_APPLICATION), meaning all instances with the same application ID, which the brokers view as one application. If the error is severe enough, you want to shut down all instances. This is accomplished via a rebalance, which communicates the shutdown command to all of the instances.
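As a sketch of how the three responses might be chosen, the handler below applies a hypothetical policy: a ClassCastException (a logic bug that would hit every instance) stops the whole application, a JVM `Error` stops only this instance, and anything else gets a replacement thread. The class name and the policy are illustrative assumptions, not a recommended mapping.

```java
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

// Hypothetical policy mapping an uncaught exception to one of the three responses.
class ExampleUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

    @Override
    public StreamThreadExceptionResponse handle(final Throwable exception) {
        // Kafka Streams usually wraps user exceptions (e.g., in a StreamsException),
        // so inspect the innermost cause.
        Throwable root = exception;
        while (root.getCause() != null) {
            root = root.getCause();
        }
        if (root instanceof ClassCastException) {
            // A type mismatch in our own logic will fail on every instance,
            // so stop all instances that share the application ID.
            return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
        }
        if (root instanceof Error) {
            // JVM-level problems (e.g., OutOfMemoryError): stop only this instance.
            return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        }
        // Anything else: assume it may be transient and start a replacement thread.
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}
```

The handler is registered with `streams.setUncaughtExceptionHandler(new ExampleUncaughtExceptionHandler())` before calling `streams.start()`. Note that replacing the thread only helps with transient problems: a record that throws every time it is processed will simply fail again in the new thread.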
Exit errors happen when writing records to a Kafka topic and are usually related to network or serialization problems (a common example is the RecordTooLargeException). These errors are handled with the ProductionExceptionHandler interface, which lets you respond by either continuing to process or failing. Note that the ProductionExceptionHandler only applies to exceptions that Kafka Streams does not handle itself; it doesn't apply, for example, to a security exception, an authorization exception, or an invalid host exception (these always result in failure). The default ProductionExceptionHandler is the DefaultProductionExceptionHandler, which always fails. For any other behavior, you need to provide your own implementation.
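Here is a minimal sketch of such an implementation, using a hypothetical class that skips records rejected as too large and fails on everything else. It is written against the two-argument `handle(ProducerRecord, Exception)` signature; newer Kafka releases add an overload that receives extra context and deprecate this one, so check the Javadoc for your version before relying on it.

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Hypothetical handler: skip oversized records, fail on any other production error.
class SkipTooLargeRecordsHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // Drop this record and keep processing the rest of the stream.
            System.err.println("Skipping oversized record for topic " + record.topic());
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        // Same behavior as DefaultProductionExceptionHandler for everything else.
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No custom settings are needed for this sketch.
    }
}
```

It would be enabled with `props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, SkipTooLargeRecordsHandler.class)`. Continuing means the skipped record is lost, so this choice only fits data you can afford to drop.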
Kafka Streams uses embedded producer and consumer instances. These clients have their own configurations, with defaults that you usually don't need to worry about.
But if you tune these configurations for resilience, the way some of them handle errors (for example, by blocking) can work against you: a client that blocks while it waits out an error also delays how long your Kafka Streams application takes to process records. On the other hand, if your configurations are too loose, your application could shut down even for transient issues.
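Embedded-client settings can be overridden from the Kafka Streams configuration by prefixing them with `StreamsConfig.producerPrefix(...)` or `StreamsConfig.consumerPrefix(...)`. The two settings and the 30-second values below are illustrative assumptions, not recommendations; they simply cap how long the producer and consumer may block.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Overrides selected settings of the embedded clients (values are illustrative).
class EmbeddedClientOverrides {

    static Properties build() {
        Properties props = new Properties();
        // Placeholder values: replace with your own application ID and brokers.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "error-handling-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Becomes "producer.max.block.ms": applies only to the embedded producer
        // and bounds how long send() and metadata fetches may block.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), "30000");
        // Becomes "consumer.default.api.timeout.ms": bounds blocking consumer calls.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), "30000");
        return props;
    }
}
```

Lower values surface problems sooner, while higher values ride out longer hiccups at the cost of stalled processing, which is exactly the trade-off described above.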
One solution is the task.timeout.ms configuration, which starts a timer when a task encounters an error, so that Kafka Streams can try to make progress with other tasks in the meantime. The failed task is retried until the timeout is reached, at which point it finally fails.
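A minimal sketch of setting this timeout follows, assuming Kafka Streams 2.8 or later, where it is exposed as `StreamsConfig.TASK_TIMEOUT_MS_CONFIG`. The 10-minute value is an arbitrary example, not a recommendation.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

// Builds properties that give a failing task a longer retry window.
class TaskTimeoutConfig {

    static Properties build() {
        Properties props = new Properties();
        // Placeholder values: replace with your own application ID and brokers.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "error-handling-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Let a failing task retry for up to 10 minutes before the error is raised;
        // other tasks keep making progress in the meantime.
        props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, Duration.ofMinutes(10).toMillis());
        return props;
    }
}
```

The default is five minutes (300,000 ms). Setting the value to 0 makes a task raise an error on the first failure instead of retrying.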