Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
In this exercise, we'll run our first Flink job.
We are working in a domain that handles airline flight data. Imagine a travel itinerary system that imports flight data from various sources and creates new views of that data. In particular, we'll be working with two fictional airlines: "Sunset Air" and "Sky One Airlines".
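To make the domain concrete, here is a rough sketch of what a single flight record might look like as a Java class. This is purely illustrative; the field names are assumptions rather than the course's actual data model.

import java.time.ZonedDateTime;

// Hypothetical flight record. Illustrative only; the actual data
// model used in the course may differ.
public class FlightRecord {
    private String airline;          // e.g. "Sky One Airlines"
    private String flightNumber;
    private String departureAirport; // IATA code, e.g. "LAX"
    private String arrivalAirport;   // IATA code, e.g. "JFK"
    private ZonedDateTime departureTime;
    private ZonedDateTime arrivalTime;

    // A no-arg constructor plus getters and setters would be needed
    // for JSON serialization.
}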
This is a rough outline of the system we are building:
The DataGeneratorJob has already been created. It generates fake flight data for each of these airlines and pushes that data to Kafka topics. This job is where we will focus in this exercise.
In later exercises, we'll implement the other parts of the pipeline where we consume and transform the feeds.
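As a mental model for what the DataGeneratorJob does, here is a minimal Flink job that writes strings to a Kafka topic using a KafkaSink. This is a simplified sketch, not the course's actual DataGeneratorJob; it assumes the Flink Kafka connector is on the classpath and that the producer.properties file we create later in this exercise is on the resource path.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.InputStream;
import java.util.Properties;

// Simplified sketch of a data-generator-style job. Illustrative only.
public class DataGeneratorSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Load the Kafka connection settings from producer.properties
        // (created later in this exercise).
        Properties props = new Properties();
        try (InputStream in = DataGeneratorSketch.class
                .getClassLoader()
                .getResourceAsStream("producer.properties")) {
            props.load(in);
        }

        KafkaSink<String> sink = KafkaSink.<String>builder()
            .setKafkaProducerConfig(props)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("skyone")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
            .build();

        // The real job generates a continuous stream of fake flights;
        // here we just emit a couple of placeholder records.
        env.fromElements(
                "{\"flightNumber\": \"SKY1-100\"}",
                "{\"flightNumber\": \"SKY1-200\"}")
           .sinkTo(sink);

        env.execute("DataGeneratorSketch");
    }
}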
The code repository you set up in the first exercise includes an exercises folder, which contains a utility script named exercise.sh.
To start with, we want to stage the current exercise. This will load any code required for the exercise. Run the following command:
cd exercises
./exercise.sh stage 05
If at any point, you wish to automatically solve the exercise, you can execute:
./exercise.sh solve <exercise-id>
You can also copy individual solution files by executing:
./exercise.sh solve <exercise-id> <filename>
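For example, to fully solve this exercise, or to copy just a single solution file (the filename here is purely illustrative), you would run:

./exercise.sh solve 05
./exercise.sh solve 05 DataGeneratorJob.java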
Note: Exercises are numbered with their absolute position in the course (including lecture videos). That's why this exercise is numbered 05.
Each of our airlines will be pushing data to a separate topic. Let's create those topics.
You can create these topics from the command line using the following:
confluent kafka topic create skyone --cluster <ClusterID> --environment <EnvironmentID>
confluent kafka topic create sunset --cluster <ClusterID> --environment <EnvironmentID>
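If you want to confirm the topics were created, you can list the topics in your cluster using the same flags:

confluent kafka topic list --cluster <ClusterID> --environment <EnvironmentID>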
Our data generator is going to need access to our Kafka cluster so we will need to add some configuration.
In Confluent Cloud, navigate to your Kafka cluster and then select Connectors on the left.
Using the search box, look for Java, then click Connect with Java.
You will be presented with a set of Java configuration values. Copy the configuration.
In exercises/src/main/resources create a new file named producer.properties.
Paste the Java configuration into the file.
Replace the values of {{ CLUSTER_API_KEY }} and {{ CLUSTER_API_SECRET }} with the API key and secret that you created in the previous exercise.
Delete the Schema Registry configuration entries (schema.registry.url, basic.auth.credentials.source, and basic.auth.user.info), since we won't be using the Schema Registry in this exercise. A sketch of the finished file appears after these steps.
Save the file.
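After these steps, your producer.properties should look roughly like the following. This is a sketch; the exact entries and recommended defaults that Confluent Cloud generates may vary:

bootstrap.servers=<your bootstrap server>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<your API key>' password='<your API secret>';
client.dns.lookup=use_all_dns_ips
acks=all

Because this file contains credentials, be careful not to commit it to a shared repository.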
With the configuration in place, we are ready to run the job.
Note: Before running the job, make sure you have a Flink instance running as outlined in the previous exercise.
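If your local cluster isn't running, it can be started with the standard script that ships with Flink:

<flink_installation_path>/bin/start-cluster.sh

or

../flink*/bin/start-cluster.sh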
From inside the exercises folder, build and package the job:
mvn clean package
Run the job:
<flink_installation_path>/bin/flink run target/travel-itinerary-0.1.jar
or
../flink*/bin/flink run target/travel-itinerary-0.1.jar
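By default, the job runs attached to your terminal. If you prefer to run it in the background, the flink run command also supports detached mode via the -d flag:

../flink*/bin/flink run -d target/travel-itinerary-0.1.jar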
Locate the skyone and sunset topics in Confluent Cloud and click on the Messages tab for each topic. If everything is working, you should see messages being produced to each topic.
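As an alternative to the UI, you can watch the messages from the command line with the Confluent CLI. Note that, depending on your setup, you may first need to tell the CLI which API key to use for the cluster (for example, with confluent api-key use):

confluent kafka topic consume skyone --cluster <ClusterID> --environment <EnvironmentID>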
Note: For convenience, we have provided a small shell script named start-datagen.sh that packages the job and runs it. Feel free to use it in future exercises.
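Based on that description, the script is roughly equivalent to running:

mvn clean package && ../flink*/bin/flink run target/travel-itinerary-0.1.jar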
Next, we'll take a look at the running job in the Flink UI.
Now let's see how to cancel the job.
Note: When you run the job from your terminal, it should run in the foreground, so you can usually stop it with Ctrl+C. However, it's still worth knowing how to cancel a job through the Flink UI or from the command line.
Through the Flink UI this is easy. Simply select the job and click the Cancel Job link in the top right corner.
Through the command line you will need the JobID. This is provided when you run the job. Alternatively, you can see all running jobs by executing:
<flink_installation_path>/bin/flink list
or
../flink*/bin/flink list
Once you have the JobID, you can cancel it with:
<flink_installation_path>/bin/flink cancel <JobID>
or
../flink*/bin/flink cancel <JobID>
Go ahead and cancel the job using whichever option you prefer.
Note: When you cancel the job you may see a JobCancellationException. This is normal and can be ignored.
Verify the job is canceled either through the UI or at the command line (flink list).
Note: Although this Flink job is fairly lightweight, make sure it is not running when you are not using it; otherwise, it can consume your Confluent Cloud credits.
This brings us to the end of this exercise.