Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
In this exercise, we'll start to create a new Flink data stream.
We will be building a new Flink Job known as the
FlightImporterJob. This job will eventually be responsible for importing flight data from our
skyone and sunset Kafka topics and converting it to a unified format.
However, for the moment, we'll keep it simple. We'll just be reading data from the
skyone topic and printing it to a log file. We'll expand on this later.
If you get stuck on this exercise, you can watch me write the code in the accompanying video.
Stage the exercise by executing:
./exercise.sh stage 08
The first step is to create a consumer configuration to read from Kafka. The simplest way to do this is to copy the producer configuration from the previous exercises into a new consumer.properties file and adjust it as needed.
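If you are connecting to Confluent Cloud, a minimal sketch of what consumer.properties might contain is shown below. The keys are standard Kafka client settings; the placeholder values are assumptions you must replace with your own cluster details:
bootstrap.servers=<your-bootstrap-server>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<your-api-key>" password="<your-api-secret>";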
Next, let's create the job that will consume our skyone data.
Create a new Java package in src/main/java named flightimporter.
Add a new class to the flightimporter package named FlightImporterJob.
Define a main method in the class to create an entry point for the job.
Import the following:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.InputStream;
import java.util.Properties;
Our data stream is going to require an execution environment as well as access to our consumer configuration.
Inside the main method we defined above, create a StreamExecutionEnvironment and load the consumer configuration into a Properties object.
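Here is a minimal sketch of what that might look like, assuming consumer.properties is packaged on the classpath (e.g. under src/main/resources). Loading the properties can throw an exception, so the main method will need to declare throws Exception:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Load the consumer configuration from the classpath.
Properties consumerConfig = new Properties();
try (InputStream stream = FlightImporterJob.class.getClassLoader().getResourceAsStream("consumer.properties")) {
    consumerConfig.load(stream);
}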
We will be reading our data from the skyone Kafka topic. This requires us to create a KafkaSource.
Use KafkaSource.builder to create a new KafkaSource<String> (a sketch of the completed chain follows these steps).
Call setProperties and pass it the consumer configuration.
Call setTopics and pass it the name of the topic it will consume (skyone).
Call setStartingOffsets and pass it OffsetsInitializer.latest() so that it starts consuming from the end of the topic.
Call setValueOnlyDeserializer and give it a new SimpleStringSchema.
Call build to create the KafkaSource.
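Putting those calls together, the builder chain might look like this (assuming the Properties object created above is named consumerConfig):
KafkaSource<String> source = KafkaSource.<String>builder()
    .setProperties(consumerConfig)
    .setTopics("skyone")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();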
We'll be using our KafkaSource to create a new DataStream.
Call fromSource on the StreamExecutionEnvironment to create a new DataStream<String> with the following parameters:
The KafkaSource created above.
A WatermarkStrategy, such as WatermarkStrategy.noWatermarks().
A name for the source (eg. "skyone_source").
For now, we are just going to print the data from our stream to a log file. Call print on the DataStream.
Finally, call execute on the StreamExecutionEnvironment and pass it a name for the job (eg. "FlightImporter").
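A sketch of those final steps might look like this (the source name "skyone_source" is just an illustrative choice; env.execute throws a checked exception, which the throws Exception on main covers):
// Create a DataStream from the KafkaSource.
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "skyone_source");

// Print each record to the task manager's .out log file.
stream.print();

// Execute the job, giving it a descriptive name.
env.execute("FlightImporter");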
If you look inside the
pom.xml file for the project, you will find a line like the following:
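<mainClass>datagen.DataGeneratorJob</mainClass>
(The datagen package prefix here is an assumption; check your own pom.xml for the exact package name.)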
This indicates that the DataGeneratorJob is the main entry point for the application. However, we have now created a second possible entry point (the FlightImporterJob).
This means that if we want to run our
FlightImporterJob we'll need to be more specific. We can use the
-c flag on the
flink run command to specify our entry point.
If it's not already running, make sure to start your Flink cluster.
Build and package the jar file:
mvn clean package
Run the DataGeneratorJob:
<flink_installation_path>/bin/flink run target/travel-itinerary-0.1.jar
Or, if your Flink installation sits in the parent directory:
../flink*/bin/flink run target/travel-itinerary-0.1.jar
Then run the FlightImporterJob, using the -c flag to select the entry point:
<flink_installation_path>/bin/flink run -c flightimporter.FlightImporterJob target/travel-itinerary-0.1.jar
Or:
../flink*/bin/flink run -c flightimporter.FlightImporterJob target/travel-itinerary-0.1.jar
Note: During this process you may encounter errors such as:
WARNING: An illegal reflective access operation has occurred
These are common when working with Flink and Java 11, and you can safely ignore them.
Note: We have provided the
start-flightimporter.sh shell script to make this a little easier.
Once the jobs are both running, check the logs to make sure the data is being printed as expected. Finding the right log file can be a little tricky.
Navigate to localhost:8081 in a browser.
Select Job Manager from the menu on the left.
Select Log List from the top.
Sort the Log List by Last Modified Time in Descending order (newest first).
Click on the first .out file in the list.
Note: It may take a moment for the file to show up. Try reloading the page if you don't see it.
This represents the most recently modified output file from our jobs. Assuming that we are printing logs on every message, the most recently modified output file should contain the logs we are looking for. You should see a series of JSON records containing the skyone flight data.
Remember to cancel your jobs so they don't use your Confluent Cloud credits.
This brings us to the end of this exercise.