Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
In the previous exercises, we built our system as a single, linear data stream. We take data from the skyone topic and pass it all the way through to the final flightdata topic, performing transformations along the way.
However, our original plan was to work from two different topics, not just one.
In this exercise, we'll perform a similar conversion on the sunset topic. This gives us a chance to practice much of what we've already done, and also to learn how to combine multiple streams into one.
Stage the exercise by executing:
./exercise.sh stage 16
To start, we'll need a new data model for our Sunset Air records.

- Create a SunsetAirFlightData class in the models package.
- Include the appropriate hashCode, equals, and toString methods.
- Include a toFlightData method.

Note: Use the SkyOneAirlinesFlightData class for reference.
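The new model might be sketched as follows. This is a trimmed-down sketch, not the course's exact code: the real class should carry all of the fields shown in the sample message (departure/arrival times, airport codes, etc.), and the small FlightData stand-in below only exists to make the sketch self-contained; in the project, FlightData already exists.

```java
import java.util.Objects;

// Minimal stand-in for the FlightData class that already exists in the
// course project (only the fields needed for this sketch).
class FlightData {
    private String flightNumber;
    private String confirmationCode;

    public String getFlightNumber() { return flightNumber; }
    public void setFlightNumber(String flightNumber) { this.flightNumber = flightNumber; }
    public String getConfirmationCode() { return confirmationCode; }
    public void setConfirmationCode(String confirmationCode) { this.confirmationCode = confirmationCode; }
}

// Sketch of the new model. Mirror SkyOneAirlinesFlightData for the full
// field list; only three fields are shown here to keep the sketch short.
class SunsetAirFlightData {
    private String emailAddress;
    private String flightNumber;
    private String confirmationCode;

    public String getEmailAddress() { return emailAddress; }
    public void setEmailAddress(String emailAddress) { this.emailAddress = emailAddress; }
    public String getFlightNumber() { return flightNumber; }
    public void setFlightNumber(String flightNumber) { this.flightNumber = flightNumber; }
    public String getConfirmationCode() { return confirmationCode; }
    public void setConfirmationCode(String confirmationCode) { this.confirmationCode = confirmationCode; }

    // Convert this record into the common FlightData format.
    public FlightData toFlightData() {
        FlightData flightData = new FlightData();
        flightData.setFlightNumber(flightNumber);
        flightData.setConfirmationCode(confirmationCode);
        return flightData;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SunsetAirFlightData)) return false;
        SunsetAirFlightData that = (SunsetAirFlightData) o;
        return Objects.equals(emailAddress, that.emailAddress)
                && Objects.equals(flightNumber, that.flightNumber)
                && Objects.equals(confirmationCode, that.confirmationCode);
    }

    @Override
    public int hashCode() {
        return Objects.hash(emailAddress, flightNumber, confirmationCode);
    }

    @Override
    public String toString() {
        return "SunsetAirFlightData{emailAddress='" + emailAddress
                + "', flightNumber='" + flightNumber
                + "', confirmationCode='" + confirmationCode + "'}";
    }
}
```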
Next, we need to duplicate some of the logic we used to create the Sky One stream.

- In FlightImporterJob.java, duplicate the KafkaSource<SkyOneAirlinesFlightData> and create a KafkaSource<SunsetAirFlightData>. Read this data from the sunset topic.
- Duplicate the DataStream<SkyOneAirlinesFlightData> and create a DataStream<SunsetAirFlightData>.
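The new source and stream might look something like the fragment below. This is a sketch, not the course's exact code: it assumes the same consumerConfig properties object, StreamExecutionEnvironment env, and JSON deserialization approach used for the Sky One source, and the source name string is an assumption.

```java
// Sketch only: mirrors the existing Sky One source, but reads from the
// sunset topic and deserializes into the new model.
KafkaSource<SunsetAirFlightData> sunsetSource = KafkaSource.<SunsetAirFlightData>builder()
        .setProperties(consumerConfig)
        .setTopics("sunset")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new JsonDeserializationSchema<>(SunsetAirFlightData.class))
        .build();

DataStream<SunsetAirFlightData> sunsetStream = env.fromSource(
        sunsetSource,
        WatermarkStrategy.noWatermarks(),
        "sunsetair_source");  // source name is an assumption
```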
Finally, we need to merge our two streams together.

- Modify the defineWorkflow method so that it takes both a DataStream<SkyOneAirlinesFlightData> and a DataStream<SunsetAirFlightData>.
- Just as with the SkyOneAirlinesFlightData, filter each stream to remove old data and then map the result into a FlightData object.
- Use the union operator to merge the two streams: stream1.union(stream2).
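Putting the steps above together, defineWorkflow might be sketched as below. The getter names used in the filters (getFlightArrivalTime, getArrivalTime) are assumptions; use whatever your models actually expose. Note that union requires both streams to carry the same type, which is why each stream is mapped to FlightData before merging.

```java
// Sketch of the merged workflow; getter names on the models are assumptions.
public static DataStream<FlightData> defineWorkflow(
        DataStream<SkyOneAirlinesFlightData> skyOneSource,
        DataStream<SunsetAirFlightData> sunsetSource) {

    // Drop flights that have already arrived, then convert to the common type.
    DataStream<FlightData> skyOneFlightStream = skyOneSource
            .filter(flight -> flight.getFlightArrivalTime().isAfter(ZonedDateTime.now()))
            .map(SkyOneAirlinesFlightData::toFlightData);

    DataStream<FlightData> sunsetFlightStream = sunsetSource
            .filter(flight -> flight.getArrivalTime().isAfter(ZonedDateTime.now()))
            .map(SunsetAirFlightData::toFlightData);

    // union merges streams of the SAME type, so both sides are FlightData.
    return skyOneFlightStream.union(sunsetFlightStream);
}
```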
As always, you can run the tests to verify your application is behaving as expected.

mvn clean test

Once the tests pass, build and execute the jar file:

- Run the DataGeneratorJob.
- Run the FlightImporterJob.

Note: Feel free to use the start-datagen.sh and start-flightimporter.sh shell scripts here.
Inspect the messages in Confluent Cloud to verify that everything is working correctly.

- Navigate to the flightdata topic in Confluent Cloud.
- Select the Messages tab.

You should see messages that look something like this:
{
"emailAddress": "PKPNF@email.com",
"departureTime": "2023-11-02T03:04:00.000Z",
"departureAirportCode": "BNA",
"arrivalTime": "2023-11-02T11:26:00.000Z",
"arrivalAirportCode": "ATL",
"flightNumber": "SUN467",
"confirmationCode": "SUNDOYDFB"
}
Note: The flightNumber
and confirmationCode
fields will include the SUN
prefix if the records are coming from the sunset
topic.
If you are looking for an additional challenge, you can attempt to re-implement defineWorkflow using a combination of connect and a CoMapFunction.

- Use connect to link the DataStream<SkyOneAirlinesFlightData> with the DataStream<SunsetAirFlightData>.
- Use map along with a CoMapFunction to convert each type to a FlightData object.

This brings us to the end of this exercise.
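For reference, the connect/CoMapFunction approach from the challenge might be sketched like this. It is a sketch under assumptions, not an official solution: the filtering of old records shown earlier would still need to be applied to each stream, and is omitted here to focus on the merge.

```java
// Sketch: connect links two streams of DIFFERENT types, and the
// CoMapFunction supplies one mapping for each side (map1 and map2).
public static DataStream<FlightData> defineWorkflow(
        DataStream<SkyOneAirlinesFlightData> skyOneSource,
        DataStream<SunsetAirFlightData> sunsetSource) {

    return skyOneSource
            .connect(sunsetSource)
            .map(new CoMapFunction<SkyOneAirlinesFlightData, SunsetAirFlightData, FlightData>() {
                @Override
                public FlightData map1(SkyOneAirlinesFlightData flight) {
                    // Applied to elements from the first (Sky One) stream.
                    return flight.toFlightData();
                }

                @Override
                public FlightData map2(SunsetAirFlightData flight) {
                    // Applied to elements from the second (Sunset Air) stream.
                    return flight.toFlightData();
                }
            });
}
```

Unlike union, connect does not require the two input streams to share a type; the type unification happens inside the CoMapFunction.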