Wade Waldron

Staff Software Practice Lead

Flink Data Transformations (Hands-On)

Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.

In this exercise, we'll take the SkyOneAirlinesFlightData records we created previously and perform some basic transformations and filtering.

Currently, our application only takes records from Sky One Airlines. However, we intend to add more airlines in the future. Each of these airlines might use a slightly different data format for their records.

We will be converting from our airline-specific SkyOneAirlinesFlightData into a more unified FlightData record.

[Figure: system diagram for the transformations exercise]

In addition, the data feeds we get are sometimes delayed, which can result in stale data. We will filter out the stale data so it doesn't trigger unnecessary downstream processing.

Stage the Exercise

Stage the exercise by executing:

./exercise.sh stage 12

Create the Flight Data Model

Our first step is to define the FlightData class which will represent a more unified view of all of the airlines.

  1. In the models package, create a new class named FlightData.
  2. Use the following class diagram to help you design the model.

[Class diagram: Flight Data]

  3. Your class should include the following:

    • An empty constructor.
    • Appropriate getters/setters for each of the fields.
    • All dates should use the format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX.
    • Overrides for equals and hashCode.
    • An override for toString.

Hint: This class will be very similar to your SkyOneAirlinesFlightData. That class would make a good starting point for your implementation.
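
As a rough illustration of the requirements above, here is an abbreviated sketch. It is not the finished model: the class is named FlightDataSketch (a made-up name) and carries only three of the fields, which are taken from the sample output later in this exercise. The DATE_FORMAT constant is also an addition for illustration. It shows what the required date pattern produces, while the wiring that applies the pattern to the fields during serialization is assumed and left out.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;

// Abbreviated sketch: only three fields are shown, and the class is
// renamed so it is not mistaken for the finished FlightData model.
class FlightDataSketch {
    // The date pattern from the exercise. How it gets attached to the
    // date fields during serialization is assumed and not shown here.
    static final DateTimeFormatter DATE_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");

    private String flightNumber;
    private ZonedDateTime departureTime;
    private ZonedDateTime arrivalTime;

    // Empty constructor, as the exercise asks for.
    public FlightDataSketch() {}

    public String getFlightNumber() { return flightNumber; }
    public void setFlightNumber(String flightNumber) { this.flightNumber = flightNumber; }

    public ZonedDateTime getDepartureTime() { return departureTime; }
    public void setDepartureTime(ZonedDateTime departureTime) { this.departureTime = departureTime; }

    public ZonedDateTime getArrivalTime() { return arrivalTime; }
    public void setArrivalTime(ZonedDateTime arrivalTime) { this.arrivalTime = arrivalTime; }

    // Two records are equal when every field matches.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        FlightDataSketch that = (FlightDataSketch) o;
        return Objects.equals(flightNumber, that.flightNumber)
            && Objects.equals(departureTime, that.departureTime)
            && Objects.equals(arrivalTime, that.arrivalTime);
    }

    // Must use the same fields as equals.
    @Override
    public int hashCode() {
        return Objects.hash(flightNumber, departureTime, arrivalTime);
    }

    @Override
    public String toString() {
        return "FlightDataSketch{"
            + "flightNumber='" + flightNumber + '\''
            + ", departureTime=" + departureTime
            + ", arrivalTime=" + arrivalTime
            + '}';
    }
}
```

With this pattern, a departure time of 17:48 UTC on 2023-12-04 formats as 2023-12-04T17:48:00.000Z.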

Add a Conversion Method

Next, we will add a small helper method to our SkyOneAirlinesFlightData that will convert it to a FlightData object.

[Diagram: Sky One to Flight Data]

  1. Open the SkyOneAirlinesFlightData class.
  2. Add a method named toFlightData that returns an instance of FlightData.
  3. Implement the method to create a new instance of FlightData and populate each of the fields.
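
The steps above might look roughly like the following sketch. Both classes are trimmed-down stand-ins nested inside a hypothetical ConversionSketch wrapper so the example is self-contained. The FlightData field names come from the sample output in this exercise, while the field names on SkyOneAirlinesFlightData (flightArrivalTime, iataArrivalCode) are assumptions, so substitute whatever your own class uses.

```java
import java.time.ZonedDateTime;

class ConversionSketch {
    // Trimmed-down unified record with three of its fields.
    static class FlightData {
        private String flightNumber;
        private ZonedDateTime arrivalTime;
        private String arrivalAirportCode;

        public String getFlightNumber() { return flightNumber; }
        public void setFlightNumber(String flightNumber) { this.flightNumber = flightNumber; }

        public ZonedDateTime getArrivalTime() { return arrivalTime; }
        public void setArrivalTime(ZonedDateTime arrivalTime) { this.arrivalTime = arrivalTime; }

        public String getArrivalAirportCode() { return arrivalAirportCode; }
        public void setArrivalAirportCode(String code) { this.arrivalAirportCode = code; }
    }

    // Trimmed-down airline-specific record. Field names are assumed.
    static class SkyOneAirlinesFlightData {
        private String flightNumber;
        private ZonedDateTime flightArrivalTime;
        private String iataArrivalCode;

        public void setFlightNumber(String flightNumber) { this.flightNumber = flightNumber; }
        public void setFlightArrivalTime(ZonedDateTime time) { this.flightArrivalTime = time; }
        public void setIataArrivalCode(String code) { this.iataArrivalCode = code; }

        // Copy each airline-specific field into its unified counterpart.
        public FlightData toFlightData() {
            FlightData flightData = new FlightData();
            flightData.setFlightNumber(flightNumber);
            flightData.setArrivalTime(flightArrivalTime);
            flightData.setArrivalAirportCode(iataArrivalCode);
            return flightData;
        }
    }
}
```

Keeping the conversion on the airline-specific class means each future airline can supply its own toFlightData without changing FlightData itself.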

Update the Flight Importer Job

Now, we can put these new pieces to work inside the FlightImporterJob.

  1. Open the FlightImporterJob.

  2. Create a new public static method named defineWorkflow.

  3. It should take a single parameter named skyOneSource of type DataStream<SkyOneAirlinesFlightData>.

  4. It should return a DataStream<FlightData>.

    • Hint: You'll need an import.
  5. Implement the method as follows:

    • Using .filter on the skyOneSource, only keep entries where the flight arrival time is after the current time (ZonedDateTime.now()).
    • Using .map on the filtered stream and the toFlightData method, convert each remaining element to a FlightData instance.
    • Return the resulting stream.
  6. Modify the main method as follows:

    • Instead of calling print on the skyOne data stream, pass that stream to the defineWorkflow method and call print on the result.

Note: We separated the filtering and transformation logic into the defineWorkflow method so that we can write tests against that logic (see FlightImporterJobTest).
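
To illustrate the filter-then-map logic without needing a Flink runtime, the sketch below applies the same two steps to a plain java.util.stream pipeline. It is a stand-in, not the Flink job: the nested model classes are trimmed down, the flightArrivalTime field name is an assumption, and the current time is passed in as a parameter (the exercise uses ZonedDateTime.now() directly) only so the result is deterministic. The comment shows approximately how the same chain would read on a DataStream.

```java
import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;

class WorkflowSketch {
    // Hypothetical, trimmed-down stand-ins for the course models.
    static class FlightData {
        String flightNumber;
        ZonedDateTime arrivalTime;
    }

    static class SkyOneAirlinesFlightData {
        String flightNumber;
        ZonedDateTime flightArrivalTime; // assumed field name

        SkyOneAirlinesFlightData(String flightNumber, ZonedDateTime flightArrivalTime) {
            this.flightNumber = flightNumber;
            this.flightArrivalTime = flightArrivalTime;
        }

        FlightData toFlightData() {
            FlightData flightData = new FlightData();
            flightData.flightNumber = flightNumber;
            flightData.arrivalTime = flightArrivalTime;
            return flightData;
        }
    }

    // On a Flink DataStream the equivalent chain would be roughly:
    //   skyOneSource
    //       .filter(f -> f.getFlightArrivalTime().isAfter(ZonedDateTime.now()))
    //       .map(SkyOneAirlinesFlightData::toFlightData);
    // (getFlightArrivalTime is an assumed getter name.)
    static List<FlightData> defineWorkflow(List<SkyOneAirlinesFlightData> source,
                                           ZonedDateTime now) {
        return source.stream()
            // Drop stale records: keep only flights that have not arrived yet.
            .filter(flight -> flight.flightArrivalTime.isAfter(now))
            // Convert each remaining record to the unified format.
            .map(SkyOneAirlinesFlightData::toFlightData)
            .collect(Collectors.toList());
    }
}
```

Given one flight that arrived an hour ago and one that arrives an hour from now, only the second survives the filter and is converted.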

Run All Tests

As with the previous exercise, new tests were added when you staged the exercise. Run these tests to verify your code.

mvn clean test

Execute the Jobs

  1. Build and package the jar file.
  2. Execute the DataGeneratorJob.
  3. Execute the FlightImporterJob.

Note: Feel free to use the start-datagen.sh and start-flightimporter.sh shell scripts here.

Inspect the Logs

Check the logs to make sure the data is being printed as expected.

  1. Navigate to localhost:8081 in a browser.
  2. Select Job Manager from the left.
  3. Select Log List from the top.
  4. Sort the Log List by Last Modified Time in Descending order (newest first).
  5. Click on the first .out file.

You should continue to see your data being exported. However, it should now match the FlightData format instead of the SkyOneAirlinesFlightData format.

For example, it might look something like this:

FlightData{emailAddress='BQPWJ@email.com', departureTime=2023-12-04T17:48Z[UTC], departureAirportCode='ATL', arrivalTime=2023-12-05T05:27Z[UTC], arrivalAirportCode='SAN', flightNumber='SKY1900', confirmationCode='SKY1TESBKY'}

Finish

This brings us to the end of this exercise.
