Learn how Apache Flink® can handle hundreds or even thousands of compute nodes running 24/7 and still produce correct results.
Senior Curriculum Developer
When recording videos, I use a technique I learned years ago
while working with other presenters.
When I make a mistake, say the wrong word, mispronounce something or realize
something is confusing, I say the word REPHRASE and try casting a spell again.
REPHRASE I say the word REPHRASE and try that portion of the video again.
I’m then able to try again hopefully with a successful result.
On your end, you don’t experience those interruptions. Instead, you only see the
best takes. In a lot of ways, exactly-once semantics in Apache Flink work the same way.
What is Exactly Once?
Exactly-once semantics allow Flink to run large deployments of hundreds or even thousands
of compute nodes continuously, 24/7, and still claim to produce completely correct results.
At that scale, failures are inevitable.
How is this even possible?
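When we speak of exactly-once semantics we are referring to
each incoming event affecting the final result exactly once.
After a failure, an event may be processed more than once, but its effect on the
result is only counted once. This is often referred to as effectively exactly once.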
How Flink provides Exactly Once
So, how does Apache Flink provide exactly-once semantics?
Let’s imagine our architecture consists of Apache Kafka as our source and sink,
with Apache Flink in between doing some processing.
If everything is going smoothly our records come in, get processed, and head on to our sink.
Flink Snapshots
However, what happens if there is an issue?
Our greatest fear is that of data loss.
So, imagine we had a way to stop the entire cluster at certain points and
take a snapshot of the current state of things.
Then if something went wrong we could just roll back to that snapshot and try again,
similar to my example of recording this video.
What Flink actually does is more clever than this,
because coordinating a global pause of the entire Flink cluster would be both difficult and painful.
Here we have processing nodes that have state.
We capture that state in a snapshot and then write
those snapshots to durable storage like an S3 bucket.
When a failure occurs, the whole system can roll back to the last
successful snapshot and resume processing.
These snapshots are often referred to as checkpoints.
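To make the rollback idea concrete, here is a minimal sketch in plain Python. It is a toy model, not Flink code: the `durable_storage` dictionary stands in for something like an S3 bucket, and `run_with_checkpoints`, `checkpoint_every`, and `fail_at` are hypothetical names made up for this illustration.

```python
import copy


def run_with_checkpoints(events, checkpoint_every, fail_at=None):
    """Count events by color, snapshotting state every `checkpoint_every` events.

    `fail_at` is a hypothetical knob: the offset at which one simulated
    crash happens (None means no failure).
    """
    durable_storage = {"offset": 0, "counts": {}}  # stand-in for S3
    counts = {}
    offset = 0
    processed = 0        # how many times any event was handled, replays included
    failed_once = False

    while offset < len(events):
        if fail_at is not None and offset == fail_at and not failed_once:
            failed_once = True
            # Crash: in-memory state is lost, so restore the last snapshot
            # and rewind the input position that was stored with it.
            counts = copy.deepcopy(durable_storage["counts"])
            offset = durable_storage["offset"]
            continue
        color = events[offset]
        counts[color] = counts.get(color, 0) + 1
        offset += 1
        processed += 1
        if offset % checkpoint_every == 0:
            durable_storage = {"offset": offset, "counts": copy.deepcopy(counts)}
    return counts, processed


events = ["red", "blue", "red", "green", "blue", "red", "red"]
clean_counts, clean_processed = run_with_checkpoints(events, checkpoint_every=3)
failed_counts, failed_processed = run_with_checkpoints(
    events, checkpoint_every=3, fail_at=5
)
```

In the run with a failure, two events are handled a second time after the rollback, yet the final counts match the failure-free run. That is the "effectively exactly once" idea in miniature.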
This snapshot model guarantees exactly-once within Flink, but can also work with Kafka sources and
sinks, thereby ensuring effectively exactly-once semantics end-to-end.
I consider this to be one of Flink’s true superpowers.
From a high level, this consists of relying on Kafka transactions to protect each checkpoint,
in combination with recording the current offsets and transaction IDs in each checkpoint.
Then if anything goes wrong between checkpoints,
Flink can roll back its state in concert with rewinding Kafka to try again.
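As a rough illustration of how offsets and transactions work together, the sketch below models the source as a Python list and the sink as a tiny class with an open transaction. `ToyTransactionalSink` and `run_pipeline` are hypothetical names, not Kafka or Flink APIs, and the sketch assumes a finite input that gets one final checkpoint at the end.

```python
class ToyTransactionalSink:
    """Hypothetical stand-in for a transactional producer (not a real client API)."""

    def __init__(self):
        self.committed = []  # what read-committed consumers would see
        self.open_txn = []   # records written in the current, uncommitted transaction
        self.txn_id = 0

    def write(self, record):
        self.open_txn.append(record)

    def commit(self):
        self.committed.extend(self.open_txn)
        self.open_txn = []
        self.txn_id += 1

    def abort(self):
        self.open_txn = []


def run_pipeline(source_log, checkpoint_every, fail_at=None):
    sink = ToyTransactionalSink()
    # The checkpoint records the source offset and the transaction it protects.
    checkpoint = {"offset": 0, "txn_id": sink.txn_id}
    offset = 0
    failed_once = False
    while offset < len(source_log):
        if fail_at is not None and offset == fail_at and not failed_once:
            failed_once = True
            sink.abort()                   # drop output written since the checkpoint
            offset = checkpoint["offset"]  # rewind the source to the saved offset
            continue
        sink.write(source_log[offset].upper())  # the "processing" step
        offset += 1
        if offset % checkpoint_every == 0:
            checkpoint = {"offset": offset, "txn_id": sink.txn_id}
            sink.commit()
    if sink.open_txn:
        # Assumption for this toy: finite input gets one final checkpoint.
        sink.commit()
    return sink.committed


log = ["a", "b", "c", "d", "e", "f"]
no_failure = run_pipeline(log, checkpoint_every=2)
with_failure = run_pipeline(log, checkpoint_every=2, fail_at=3)
```

Because uncommitted output is discarded on failure and the source is rewound to the checkpointed offset, the committed output contains each record once in both runs.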
For some of you that might be enough information to feel good
about how Flink provides exactly-once semantics.
For others, it may be helpful to visualize how all this ties together, so let’s take a look.
Visualize the Process
Here is our architecture from earlier, albeit with a little more detail.
We still have Apache Kafka as both the source and the sink with
the Flink Job Manager controlling what happens inside Apache Flink.
We also have an ongoing count of each message color.
Flink provides exactly-once processing in two phases.
The first consists of everything that happens before our messages are committed,
and the second is the commit to our sinks.
When the time comes for another checkpoint, the job manager initiates the checkpoint and
assigns it a number. In this example, it’s checkpoint 42.
Each source inserts a checkpoint barrier into its output streams carrying that ID.
Each checkpoint is a snapshot of the current state of our application and the position in our stream.
Each barrier continues downstream, stopping at each operator.
Each operator quickly freezes its current state, tags it with the checkpoint ID
(again 42 in this case), and takes a snapshot of that state.
At this point, the operator continues processing records until the next barrier arrives,
in this case barrier 43.
Snapshot 42 contains all of the managed state from every operator
that resulted from processing all of the records up to barrier 42.
Eventually, these snapshots are written out to S3 asynchronously.
This technique is called asynchronous barrier snapshotting, and it is based on the
Chandy-Lamport distributed snapshot algorithm.
You can look at the barriers as a way to completely separate the events that
affect one version of the state from the next.
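The way a barrier separates one version of the state from the next can be sketched with a single toy operator. This is plain Python, not the Flink API: `Barrier` and `CountingOperator` are hypothetical names, and for simplicity the snapshot is copied synchronously, whereas Flink writes it out asynchronously.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    checkpoint_id: int


class CountingOperator:
    """Toy operator that keeps a running count of each message color."""

    def __init__(self):
        self.counts = {}
        self.snapshots = {}  # checkpoint_id -> copy of the state at that barrier

    def process(self, stream):
        """Forward every element downstream, snapshotting when a barrier arrives."""
        for element in stream:
            if isinstance(element, Barrier):
                # State here reflects exactly the records seen before this barrier.
                self.snapshots[element.checkpoint_id] = dict(self.counts)
            else:
                self.counts[element] = self.counts.get(element, 0) + 1
            yield element


stream = ["red", "blue", Barrier(42), "red", "green", Barrier(43), "blue"]
op = CountingOperator()
forwarded = list(op.process(stream))
```

Snapshot 42 holds only the effect of the two records before barrier 42, snapshot 43 adds the two records between the barriers, and the record after barrier 43 belongs to a later checkpoint.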
Since the snapshots are being taken asynchronously, the barriers will probably
reach the sinks at the end of the pipeline before all of the operators complete their checkpoints.
But that's okay. The job manager just waits for every operator to finish,
and then notifies all of the operators that the checkpoint has completed.
This is the end of phase 1 in our two-phase commit.
Every operator is then notified that phase 2 can begin,
at which point the sinks commit the transactions they opened earlier.
So no transactions are committed until all of the checkpoint data
has been safely written out to S3 (for example).
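The two phases can be sketched as a small coordinator that only tells the sinks to commit once every operator has acknowledged the checkpoint. This is a simplified model in plain Python. `ToySink`, `ToyJobManager`, and their method names are hypothetical, chosen for this illustration rather than taken from Flink.

```python
class ToySink:
    def __init__(self):
        self.buffer = []         # records written since the last barrier
        self.pre_committed = {}  # checkpoint_id -> records awaiting commit
        self.committed = []      # records visible to downstream readers

    def write(self, record):
        self.buffer.append(record)

    def pre_commit(self, checkpoint_id):
        # Phase 1: set the output aside under the checkpoint ID, do not publish yet.
        self.pre_committed[checkpoint_id] = self.buffer
        self.buffer = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Phase 2: the checkpoint is durable, so the output can be published.
        self.committed.extend(self.pre_committed.pop(checkpoint_id))


class ToyJobManager:
    def __init__(self, operator_names, sinks):
        self.expected = set(operator_names)
        self.sinks = sinks
        self.acks = {}  # checkpoint_id -> operators that finished their snapshot

    def acknowledge(self, checkpoint_id, operator_name):
        """Record one acknowledgement; return True once the checkpoint is complete."""
        acked = self.acks.setdefault(checkpoint_id, set())
        acked.add(operator_name)
        if acked == self.expected:
            for sink in self.sinks:
                sink.notify_checkpoint_complete(checkpoint_id)
            return True
        return False


sink = ToySink()
job_manager = ToyJobManager({"source", "counter", "sink"}, [sink])
sink.write("red=1")
sink.pre_commit(42)

first = job_manager.acknowledge(42, "sink")
second = job_manager.acknowledge(42, "source")
visible_before = list(sink.committed)  # still empty: one snapshot is outstanding
third = job_manager.acknowledge(42, "counter")
visible_after = list(sink.committed)
```

Nothing becomes visible until the last acknowledgement arrives, which mirrors the rule that no transaction is committed before the whole checkpoint is safely stored.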
However, if an issue occurs at any point, Flink rolls back the state in every operator to
the most recent checkpoint and processes the records after that checkpoint again.
This is a reset of all processing nodes where
every record after checkpoint barrier 42 gets processed again.
Conclusion
That’s how we get exactly-once processing with Apache Flink: results are only
committed to Kafka after every operator has successfully written its part of the snapshot.
Let us know in the comments if there are other aspects
of Apache Flink you’d like to know more about.
Be sure to like, share, and subscribe to be notified when we publish other videos.
You’ll also want to check out our Apache Flink courses.
Until next time, have fun streaming.