Staff Software Practice Lead
Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
In this exercise, we'll take the SkyOneAirlinesFlightData
records we created previously and perform some basic transformations and filtering.
Currently, our application only takes records from Sky One Airlines. However, we intend to add more airlines in the future. Each of these airlines might use a slightly different data format for their records.
We will be converting from our airline-specific SkyOneAirlinesFlightData
into a more unified FlightData
record.
In addition, the data feeds we get are sometimes delayed which can result in stale data. We will filter out the stale data so it doesn't trigger unnecessary downstream processing.
Stage the exercise by executing:
./exercise.sh stage 12
Our first step is to define the FlightData
class which will represent a more unified view of all of the airlines.
models
package, create a new class named FlightData
.Your class should include the following:
yyyy-MM-dd'T'HH:mm:ss.SSSXXX
.equals
and hashCode
.toString
.Hint: This class will be very similar to your SkyOneAirlinesFlightData
. That class would make a good starting point for your implementation.
Next, we will add a small helper method to our SkyOneAirlinesFlightData
that will convert it to a FlightData
object.
SkyOneAirlinesFlightData
class.toFlightData
that returns an instance of FlightData
.FlightData
and populate each of the fields.Now, we can put these new pieces to work inside the FlightImporterJob
.
Open the FlightImporterJob
.
Create a new public
static
method named defineWorkflow
.
It should take a single parameter named skyOneSource
of type DataStream<SkyOneAirlinesFlightData
.
It should return a DataStream<FlightData>
.
Implement the method as follows:
.filter
on the skyOneSource
, only keep entries where the flight arrival time is after the current time (ZonedDateTime.now()
)..map
on the skyOneSource
and the toFlightData
method, convert each element in the stream to a FlightData
instance.Modify the main method as follows:
print
on the skyOne data stream, pass that stream to the defineWorkflow
method and call print
on the result.Note: We separated the filtering and transformation logic into the defineWorkflow
method which allows us to write tests against that logic (see FlightImporterJobTest
.
As with the previous exercise, new tests were added when you staged the exercise. Run these tests to verify your code.
mvn clean test
jar
file:DataGeneratorJob
.FlightImporterJob
.Note: Feel free to use the start-datagen.sh
and start-flightimporter.sh
shell scripts here.
Check the logs to make sure the data is being printed as expected.
localhost:8081
in a browser.Job Manager
from the left.Log List
from the top.Log List
by Last Modified Time
in Descending
order (newest first)..out
file.You should continue to see your data being exported, however, it should now match the FlightData
format instead of the SkyOneAirlinesFlightData
format.
For example, it might look something like this:
FlightData{emailAddress='BQPWJ@email.com', departureTime=2023-12-04T17:48Z[UTC], departureAirportCode='ATL', arrivalTime=2023-12-05T05:27Z[UTC], arrivalAirportCode='SAN', flightNumber='SKY1900', confirmationCode='SKY1TESBKY'}
This brings us to the end of this exercise.
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.