Get Started Free
Wade Waldron

Wade Waldron

Staff Software Practice Lead

The Flink Job Lifecycle

Overview

When a Flink job is executed, it is sent to the Flink cluster where it will pass through multiple possible stages in its lifecycle. The exact flow depends on what commands are sent to the job, and whether or not it encounters any errors. This video will introduce the lifecycle of a Flink job as well as some of the commands that can be used to move the job through different lifecycle states.

Topics:

  • Running a Job
  • Finishing a Job
  • Canceling a Job
  • Stopping a Job
  • Resuming a Job
  • Failing Jobs
  • Recovery Strategies

Code

A Simple Job

public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment
		.getExecutionEnvironment();

	env.fromElements(1,2,3,4,5).print();

	env.execute();
}

Running a Job

$ flink run $JAR_FILE
$ flink run -c mypackage.MyClass $JAR_FILE
$ flink run --detached $JAR_FILE

Canceling a Job

$ flink cancel $JOB_ID

Stopping a Job

$ flink stop --savepointPath $SAVEPOINT_FOLDER $JOB_ID

Resuming a Job

$ flink run --fromSavepoint $SAVEPOINT_FILE $JAR_FILE

Setting a Restart Strategy

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(10, TimeUnit.SECONDS) // delay
));

Resources

Use the promo code FLINKJAVA101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. ­čÖé By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.

The Flink Job Lifecycle

Hi, I'm Wade from Confluent. In this video, I'm going to introduce you to Flink Jobs written in Java. You'll learn a little bit about how they operate and how to interact with them. The deployable unit in Flink is known as a job. These jobs might be written in Java, Python, or Flink SQL. If they're written in Java, then they're packaged into a JAR file that can be deployed to the Flink cluster. The applications can be quite simple, Job Size consisting of a few basic operations, or they can be complex, containing many operations or even many datastreams. When a job is written in Java, it contains three main components. The first is an entry point for the application The Main Method in the form of a standard Java main method. Usually, this main method is registered as the entry point in the JAR manifest file. However, as we'll see, that's not strictly required. The Execution Environment The second thing our job needs is an execution environment. Depending on whether you use the DataStream API or the Table API, the code for obtaining the environment is slightly different. Here we see the code for getting a StreamExecutionEnvironment for the DataStream API. The Data Stream The final thing our job needs is one or more streams to execute. This is an example of a very simple stream, but as we'll see, they can be far more complex. Once we've written our job, the next step is to run it. The flink run command submits our job to the Flink cluster. It takes a path to the JAR file, looks for the entry point in the manifest, and then executes it. If we haven't specified the entry point in the manifest, then we can provide it at the command line using the -c argument. This allows us to provide the name of a class that will contain the main method. By default, when you run a job, your terminal command won't return. It stays attached to the running job. In this case, using -C doesn't terminate the job. Instead, it just detaches the terminal. The job will continue to run. Running a Job You can also run the job in detached mode using the detached flag. This will cause the terminal command to return immediately, but the job will continue to run on the cluster. Once we run a job, it goes into the created state. This means that the cluster knows about the job, but hasn't yet allocated any resources for running it. The allocation of resources happens when the job is scheduled. This is the responsibility of the job manager. It will allocate task slots or other resources to execute the various stages of the job. If the job has a finite data set, Finishing a Job then it will run until it finishes consuming all of the data. At that point, the job will terminate and enter into a finished state. However, what if the application is operating on an infinite data set, such as a Kafka topic? In that case, the job never really finishes. Instead, if we want to end the job, Canceling a Job we need to explicitly cancel it. We do this using the cancel command and providing the JOB_ID that was returned by the run command. When we issue the cancel command, the job terminates and enters the canceled state. This is considered a non-graceful shutdown and the job cannot be resumed. You can always rerun the job, but it will run as a completely new instance and you may miss or duplicate messages. However, if you want a more graceful shutdown. Stopping a Job you can use the stop command. This requires a savepointPath and the JOB_ID. When Flink receives this command, it will perform a graceful shutdown of the datastreams and save their state into savepoint file. This file contains the necessary information for the job to resume at a later time. The job will then move into a suspended state and will no longer appear in your running jobs in Flink. Resuming a Job When a job is stopped with a savepoint, it can be resumed later. We do this by running the job and providing a path to the appropriate savepoint file. Resuming a job moves it back into the created state where it will need to be scheduled again. However, because you provided a savepoint file, the job will be able to pick up where it left off. It won't miss any messages. Failing Jobs So far, we've been talking about deliberate actions taken by someone managing the Flink cluster. But what happens if a job fails unexpectedly? This depends on how Flink and the job have been configured. The default behavior for a failing job is to move to the failed state. At this point, the job has been terminated and you would need to take manual steps to recover it. Restarting a Job However, Flink also includes the ability to provide a restart strategy. This can be done globally by setting the restart-strategy configuration value. It can be configured with one of three strategies including fixed-delay, failure-rate, or exponential-delay. We can also set the restart strategy in the job itself by calling setRestartStrategy on the execution environment. If a restart strategy has been provided, then the job will be automatically recovered. It will reenter the created state and continue where it left off. The actual job life cycle is more complicated than what we've presented here. We've tried to boil it down to some of the more basic states to make it a little easier to understand. However, if you want a deeper understanding of what's going on under the hood, make sure to check out the Flink documentation. If you aren't already on Confluent Developer, head there now using the link in the video description to access the rest of this course and its hands-on exercises.