Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
In this exercise, we are going to start implementing a new Flink Job. This new job will consume the data from our flightdata topic and use it to aggregate some statistics about individual users.
Once we have collected and calculated the necessary statistics, we will push them out onto a new topic named userstatistics.
The basic flow of this new job is illustrated below:
The UserStatistics class will consist of three fields: emailAddress, totalFlightDuration, and numberOfFlights.
Essentially, as each flight record passes through our system, we'll accumulate data for each individual user to determine the totalFlightDuration and the numberOfFlights.
We've taken the liberty of implementing some of this for you. Most of the boilerplate for the UserStatistics and UserStatisticsJob classes has been created. Both will be made available when you stage the exercise.
Stage the exercise by executing:
./exercise.sh stage 18
Our first step is to create a new Kafka topic to hold our User Statistics.
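If you'd rather create the topic programmatically than through the Confluent Cloud console, a rough sketch using Kafka's AdminClient is shown below. The topic name matches the exercise, but the connection properties, partition count, and replication factor are placeholders you would need to adjust for your cluster.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateUserStatisticsTopic {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings: point these at your Confluent Cloud cluster.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Create the userstatistics topic (partition count and replication factor are placeholders).
            NewTopic topic = new NewTopic("userstatistics", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}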
Next, we are going to add a merge function to our UserStatistics model. The purpose of this function is to take two different UserStatistics objects and merge them into one. This merge will add the flight durations and the number of flights together to produce a single total for each.
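A minimal sketch of what that merge function might look like is shown below. It lives inside the UserStatistics class and assumes totalFlightDuration and numberOfFlights are simple numeric fields that are directly accessible; the exact field types and accessors in the staged code may differ.

public UserStatistics merge(UserStatistics other) {
    // Merging only makes sense for statistics that describe the same user.
    assert this.emailAddress.equals(other.emailAddress);

    UserStatistics merged = new UserStatistics();
    merged.emailAddress = this.emailAddress;
    // Add the durations and the flight counts together to produce a single total for each.
    merged.totalFlightDuration = this.totalFlightDuration + other.totalFlightDuration;
    merged.numberOfFlights = this.numberOfFlights + other.numberOfFlights;
    return merged;
}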
Next, we will implement the defineWorkflow function to create our stream.
The layout of the stream we are creating looks something like this:
We'll take the incoming FlightData, convert it to UserStatistics, use keyBy to partition it into streams based on the email address, accumulate records in a one-minute tumbling window, use reduce to aggregate those statistics, and finally send them back to Kafka.
This seems fairly complicated. But as you will see, Flink makes it easy.
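To see how those pieces fit together, here is a rough sketch of what defineWorkflow might end up looking like. It assumes a UserStatistics constructor that accepts a FlightData record and a getEmailAddress getter, and it relies on the merge function from the previous step; the names in the staged code may differ slightly.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Inside the UserStatisticsJob class:
public static DataStream<UserStatistics> defineWorkflow(DataStream<FlightData> flightDataStream) {
    return flightDataStream
        // Convert each incoming FlightData record into a UserStatistics record.
        .map(UserStatistics::new)
        // Partition the stream so all records for the same email address land together.
        .keyBy(UserStatistics::getEmailAddress)
        // Accumulate the records in a one-minute tumbling window.
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        // Use the merge function to reduce each window to a single aggregated record.
        .reduce(UserStatistics::merge);
}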
As always, you can run the tests to verify your application is behaving as expected.
mvn clean test
We'll be executing an extra job this time. You may need to open an additional terminal.
Note: Feel free to use the start-datagen.sh, start-flightimporter.sh, and start-userstatistics.sh shell scripts here.
Inspect the messages in Confluent Cloud.
Note: Because messages are windowed over a one-minute period, we wouldn't expect to see results until about one minute after starting the application. However, even if you wait longer than this, you still won't see any messages. What happened?
The problem we've encountered is that windowing functions require watermarks in order to operate properly. Essentially, watermarks tell Flink how far event time has progressed, which is what determines when a window can close. However, we haven't enabled any watermarks in our stream. As a result, the stream never closes a window and never emits any results. So let's go ahead and fix that.
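One way to fix it, sketched below, is to supply a watermark strategy when the Kafka source is wired into the execution environment. The variable names here (env, flightDataSource) are assumptions about the staged code; the important piece is WatermarkStrategy.forMonotonousTimestamps(), which generates watermarks from record timestamps that only ever move forward.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Where the Kafka source is turned into a DataStream:
DataStream<FlightData> flightDataStream = env.fromSource(
    flightDataSource,
    // Derive watermarks from the record timestamps, assuming they always increase.
    WatermarkStrategy.forMonotonousTimestamps(),
    "flightdata_source"
);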
We've now declared that our events carry timestamps that continuously increment. Kafka automatically attaches these timestamps to each record, so this should be enough to get our stream working.
Now, take another look at the messages in Confluent Cloud.
After about a minute you should start to see messages that look something like this:
{
  "emailAddress": "HFRQC@email.com",
  "totalFlightDuration": 21000,
  "numberOfFlights": 1
}
Note: You should see a few records where numberOfFlights is higher than 1. However, it's unlikely you will see numbers much higher than 2 or 3. This is because the likelihood of two records sharing the same email address within the same minute is quite low, and we reset the totals with each new window. But what if we didn't want to reset the total for each new window? What if we wanted to accumulate the total across windows? That's what we will learn next.
This brings us to the end of this exercise.