Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
Now that we have our data transformed into the desired format, in this exercise we'll push that data to another Kafka topic through a Sink.
Our data has been transformed from the Sky One Airlines format into our own internal FlightData format. At the same time, we filtered out any stale records. However, those records are currently just being dumped into a log file where they aren't very useful.
To resolve this, we'll create a new flightdata topic in Kafka and push our records to it. This will require us to create a KafkaSink as well as a Serializer for the records.
Stage the exercise by executing:
./exercise.sh stage 14
We'll be pushing our data to the flightdata topic, so let's go ahead and create that.
In Confluent Cloud, create a new topic named flightdata.
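If you prefer the command line to the Confluent Cloud UI, the topic can also be created with the Confluent CLI (assuming the CLI is installed and authenticated against your cluster):
confluent kafka topic create flightdata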
Our KafkaSink will require access to a producer configuration. We already created one in a previous exercise, so we can re-use it as is.
Open the FlightImporterJob
class.
Add the following imports:
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
After loading the consumer.properties file, also load the producer.properties file.
Hint: You can load the configuration the same way you did with the consumer.
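For reference, here is a minimal sketch of that loading, assuming the properties file is on the classpath and you mirror the pattern used for the consumer configuration (the variable name and loading mechanism are illustrative):
// Assumes java.util.Properties and java.io.InputStream are already imported.
Properties producerConfig = new Properties();
try (InputStream stream = FlightImporterJob.class
        .getClassLoader()
        .getResourceAsStream("producer.properties")) {
    producerConfig.load(stream);
}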
Note: In a production application, we wouldn't share our configuration files between applications. Each application would have its own configuration. This would allow each application to have different values for settings such as group.id and client.id. If you want a more realistic experience, consider creating separate producer.properties and consumer.properties files for each job.
Next, we need a way to configure a serializer for our data.
In FlightImporterJob, before calling defineWorkflow:
Use the KafkaRecordSerializationSchema.<T>builder() method to create a new builder of type FlightData.
Call the setTopic method on the builder and provide the name of the topic you created above.
Call setValueSerializationSchema and pass it a new JsonSerializationSchema<FlightData>.
The JsonSerializationSchema will need an ObjectMapper. You can supply one with a lambda like this:
() -> {
    return new ObjectMapper();
}
Call registerModule on the ObjectMapper and provide it with a new JavaTimeModule().
Call the build method on the builder to create a KafkaRecordSerializationSchema<FlightData> and store the result in a variable.
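Putting those steps together, a minimal sketch of the serializer construction might look like this (the variable name is illustrative):
KafkaRecordSerializationSchema<FlightData> serializer =
    KafkaRecordSerializationSchema.<FlightData>builder()
        .setTopic("flightdata")
        .setValueSerializationSchema(new JsonSerializationSchema<FlightData>(() -> {
            // Register JavaTimeModule so java.time fields serialize correctly.
            return new ObjectMapper().registerModule(new JavaTimeModule());
        }))
        .build();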
In addition to the serialization schema, we also need a KafkaSink. This is the endpoint we will use to push our data to Kafka.
Still before calling defineWorkflow:
Use the KafkaSink.<T>builder() method to create a new builder of type FlightData.
Call setKafkaProducerConfig on the builder and pass it the producer properties we loaded above.
Call setRecordSerializer on the builder and pass the serializer we created above.
Call the build method on the builder to create a KafkaSink.
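A minimal sketch of that sink construction, reusing the producer properties and serializer from the steps above (the variable name is illustrative):
KafkaSink<FlightData> flightDataSink =
    KafkaSink.<FlightData>builder()
        .setKafkaProducerConfig(producerConfig)
        .setRecordSerializer(serializer)
        .build();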
Finally, we need to connect our Sink to our Stream.
Instead of calling .print on the result of defineWorkflow, call sinkTo and pass the KafkaSink created above.
After the sinkTo, call name to give this portion of the stream a name such as "flightdata_sink".
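A sketch of that wiring, assuming defineWorkflow still receives the same stream it did in the previous exercise (the skyOneStream and flightDataSink names are assumptions):
defineWorkflow(skyOneStream)
    .sinkTo(flightDataSink)
    .name("flightdata_sink");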
No new tests were brought in for this exercise. However, it's a good practice to run the tests anyway just to verify we haven't broken anything.
mvn clean test
Once the tests pass, build and run the jar file:
Run the DataGeneratorJob.
Run the FlightImporterJob.
Note: Feel free to use the start-datagen.sh and start-flightimporter.sh shell scripts here.
Inspect the messages in Confluent Cloud to verify that everything is working correctly.
Navigate to the flightdata topic in Confluent Cloud.
Select the Messages tab.
You should see messages that look something like this:
{
"emailAddress": "PKPNF@email.com",
"departureTime": "2023-11-02T03:04:00.000Z",
"departureAirportCode": "BNA",
"arrivalTime": "2023-11-02T11:26:00.000Z",
"arrivalAirportCode": "ATL",
"flightNumber": "SKY1467",
"confirmationCode": "SKY1DOYDFB"
}
This brings us to the end of this exercise.