In this exercise you will explore checkpointing and observe how Flink uses checkpoints to recover from failures. This exercise uses the same Docker-based setup as some of the other exercises.
Note: there is no version of this exercise for Confluent Cloud. That's because Confluent Cloud for Apache Flink is a managed service that abstracts away this level of detail.
Begin by enabling checkpointing for the jobs created by the SQL Client, which you can do by setting a checkpointing interval:
set 'execution.checkpointing.interval' = '1000';
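Checkpointing is disabled by default; setting an interval is what enables it. If you want to be explicit about the processing guarantee, you can also set the checkpointing mode, though EXACTLY_ONCE is already the default once checkpointing is on:

```sql
-- Optional: EXACTLY_ONCE is the default mode once checkpointing is enabled
set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
```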
You should also change the way that results are displayed, which will make it easier to follow along:
set 'sql-client.execution.result-mode' = 'changelog';
This table will work nicely for this exercise:
CREATE TABLE `one_per_second` (
`ts` TIMESTAMP(3)
)
WITH (
'connector' = 'faker',
'rows-per-second' = '1',
'fields.ts.expression' = '#{date.past ''1'',''SECONDS''}'
);
Now run a continuous query against it:

SELECT COUNT(*) FROM one_per_second;
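In changelog mode, each update to the count appears as a retraction of the old value followed by an insertion of the new one. The output should look roughly like this (column name may differ):

```
 op      EXPR$0
 +I           1
 -U           1
 +U           2
 -U           2
 +U           3
 ...
```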
Now open a second terminal window in the directory where you have the learn-apache-flink-101-exercises repo, and simulate a failure by killing Flink's task manager:
docker compose kill taskmanager
At this point the query that is running inside the SQL console will stall. That's fine; just leave it in that stuck state for now. If you now examine the job in the Flink UI (at http://localhost:8081) you'll see that the job manager is trying to restart the job, but lacks the resources to do so.
If this were a production Flink deployment and a task manager had failed, the cluster management framework (e.g., Kubernetes) would automatically start a new task manager and the job would recover without any manual intervention. In this exercise, however, when you are ready for the query to resume, you'll need to start a new task manager yourself. Before doing so, arrange your windows so you can see what happens in the SQL console once the job is able to resume the stalled query.
Now start a new task manager to replace the one you killed earlier:

docker compose up --no-deps -d taskmanager

You will then see that the SELECT COUNT(*) FROM one_per_second query resumes, just as though nothing had gone wrong.
In this scenario, where the results are being displayed in the SQL Client, enabling checkpointing was enough to get Flink SQL to run this query with exactly-once semantics.
If you want to achieve the same consistency guarantee with a Kafka sink, more configuration is required. The documentation on consistency guarantees for the Kafka sink has more information, and there's a complete example in the Flink Cookbook.
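As a rough sketch of what that extra configuration looks like, a Kafka sink table for exactly-once delivery needs the delivery guarantee and a transactional ID prefix set (the topic name and bootstrap server below are assumptions; adjust them for your environment):

```sql
CREATE TABLE `counts_sink` (
  `cnt` BIGINT
)
WITH (
  'connector' = 'kafka',
  'topic' = 'counts',                           -- assumed topic name
  'properties.bootstrap.servers' = 'broker:9092', -- assumed broker address
  'format' = 'json',
  'sink.delivery-guarantee' = 'exactly-once',
  'sink.transactional-id-prefix' = 'counts-sink' -- required for exactly-once
);
```

Note that downstream consumers must read with isolation.level=read_committed to avoid seeing uncommitted (and possibly aborted) transactional writes.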