Wade Waldron

Staff Software Practice Lead

Deserializing Messages in Flink (Hands-On)

Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.

In this exercise, we'll take the records we consumed in the previous exercise and deserialize them into a Plain Old Java Object (POJO).

Our Sky One Airlines flight data is sent through Kafka in JSON format. Working with raw JSON in Java is awkward, so we want to convert it into a Java object that is more convenient to use.

[Figure: deserialization system diagram]

Thankfully, Flink has built-in support for these conversions, which makes our job relatively simple.

If you get stuck on this exercise, you can watch me write the code in the following video:

Consume Apache Kafka Messages using Apache Flink and Java

Stage the Exercise

Stage the exercise by executing:

./exercise.sh stage 10

Create the Data Model

Our first step is to define the class that will act as our data model.

  1. Create a new Java package named models.

  2. Create a new class in the models package named SkyOneAirlinesFlightData.

  3. Include the following imports:

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    
    import java.time.ZonedDateTime;
    import java.util.Objects;

  4. Annotate the class with:

    @JsonIgnoreProperties(ignoreUnknown = true)

  5. Use the following class diagram to help you design the model.

[Class diagram: Sky One Airlines Flight Data]

  6. Your class should include the following:

    • An empty constructor.
    • Appropriate getters/setters for each of the fields.
    • All dates should use the format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX.
    • Overrides for equals and hashCode.
    • An override for toString.
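To get a feel for the date pattern in the list above, here is a minimal sketch that formats and parses a ZonedDateTime with that pattern using only java.time. The class and method names (FlightDateFormatSketch, format, parse) are hypothetical and are not part of the exercise code. It also assumes the pattern is interpreted with java.time's DateTimeFormatter rules; in the model itself, the pattern would be supplied through the imported JsonFormat annotation rather than called directly like this.

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, for illustration only. Only the pattern string
// comes from the exercise; everything else is an assumption.
class FlightDateFormatSketch {
    static final String PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
    static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern(PATTERN);

    // Render a timestamp as text using the exercise's pattern.
    static String format(ZonedDateTime time) {
        return FORMATTER.format(time);
    }

    // Read text in the exercise's pattern back into a ZonedDateTime.
    static ZonedDateTime parse(String text) {
        return ZonedDateTime.parse(text, FORMATTER);
    }

    public static void main(String[] args) {
        ZonedDateTime departure =
            ZonedDateTime.of(2024, 1, 15, 9, 12, 0, 0, ZoneId.of("UTC"));

        String text = format(departure);
        System.out.println(text); // prints 2024-01-15T09:12:00.000Z

        // The round trip preserves the instant in time.
        System.out.println(parse(text).toInstant().equals(departure.toInstant())); // prints true
    }
}
```

Note that XXX writes a zero offset as Z and any other offset in the +01:00 style, so timestamps in this format always carry an explicit offset.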

Hint: You can use the model defined in the datagen package as a basis for this. However, you will need to make some changes. You can also use code generators if you like.
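The constructor, getter/setter, equals, hashCode, and toString requirements can be sketched with a trimmed-down class. This is not the full solution: FlightDataSketch is a hypothetical name, it carries only three of the fields, and it leaves out the Jackson annotations so that it compiles with nothing but the JDK. Your real SkyOneAirlinesFlightData class needs every field from the class diagram plus the annotations described in the steps above.

```java
import java.time.ZonedDateTime;
import java.util.Objects;

// Hypothetical, trimmed-down stand-in for the exercise's model class.
class FlightDataSketch {
    private String flightNumber;
    private String confirmation;
    private ZonedDateTime flightDepartureTime;

    // Empty constructor so the object can be created before its fields are set.
    public FlightDataSketch() {
    }

    public String getFlightNumber() { return flightNumber; }
    public void setFlightNumber(String flightNumber) { this.flightNumber = flightNumber; }

    public String getConfirmation() { return confirmation; }
    public void setConfirmation(String confirmation) { this.confirmation = confirmation; }

    public ZonedDateTime getFlightDepartureTime() { return flightDepartureTime; }
    public void setFlightDepartureTime(ZonedDateTime flightDepartureTime) {
        this.flightDepartureTime = flightDepartureTime;
    }

    // Two objects are equal when every field matches.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        FlightDataSketch that = (FlightDataSketch) o;
        return Objects.equals(flightNumber, that.flightNumber)
            && Objects.equals(confirmation, that.confirmation)
            && Objects.equals(flightDepartureTime, that.flightDepartureTime);
    }

    // Must use the same fields as equals so equal objects hash alike.
    @Override
    public int hashCode() {
        return Objects.hash(flightNumber, confirmation, flightDepartureTime);
    }

    @Override
    public String toString() {
        return "FlightDataSketch{"
            + "flightNumber='" + flightNumber + '\''
            + ", confirmation='" + confirmation + '\''
            + ", flightDepartureTime=" + flightDepartureTime
            + '}';
    }
}
```

Keeping equals and hashCode based on the same set of fields matters because tests and collections rely on equal objects producing equal hash codes.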

Hint: When you staged the exercise at the beginning, new unit tests were created to help test your model (see SkyOneAirlinesFlightDataTest.java). You can verify that you have created the model correctly by running them:

mvn clean test

Deserialize the Model

Next, we need to change our Data Stream to work with our new data model.

  1. Open FlightImporterJob.java.
  2. Import models.SkyOneAirlinesFlightData and org.apache.flink.formats.json.JsonDeserializationSchema.
  3. Modify the type of the KafkaSource and the DataStream to provide a SkyOneAirlinesFlightData object instead of a String.
  4. Change the argument passed to setValueOnlyDeserializer to a new instance of JsonDeserializationSchema, and pass that instance a reference to the SkyOneAirlinesFlightData class.

Execute the Jobs

  1. Build and package the jar file.
  2. Execute the DataGeneratorJob.
  3. Execute the FlightImporterJob.

Note: Feel free to use the start-datagen.sh and start-flightimporter.sh shell scripts here.

Inspect the Logs

Check the logs to make sure the data is being printed as expected.

  1. Navigate to localhost:8081 in a browser.
  2. Select Job Manager from the left.
  3. Select Log List from the top.
  4. Sort the Log List by Last Modified Time in Descending order (newest first).
  5. Click on the first .out file.

You should continue to see your data being exported. However, it should now match the format that you defined in your toString method, rather than the original JSON format.

For example, it might look something like:

SkyOneAirlinesFlightData{emailAddress='BQPWJ@email.com', flightDepartureTime=2024-01-15T09:12Z[UTC], iataDepartureCode='IAH', flightArrivalTime=2024-01-15T19:16Z[UTC], iataArrivalCode='DFW', flightNumber='SKY1900', confirmation='SKY1TESBKY'}

Remember to cancel your jobs so they don't use your Confluent Cloud credits.

Finish

This brings us to the end of this exercise.
