Get Started Free
Wade Waldron

Wade Waldron

Staff Software Practice Lead

Merging Flink Data Streams (Exercise)

Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.

In the previous exercises, we've built our system into a single, linear datastream. We are taking the data from the skyone topic and passing it all the way through to the final flightdata topic while performing transformations along the way.

However, our original plan was to work from two different topics, not just one.

In this exercise, we'll be performing a similar conversion on the sunset topic. This will give us a chance to practice many of the things we've already done. However, it will also give us a chance to learn how to combine multiple streams into one.

system-diagram

Stage the Exercise

Stage the exercise by executing:

./exercise.sh stage 16

Create the SunsetAirFlightData model

To start, we'll need a new data model for our SunsetAirFlightData.

SunsetAirFlightData

  1. Using the above image, create a new SunsetAirFlightData class in the models package.
  2. Make sure to override hashcode, equals, toString.
  3. Include a toFlightData method.

Note: Use the SkyOneAirlinesFlightData class for reference.

Create the SunsetAir Stream

Next, we need to duplicate some of the logic we used to create the Sky One stream.

  1. Open the FlightImporterJob.java.
  2. Copy the logic used to create the KafkaSource<SkyOneAirlinesFlightData> and create a KafkaSource<SunsetAirlinesFlightData>. Read this data from the sunset topic.
  3. Copy the logic used to create a DataStream<SkyOneAirlinesFlightData> and create a DataStream<SunsetAirFlightData>.

Merge the Streams

Finally, we need to merge our two streams together.

  1. Modify the signature of the defineWorkflow method.
  2. It should now take two parameters:
    • DataStream<SkyOneAirlinesFlightData>
    • DataStream<SunsetAirFlightData>
  3. Just like you did with the SkyOneAirlinesFlightData, filter the stream to remove old data and then map the result into a FlightData object.
  4. Use the union operator to merge the two streams: stream1.union(stream2).
  5. Return the result.

Run All Tests

As always, you can run the tests to verify your application is behaving as expected.

mvn clean test

Execute the Jobs

  1. Build and package the jar file:
  2. Execute the DataGeneratorJob.
  3. Execute the FlightImporterJob.

Note: Feel free to use the start-datagen.sh and start-flightimporter.sh shell scripts here.

Inspect the Messages

Inspect the messages in Confluent Cloud to verify that everything is working correctly.

  1. Navigate to the flightdata topic in Confluent Cloud.
  2. Select the Messages tab.
  3. Watch to see that you are receiving Sunset Air messages from your Flink application.

You should see messages that look something like this:

{
  "emailAddress": "PKPNF@email.com",
  "departureTime": "2023-11-02T03:04:00.000Z",
  "departureAirportCode": "BNA",
  "arrivalTime": "2023-11-02T11:26:00.000Z",
  "arrivalAirportCode": "ATL",
  "flightNumber": "SUN467",
  "confirmationCode": "SUNDOYDFB"
}

Note: The flightNumber and confirmationCode fields will include the SUN prefix if the records are coming from the sunset topic.

[Optional] Re-implement using connect and a CoMapFunction

If you are looking for an additional challenge, you can attempt to re-implement defineWorkflow using a combination of connect and a CoMapFunction.

  1. Use connect to link the DataStream<SkyOneAirlinesFlightData> with the DataStream<SunsetAirFlightData>.
  2. Use map along with a CoMapFunction to convert each type to a FlightData object.
  3. Don't forget to filter out old data.

Finish

This brings us to the end of this exercise.

Use the promo code FLINKJAVA101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.