Software Practice Lead
Flink relies on snapshots of the state it is managing both for failure recovery and for handling operational tasks, such as rescaling or migrating state during an application upgrade.
This video explains the differences between two types of snapshots -- checkpoints and savepoints -- and presents an example of how they work.
Hi, I'm David from Confluent, and I'm here to show you how Flink is able to produce correct and complete results, even when things break.

In order to recover from failures, Flink relies on snapshots of its state. All snapshots are written to a durable, distributed file system, such as S3. These snapshots come in two different formats.

One kind of snapshot is a checkpoint. Checkpoints are managed automatically by Flink itself. They're taken for the purpose of recovering from failures, and they're written in a format that is optimized for quick recovery.

The other kind of snapshot is called a savepoint. Savepoints are written in a format that is optimized for operational flexibility, because their purpose is to enable manual operational tasks, such as upgrading to a new Flink version or deploying a new version of your application. For example, your application upgrade might require migrating some existing state, and that's something you can only do with a savepoint.

So how do these snapshots work? When the processing nodes of a Flink cluster fail, as they sometimes do, Flink needs to resume processing quickly, and do so without making mistakes. So what kind of mistakes are we worried about? Well, Flink might, for example, fail to produce some of the expected results, or it might produce duplicates. To avoid these pitfalls, Flink implements a variant of the Chandy-Lamport distributed snapshot algorithm, which produces globally consistent snapshots.

This diagram shows a Flink job that we've seen before. The source is reading from an events topic, the job filters out orange events and counts the remaining events by color, and the results are written out to a results topic. The Flink runtime automatically takes periodic snapshots of all of the relevant state. The table below the job graph shows one of these snapshots.
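To make the snapshot table more concrete, here is a minimal, single-process Python sketch of the job just described. It is a toy model, not Flink's API: `ToyJob`, `PARTITIONS`, and `recount` are hypothetical names, the parallel counting tasks are collapsed into one `Counter`, and the sink is left out. What it tries to illustrate is that a snapshot records the per-partition source offsets and the counters at the same moment, so the counters are exactly what you would get by consuming each partition up to, but not past, its recorded offset.

```python
from collections import Counter

# Hypothetical input: two partitions of an "events" topic, each a list of colors.
# In this toy model an offset is simply an index into a partition's list.
PARTITIONS = {
    0: ["red", "orange", "blue", "red"],
    1: ["blue", "orange", "green", "red"],
}


class ToyJob:
    """Toy stand-in for: source -> filter(drop orange) -> count by color."""

    def __init__(self, partitions):
        self.partitions = partitions
        self.offsets = {p: 0 for p in partitions}  # source state: next offset per partition
        self.counts = Counter()                    # counting operator's state: count per color

    def step(self, partition):
        """Read the next event from one partition and push it through the pipeline."""
        offset = self.offsets[partition]
        color = self.partitions[partition][offset]
        self.offsets[partition] = offset + 1
        if color != "orange":        # stateless filter: contributes nothing to a snapshot
            self.counts[color] += 1  # stateful count

    def snapshot(self):
        """Copy the source offsets and the counters at the same point in processing."""
        return {"offsets": dict(self.offsets), "counts": dict(self.counts)}


def recount(partitions, offsets):
    """Counts implied by consuming each partition up to, but not past, its offset."""
    return Counter(
        color
        for p, upto in offsets.items()
        for color in partitions[p][:upto]
        if color != "orange"
    )
```

For example, creating `job = ToyJob(PARTITIONS)` and stepping through partitions 0, 0, 1, 0, 1 gives a snapshot with offsets `{0: 3, 1: 2}` and counts `{'red': 1, 'blue': 2}`, and `recount(PARTITIONS, {0: 3, 1: 2})` reproduces the same counts. Because `snapshot()` copies the state, events processed afterwards do not leak into it.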
In this example, each of the parallel sources has written into the snapshot the current offset for each partition it is responsible for. Since the filter operator is stateless, it contributes nothing to the snapshot. Each parallel counting task writes into the snapshot the counters for the colors it is responsible for. These counters reflect the state after having consumed all of the events from the sources up to, but not past, the offsets recorded by the source tasks. Similarly, the sink has written into its part of the snapshot the IDs of the transactions that committed the results it produced after consuming every event up to, but not beyond, those same offsets. Because every part of the snapshot is aligned to the same source offsets, this snapshot is a globally self-consistent snapshot of all of the state for this job, from all across the Flink cluster.

It's important to note that the Flink runtime does not experience a huge hiccup while one of these snapshots is being taken. The state is stored in data structures that support multi-version concurrency control, which means that Flink can continue to process events and produce results in new versions of the state while older versions are still being snapshotted.

It does, of course, sometimes happen that task managers fail, either because some code somewhere throws an exception or because something goes wrong with the node itself. When this happens, the job manager will detect the failure and, depending on the restart strategy that has been configured, arrange for the job to be restarted.

There are two important things I want to leave you with concerning recovery. Both of these take-aways stem from the fact that Flink's state snapshots are self-consistent: they include every piece of state that resulted from fully processing the input streams up to the source offsets recorded in the snapshot, and nothing beyond those offsets. The first take-away is that Flink is able to provide what is effectively an exactly-once guarantee.
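A toy simulation can make "effectively exactly once" concrete. The sketch below is a hypothetical, single-process Python model rather than Flink code: `run`, `EVENTS`, `snapshot_every`, and `crash_at` are illustrative names. It snapshots the source offset and the counters together every few events, injects one crash, and recovers by restoring both from the most recent snapshot and replaying from the recorded offset. It leaves out the sink, which in the real job relies on transactions so that replayed results are not emitted twice.

```python
from collections import Counter

# Hypothetical single-partition input; an offset is an index into this list.
EVENTS = ["red", "orange", "blue", "red", "green", "orange", "blue", "red"]


def run(events, snapshot_every, crash_at=None):
    """Toy model of snapshot-based recovery for: filter(drop orange) -> count by color.

    Returns (final counts, number of times each event offset was processed).
    """
    offset, counts = 0, Counter()   # the job's entire state
    last_snapshot = (0, Counter())  # (source offset, counters), captured together
    times_processed = Counter()
    crashed = False
    while offset < len(events):
        if offset == crash_at and not crashed:
            # Simulated failure: discard all live state and roll the whole job
            # back to the most recent snapshot, then replay from its offset.
            crashed = True
            offset, counts = last_snapshot[0], Counter(last_snapshot[1])
            continue
        color = events[offset]
        times_processed[offset] += 1
        if color != "orange":
            counts[color] += 1
        offset += 1
        if offset % snapshot_every == 0:
            # Copy the counters so later updates cannot leak into the snapshot.
            last_snapshot = (offset, Counter(counts))
    return counts, times_processed
```

Comparing `run(EVENTS, 3)` with `run(EVENTS, 3, crash_at=5)`: in the second run the events at offsets 3 and 4 are processed twice, yet both runs end with red=3, blue=2, green=1, because the replay re-applies those events to counters that were rolled back to before they had been counted. Restoring only the counters (and keeping offset 5) would lose those two events, and restoring only the offset would count them twice, which is why the sketch rolls everything back together.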
This does not mean that every event is processed exactly once, but rather that every event affects the state Flink is managing exactly once. The outcome is exactly the same as if the failure had never occurred. The second take-away is that recovery requires rolling back the entire cluster to the state recorded in the most recent snapshot. The whole cluster restarts, not just the node that failed.

Now that you understand the theory behind Flink's approach to fault tolerance, I invite you to check out the hands-on exercise on Confluent Developer. This exercise will show you how to enable and configure checkpointing, and let you observe what happens during failure and recovery. If you aren't already on Confluent Developer, head there now using the link in the video description to access other courses, hands-on exercises, and many other resources for continuing your learning journey.