course: Kafka Streams 101

Error Handling

11 min

Sophie Blee-Goldman

Senior Software Engineer (Presenter)


Bill Bejeck

Integration Architect (Author)

Error Handling

Distributed systems will always encounter errors, but not every error is a stop-the-world situation: network-partition errors, for example, usually resolve pretty quickly. Or there could be a change in partition ownership: brokers in Apache Kafka® are responsible for a certain number of partitions, and if you make a produce or fetch request during an ownership change, it will result in an error. But it's considered a recoverable error, which means that the client will retry the request.

Generally, you need to provide a graceful mechanism for recovering from errors. You want recovery to happen automatically if possible, and you want to shut down only if the situation is truly unrecoverable.

Kafka Streams Error Categories

Kafka Streams has three broad categories of errors: entry (consumer) errors, processing (user logic) errors, and exit (producer) errors.

Entry

This type of error happens when records are coming in, and is usually a network or deserialization error.

The related error handler is the DeserializationExceptionHandler interface, which has a default configuration of LogAndFailExceptionHandler. This default handler logs the exception and then fails. The other option is the LogAndContinueExceptionHandler, which logs the error but continues to run. You can also provide a custom implementation and specify the classname via a Kafka Streams configuration.
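
For example, a minimal configuration sketch that switches to the LogAndContinueExceptionHandler might look like the following (the application ID and bootstrap servers are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class DeserializationHandlerConfig {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "error-handling-demo"); // placeholder application ID
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker address

        // Swap the default LogAndFailExceptionHandler for LogAndContinueExceptionHandler:
        // a record that cannot be deserialized is logged and skipped instead of
        // shutting the application down.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }
}
```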

Processing

Generally, any exception related to logic that you provide, such as a mismatch in expected types, will eventually bubble up and shut down the application. Kafka Streams provides the StreamsUncaughtExceptionHandler to deal with exceptions that are not handled internally by Kafka Streams; exceptions that the framework does handle on its own, such as the ProducerFencedException, never reach this handler.

The StreamsUncaughtExceptionHandler returns an enum value, and you have three options: you can replace the StreamThread, shut down the individual Kafka Streams instance, or shut down all Kafka Streams instances (i.e., all instances with the same application ID, which are viewed as one application by the brokers). If the error is severe enough, you want to shut down all instances of the application. This is accomplished via a rebalance, where the command to shut down is communicated to all of the instances.
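
As a rough sketch, registering a handler on a KafkaStreams instance could look like this; the policy shown here (shut everything down on an Error, otherwise replace the thread) is purely illustrative, not a recommendation:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class UncaughtExceptionHandlerExample {

    // Registers an uncaught-exception handler on an existing KafkaStreams instance.
    static void registerHandler(KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception -> {
            if (exception instanceof Error) {
                // Treat JVM errors (e.g., OutOfMemoryError) as unrecoverable for the
                // whole application: every instance with this application ID shuts down.
                return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
            }
            // Otherwise assume the failure is isolated and spin up a replacement StreamThread.
            // (SHUTDOWN_CLIENT is the third option, stopping only this one instance.)
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });
    }
}
```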

Exit

This type of error happens when writing records to a Kafka topic and is usually related to network or serialization errors (a common example is the RecordTooLargeException). These errors are handled with the ProductionExceptionHandler interface, and you can respond by continuing to process, or by failing. Note that the ProductionExceptionHandler only applies to exceptions that are not handled by Kafka Streams; it doesn't apply, for example, to a security exception, an authorization exception, or an invalid host exception (these would always result in failure). The default configuration for the ProductionExceptionHandler is the DefaultProductionExceptionHandler, and it always fails. For any other option, you need to provide your own implementation.
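
For illustration, a custom handler that skips oversized records but fails on anything else might look roughly like this (the class name is made up for this example); you would then register it under the default.production.exception.handler configuration (StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG):

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // Drop the oversized record and keep processing.
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        // Anything else is treated as fatal, matching the default behavior.
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No additional configuration needed for this example.
    }
}
```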

Kafka Streams uses embedded producer and consumer instances. These clients have their own configurations, with defaults that you usually don't need to worry about.

But if you tune these configurations purely for resilience, some of them handle errors by blocking for longer, and that can work against you: the longer the embedded clients block, the longer it takes your Kafka Streams application to process records. On the other hand, if your configurations are too aggressive, your application could shut down even for transient issues.
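
If you do decide to tune them, the embedded clients are configured through the Streams configuration using the producer and consumer prefixes. A minimal sketch (the specific values are illustrative, not recommendations):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ClientOverridesConfig {
    public static Properties buildConfig() {
        Properties props = new Properties();
        // Give the embedded producer more time before a send is considered failed...
        props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 180_000);
        // ...and allow the embedded consumer a longer gap between polls before it is
        // kicked out of the consumer group.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);
        return props;
    }
}
```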

One solution is the task.timeout.ms configuration, which starts a timer when an error occurs so that Kafka Streams can try to make progress with other tasks. The failed task is retried until the timeout is reached, at which point the exception is finally rethrown and the task fails.
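
Assuming the same kind of Properties object as in the sketches above, raising the task timeout might look like this (the ten-minute value is arbitrary):

```java
import org.apache.kafka.streams.StreamsConfig;

// Give a failing task up to 10 minutes of retries (the default is 5 minutes)
// before the underlying timeout exception is rethrown and the task finally fails.
props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 10 * 60 * 1000L);
```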



Error Handling

Hi, I'm Sophie Blee-Goldman with Confluent. In this module, we're going to talk about error handling in Kafka Streams. Now errors are inevitable, especially when you're working with distributed systems and applications. Not all errors are necessarily worthy of stopping the world, though. You might have a fatal error sometimes, but often you have a transient error, like a network partition, which will usually resolve quickly (not always, but typically), or something like a change in partition ownership. These are transient situations that might cause an error, but not ones that you would want to take down your entire application for. This means that you need to provide a mechanism for gracefully handling these errors when they do arise. You want to recover automatically when possible, and generally shut down only when it's truly unrecoverable and you need to intervene as a developer to make sure that it recovers in a good state. Now, luckily, Kafka Streams provides some mechanisms for dealing with these errors. In Kafka Streams, there are really three broad categories where errors can occur. The first is the entry point. This is when you're consuming records, and it has to do largely with the consumer client itself. This can be things like network or deserialization errors. The consumer is connecting to and fetching from the broker, so any network disruption or network partition can result in a transient or a fatal error on the consumer itself. Similarly, deserialization errors can occur when the consumer is trying to actually read out the objects from your topic, from the bytes that were encoded in the topic itself. The next type is the actual processing of records. This is generally something in the user code in your application, such as a record not being in the expected format. Let's say you have string values and you want to convert them into longs. In general, this should process happily, converting each string into a number. But what happens if you suddenly get a string with a non-numeric character in it? In that case, it would likely throw an exception, and this is the kind of processing error that you might see in Kafka Streams. And lastly, there is the exit point. This is when you are producing records and they are leaving Kafka Streams, kind of the flip side of the consumer: the producer client also, of course, connects across the network and suffers from similar disruptions, and likewise it has to serialize the records. So any serialization exceptions fall under the exit, or producer, category of exception. Now in Kafka Streams, each of these categories of errors has its own special handler, and we'll go over these in more detail. These are something that the user can provide to decide how best to deal with recoverable errors. In some cases, it might be best to acknowledge the error and continue: maybe you want to log it, but you don't want it to cause everything to shut down. But other times it might be a more fatal error, something that you think might cause data corruption or leave Streams in an unrecoverable state, and you need to tell Kafka Streams how to fix things in order to get it up and running again. So we'll cover how all of these handlers are used to tell Kafka Streams exactly what to do to handle these things appropriately. First up we have the consuming errors. These are generally going to be, as we mentioned, the deserialization type exceptions.
And for that, we have the DeserializationExceptionHandler interface. This is something that you, the user, can customize by implementing the interface and telling Kafka Streams exactly what to do, specific to your application logic. Or you can use one of the out-of-the-box exception handlers that Kafka Streams provides. The default is the LogAndFailExceptionHandler, which, as the name suggests, will log the exception that it hit and the record that caused it, and then fail: it will shut down Kafka Streams until you, the user, can react to it and decide how to resume. Another out-of-the-box option is the LogAndContinueExceptionHandler. It will, of course, log the error that you hit, but instead of failing and shutting down Kafka Streams, it will just continue and move on to the next record. If neither of these really suits your use case, then you can provide a custom implementation and do whatever you wish for deserialization exceptions. Now for processing errors: these span a much wider realm of possible errors, since this is anything that can happen in your user code, or things that might happen in the Kafka Streams framework itself, which might require you, the user, to decide how severe you think it is and whether you want to bubble it up and shut down the application or continue in some way. Kafka Streams provides the StreamsUncaughtExceptionHandler interface for you to specify how exactly you want to handle these kinds of processing exceptions. You can use it to log anything that you want and then choose what exactly to do. The interface allows you to return one of three options. You can choose to replace the thread, which means that the stream thread that was processing the record that hit this exception is simply replaced and everything carries on as normal. You can also choose to shut down just that individual Streams instance, meaning just that process running on that specific node. Or you can shut down the entire Streams application, meaning not just the instance on that node, but also any other instance of the same Streams application running on any other node; the "same Streams application" just means running with the same application ID. Lastly, we have the ProductionExceptionHandler for any errors that you might hit while trying to produce records from Kafka Streams to the broker. The ProductionExceptionHandler is, again, just an interface that allows you to specify what you want to do should an exception occur. You can respond by choosing to continue processing, or you can choose to fail, similar to the DeserializationExceptionHandler. The default configuration in this case is the DefaultProductionExceptionHandler, which always returns the option to fail. For any other behavior, you would need to implement your own ProductionExceptionHandler. Note that this only applies to exceptions that are not handled by Kafka Streams. A number of exceptions that you might hit in the producer client, such as the ProducerFencedException, would all be handled internally by Kafka Streams. But there are some more user- or application-specific exceptions that Kafka Streams does not know how to handle, and these are what get passed on to the ProductionExceptionHandler. For example, you might get a RecordTooLargeException.
This is the kind of exception you might hit even during some fairly simple processing, like transforming or converting a string into JSON. Anytime you get a record that is just slightly below the maximum message size, it would still be safely stored in Kafka and consumed easily by Kafka Streams. But say Kafka Streams then maps it into JSON, or something else that adds a little bit of size, and tries to produce it back to Kafka. In that case, you might get a RecordTooLargeException, as the record is now too large for the max message size that the broker has configured. Now we're talking here about the clients: Kafka Streams, of course, has embedded producer and consumer instances that it uses to get records from and send records to Kafka. And each of these clients can experience intermittent, temporary failures. These are things like the network partition that we mentioned earlier, or a change in the broker that is the leader for a partition, or in the group coordinator; things like that are exceptions that are handled internally by Kafka Streams. The clients have their own configurations for these situations, and this is something that you can configure as a user if the default configurations are not working for you. In general, the defaults are chosen because they work well across a wide variety of situations, but there might be situations in which you would like to tune some of these defaults for better resilience, better availability, or better reaction time upon failure. Optimizing for resilience means that the clients can block for longer. For example, if you increase one of the timeouts, then the consumer or producer client might spend a longer time in those calls, which lowers the risk of hitting a timeout exception in the case of failures, but means that Kafka Streams will be blocked on that call for a while. And this can have some adverse effects on the Kafka Streams application: for example, you might then miss the next poll that the consumer has to issue to remain in the consumer group, which can cause the application instance to get kicked from the group and generally causes some operational issues. Of course, if your timeout is too low, then you risk the application shutting down on transient issues, like those network partitions that might have gone away if you had just waited a little bit longer. So it's really about tuning to get the right configuration for these clients, and the default is a good place to start. Now, sometimes it can be hard to strike the right trade-off between these two considerations, and for that Kafka Streams provides the task.timeout.ms config. The idea here is that you don't necessarily want to be waiting and blocking forever in a call to one of the clients, so you don't want to set a very, very large timeout. But of course, if you set a small timeout, then you risk hitting that timeout exception. So rather than letting the timeout exception bubble up and kill the application, Kafka Streams now uses a timeout that is applied per task. Anytime a client experiences an issue like a timeout exception, Kafka Streams will start a timer for that task and then attempt to make progress on all the other tasks. It continues doing the work of processing and ignores that one task and its issue for the time being, assuming that it is going to be transient and that it makes sense to retry.
Then, when the task with the failed operation is retried and succeeds, processing just continues as usual. Or, if it does not succeed, it is retried until the task.timeout.ms config is reached. At that point, Kafka Streams will rethrow the timeout exception all the way up, which may kill the application. So those are some of the exceptions and some of the ways that they're handled in Kafka Streams. Now let's go over an exercise so that you can see some of these exception handlers at work.