Staff Software Practice Lead
Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
In this exercise, we'll take the records we consumed in the previous exercise and deserialize them into a Plain Old Java Object (POJO).
Our Sky One Airlines flight data is being sent through Kafka in a JSON format. Working with this data in its raw form in Java will be awkward. So instead, we want to convert it into a Java object that will be more convenient.
Thankfully, Flink has built-in support for these conversions, which makes our job relatively simple.
Stage the exercise by executing:
./exercise.sh stage 10
Our first step is to define the class that will act as our data model.
Create a new Java package named models.
Create a new class in the models package named SkyOneAirlinesFlightData.
Include the following imports:
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.time.ZonedDateTime;
import java.util.Objects;
Annotate the class with:
@JsonIgnoreProperties(ignoreUnknown = true)
Use the following class diagram to help you design the model.
Your class should include the following:
- A @JsonFormat annotation on each ZonedDateTime field, using the pattern yyyy-MM-dd'T'HH:mm:ss.SSSXXX.
- Implementations of .equals and hashCode.
- An implementation of .toString.
Hint: You can use the model defined in the datagen package as a basis for this. However, you will need to make some changes. You can also use code generators if you like.
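The timestamp pattern in the list above uses java.time DateTimeFormatter pattern letters. yyyy-MM-dd is the date, 'T' is a literal T, HH:mm:ss.SSS is the time with exactly three millisecond digits, and XXX is a UTC offset such as Z or -06:00. One quick, stdlib-only way to see what the pattern accepts is to parse a few sample strings directly. This is only a sketch: the class name FlightTimestampPatternDemo and the sample timestamps are hypothetical and are not taken from the datagen output. In the exercise itself, Jackson applies the pattern for you through @JsonFormat.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class FlightTimestampPatternDemo {
    // Same pattern string as the one required for the model's @JsonFormat annotations.
    static final DateTimeFormatter FLIGHT_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");

    // Parses a timestamp in the flight-data format into a ZonedDateTime.
    static ZonedDateTime parseFlightTime(String text) {
        return ZonedDateTime.parse(text, FLIGHT_TIME_FORMAT);
    }

    public static void main(String[] args) {
        // Hypothetical sample values; the datagen's actual timestamps may differ.
        ZonedDateTime utc = parseFlightTime("2024-01-15T09:12:00.000Z");
        ZonedDateTime withOffset = parseFlightTime("2024-01-15T03:12:00.000-06:00");

        // Formatting with the same pattern round-trips the UTC value.
        System.out.println(FLIGHT_TIME_FORMAT.format(utc)); // 2024-01-15T09:12:00.000Z

        // Different offsets, same instant in time.
        System.out.println(utc.isEqual(withOffset)); // true
    }
}
```

Because SSS is a fixed three-digit field preceded by a literal dot, a value without milliseconds (for example 2024-01-15T09:12:00Z) is rejected with a DateTimeParseException rather than silently accepted.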
Hint: When you staged the exercise at the beginning, new unit tests were created to help test your model (see SkyOneAirlinesFlightDataTest.java). You can use them to verify that you have created the model correctly by running:
mvn clean test
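The requirements above can be combined into one possible shape for the model, sketched below. This is an illustration, not the official solution. The seven field names are taken from the example toString output later in this exercise and are assumed to match the JSON keys the datagen produces, so adjust them if your JSON differs. The class compiles only when Flink's shaded Jackson classes are on the classpath, as the imports listed earlier require. Add your own package declaration for the models package. The public no-argument constructor and the getters and setters are included because Flink treats a class as a POJO only when it has a public no-argument constructor and its fields are either public or reachable through getters and setters.

```java
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.time.ZonedDateTime;
import java.util.Objects;

// Skip JSON fields that have no matching property here instead of failing.
@JsonIgnoreProperties(ignoreUnknown = true)
public class SkyOneAirlinesFlightData {
    private String emailAddress;
    // Parse this timestamp with the exercise's pattern.
    @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
    private ZonedDateTime flightDepartureTime;
    private String iataDepartureCode;
    @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
    private ZonedDateTime flightArrivalTime;
    private String iataArrivalCode;
    private String flightNumber;
    private String confirmation;

    // Public no-argument constructor, required for Flink to treat this class as a POJO.
    public SkyOneAirlinesFlightData() {
    }

    // Getters and setters let both Jackson and Flink access the private fields.
    public String getEmailAddress() { return emailAddress; }
    public void setEmailAddress(String emailAddress) { this.emailAddress = emailAddress; }
    public ZonedDateTime getFlightDepartureTime() { return flightDepartureTime; }
    public void setFlightDepartureTime(ZonedDateTime flightDepartureTime) { this.flightDepartureTime = flightDepartureTime; }
    public String getIataDepartureCode() { return iataDepartureCode; }
    public void setIataDepartureCode(String iataDepartureCode) { this.iataDepartureCode = iataDepartureCode; }
    public ZonedDateTime getFlightArrivalTime() { return flightArrivalTime; }
    public void setFlightArrivalTime(ZonedDateTime flightArrivalTime) { this.flightArrivalTime = flightArrivalTime; }
    public String getIataArrivalCode() { return iataArrivalCode; }
    public void setIataArrivalCode(String iataArrivalCode) { this.iataArrivalCode = iataArrivalCode; }
    public String getFlightNumber() { return flightNumber; }
    public void setFlightNumber(String flightNumber) { this.flightNumber = flightNumber; }
    public String getConfirmation() { return confirmation; }
    public void setConfirmation(String confirmation) { this.confirmation = confirmation; }

    // Field-by-field equality; Objects.equals handles null fields.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SkyOneAirlinesFlightData that = (SkyOneAirlinesFlightData) o;
        return Objects.equals(emailAddress, that.emailAddress)
                && Objects.equals(flightDepartureTime, that.flightDepartureTime)
                && Objects.equals(iataDepartureCode, that.iataDepartureCode)
                && Objects.equals(flightArrivalTime, that.flightArrivalTime)
                && Objects.equals(iataArrivalCode, that.iataArrivalCode)
                && Objects.equals(flightNumber, that.flightNumber)
                && Objects.equals(confirmation, that.confirmation);
    }

    // Hash over the same fields that equals compares.
    @Override
    public int hashCode() {
        return Objects.hash(emailAddress, flightDepartureTime, iataDepartureCode,
                flightArrivalTime, iataArrivalCode, flightNumber, confirmation);
    }

    // Produces the format shown in the example output later in this exercise.
    @Override
    public String toString() {
        return "SkyOneAirlinesFlightData{" +
                "emailAddress='" + emailAddress + '\'' +
                ", flightDepartureTime=" + flightDepartureTime +
                ", iataDepartureCode='" + iataDepartureCode + '\'' +
                ", flightArrivalTime=" + flightArrivalTime +
                ", iataArrivalCode='" + iataArrivalCode + '\'' +
                ", flightNumber='" + flightNumber + '\'' +
                ", confirmation='" + confirmation + '\'' +
                '}';
    }
}
```

Every field participates in equals, hashCode, and toString. As a result, two records deserialized from identical JSON compare as equal, and printed records show every field.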
Next, we need to change our Data Stream to work with our new data model.
Open FlightImporterJob.java.
Import the SkyOneAirlinesFlightData class from your models package, along with org.apache.flink.formats.json.JsonDeserializationSchema.
Update the KafkaSource and the DataStream to provide a SkyOneAirlinesFlightData object instead of a String.
Change the argument of .setValueOnlyDeserializer to a new instance of JsonDeserializationSchema, and pass it a reference to the SkyOneAirlinesFlightData class.
Build the .jar file.
Run the DataGeneratorJob.
Run the FlightImporterJob.
Note: Feel free to use the start-datagen.sh and start-flightimporter.sh shell scripts here.
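The FlightImporterJob changes amount to switching the element type from String to SkyOneAirlinesFlightData and giving the source a JsonDeserializationSchema. The job file itself isn't reproduced here, so the sketch below is a hypothetical, generic helper. JsonKafkaSourceSketch, buildJsonSource, and jsonStream are illustrative names, not part of the exercise. The sketch assumes flink-connector-kafka and flink-json are on the classpath. Starting from OffsetsInitializer.latest() is also an assumption, so keep whatever starting offsets, topic, and consumer properties your previous exercise used.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class JsonKafkaSourceSketch {

    // Builds a KafkaSource whose record values are parsed from JSON into instances of `type`.
    // Assumption: the job reads from the latest offsets, as a fresh demo consumer might.
    static <T> KafkaSource<T> buildJsonSource(Properties consumerConfig, String topic, Class<T> type) {
        return KafkaSource.<T>builder()
                .setProperties(consumerConfig)
                .setTopics(topic)
                .setStartingOffsets(OffsetsInitializer.latest())
                // Deserialize each value from JSON into the given class instead of a String.
                .setValueOnlyDeserializer(new JsonDeserializationSchema<>(type))
                .build();
    }

    // Wraps the source in a DataStream with the same element type; no watermarks are needed yet.
    static <T> DataStream<T> jsonStream(StreamExecutionEnvironment env, KafkaSource<T> source, String name) {
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), name);
    }
}
```

In FlightImporterJob, calling buildJsonSource(consumerConfig, topic, SkyOneAirlinesFlightData.class) would produce a KafkaSource<SkyOneAirlinesFlightData>, and passing that to jsonStream would produce a DataStream<SkyOneAirlinesFlightData>. Calling print() on that stream writes each record using your toString method.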
Check the logs to make sure the data is being printed as expected.
Open localhost:8081 in a browser.
Select Job Manager from the left.
Select Log List from the top.
Sort the Log List by Last Modified Time in Descending order (newest first).
Open the first .out file.
You should continue to see your data being printed. However, it should now match the format that you created for your toString method rather than the original JSON format.
For example, it might look something like:
SkyOneAirlinesFlightData{emailAddress='BQPWJ@email.com', flightDepartureTime=2024-01-15T09:12Z[UTC], iataDepartureCode='IAH', flightArrivalTime=2024-01-15T19:16Z[UTC], iataArrivalCode='DFW', flightNumber='SKY1900', confirmation='SKY1TESBKY'}
Remember to cancel your jobs so they don't use your Confluent Cloud credits.
This brings us to the end of this exercise.