Staff Software Practice Lead
Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
In the previous exercise, we set up a Flink job to accumulate statistics in one-minute windows. However, each window was independent, so the totals did not carry over from one window to the next. What if we wanted a cumulative total reported every minute? How would we achieve that?
In this exercise, we'll introduce stateful operations that will allow us to carry information from one window to the next.
Stage the exercise by executing:
./exercise.sh stage 20
Our first step is to define a function that will manage the state across windows. This function will load the previous state, combine it with the state of the current window, and then store the result. This allows us to create a cumulative result.
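The load, combine, and store cycle can be sketched in plain Java before looking at the Flink version. This is a minimal model, not Flink code. A HashMap stands in for Flink's per-key state. The method name onWindowResult and the flight counts are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink) of carrying a result from one window to the next.
// The HashMap stands in for Flink's per-key state.
public class CarryOverState {

    // Running flight count per email address; survives between windows.
    private final Map<String, Integer> flightsSoFar = new HashMap<>();

    // Hypothetical hook called when a window closes with that window's count.
    public int onWindowResult(String email, int flightsInWindow) {
        int previous = flightsSoFar.getOrDefault(email, 0); // load the previous state
        int combined = previous + flightsInWindow;          // combine it with this window
        flightsSoFar.put(email, combined);                  // store it for the next window
        return combined;                                    // emit the cumulative total
    }

    public static void main(String[] args) {
        CarryOverState state = new CarryOverState();
        int[] perWindow = {2, 1, 3}; // made-up counts from three independent windows
        List<Integer> cumulative = new ArrayList<>();
        for (int n : perWindow) {
            cumulative.add(state.onWindowResult("a@example.com", n));
        }
        System.out.println(cumulative); // [2, 3, 6]
    }
}
```

Each window on its own reports 2, 1, and 3. Because the previous total is loaded and stored each time, the emitted values become 2, 3, and 6.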
Create a new class in the userstatistics package named ProcessUserStatisticsFunction.
Import the following:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import models.UserStatistics;
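Make the class extend ProcessWindowFunction<UserStatistics, UserStatistics, String, TimeWindow>. The type parameters are, in order, the input type, the output type, the key type (a String), and the window type.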
Define a private field of type ValueStateDescriptor<UserStatistics> named stateDescriptor. This provides an identifier (descriptor) for our state storage mechanism.
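Our ProcessUserStatisticsFunction requires two methods. The first is open, which Flink calls once before any elements are processed and which can be used for any one-time setup. Override it and use it to initialize stateDescriptor as a new ValueStateDescriptor, giving it a name and the UserStatistics class.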
The next method we need is the process method. It will load our previous state, combine it with the latest window, and save and emit the result.
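Override the process method. It should return void and take the following parameters: the key (a String), a Context, an Iterable<UserStatistics> containing the window's elements, and a Collector<UserStatistics> for emitting results.
Implement the function so that it retrieves the ValueState from context.globalState() using stateDescriptor. Global state is scoped to the key rather than to a single window, so it survives from one window to the next. The function then merges the previous value with each element in the Iterable. The previous value is null for a key's first window. Finally, it updates the state with the result and emits that result through the Collector.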
Note: If you look at the logic we've just implemented, it's essentially the same as the reduce we implemented previously, i.e., it takes the previous state, merges it with the new data, and emits the result. The difference is that the state we merge with is persisted across windows. We'll discuss this in a little more detail shortly.
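To make the process logic concrete without a Flink cluster, here is a plain-Java model of the same steps. It is a hypothetical sketch, not the course's solution and not Flink code. Stats is a simplified stand-in for UserStatistics and is assumed to merge by adding its fields. The Map plays the role of the ValueState, and the List plays the role of the Collector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the process logic. NOT Flink code: the Map stands in for
// Flink's keyed ValueState and the List stands in for the Collector.
public class ProcessModel {

    // Hypothetical, simplified stand-in for UserStatistics.
    public record Stats(String email, int flights, long minutes) {
        public Stats merge(Stats other) {
            return new Stats(email, flights + other.flights(), minutes + other.minutes());
        }
    }

    // Mirrors the shape of ProcessWindowFunction.process(key, context, elements, out).
    public static void process(String key, Iterable<Stats> elements,
                               Map<String, Stats> state, List<Stats> out) {
        Stats accumulated = state.get(key);  // previous state; null for a key's first window
        for (Stats next : elements) {
            accumulated = (accumulated == null) ? next : accumulated.merge(next);
        }
        if (accumulated != null) {           // defensive: an empty Iterable leaves nothing to save
            state.put(key, accumulated);     // persist across windows
            out.add(accumulated);            // emit the cumulative total
        }
    }

    public static void main(String[] args) {
        Map<String, Stats> state = new HashMap<>();
        List<Stats> out = new ArrayList<>();
        // First window: two flights for the same user.
        process("a@example.com", List.of(new Stats("a@example.com", 1, 60),
                                         new Stats("a@example.com", 1, 45)), state, out);
        // Second window: merged onto the stored total rather than starting from zero.
        process("a@example.com", List.of(new Stats("a@example.com", 2, 200)), state, out);
        out.forEach(System.out::println);
    }
}
```

The second emission reports 4 flights and 305 minutes. This is the running total rather than only the second window's 2 flights and 200 minutes.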
Now that we've defined our function, let's put it to work.
Open the UserStatisticsJob.
Locate the call to reduce.
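reduce has an overload that takes two parameters. The first is the reduce function, which combines elements as they arrive. The second is a ProcessWindowFunction, which receives the reduced result when the window fires. Keep the existing reduce function as the first parameter and pass a new ProcessUserStatisticsFunction as the second.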
At this point, you might ask why we don't eliminate the reduce completely and call process with our ProcessUserStatisticsFunction. It is essentially doing the same thing. In fact, if you did that, all of your tests would pass and you'd get the right behavior.
However, using process on its own isn't optimized here. Flink buffers every element of a window and hands them to the function as a single Iterable when the window fires. Depending on the size of the window and the volume of data, that could take a lot of memory.
The reduce function, on the other hand, processes messages incrementally as they arrive in the window. Each time a new message arrives, reduce combines it with the running result, and only that result is kept. This means we only ever need to hold at most two UserStatistics objects at a time. By the time the ProcessUserStatisticsFunction is called, the messages in the window have already been reduced to a single record. So even though the function receives an Iterable, that Iterable contains only one element. This is much more memory efficient.
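The memory difference can be sketched with a plain-Java model. This is not Flink code, and the method names are hypothetical. Buffering keeps every element of the window until it fires. Incremental reduction keeps a single accumulator, so the downstream function only ever sees a one-element Iterable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model contrasting two ways a window can be evaluated. Not Flink code.
public class WindowEvaluation {

    // process only: every element is buffered until the window fires.
    public static List<Integer> bufferAll(List<Integer> arrivals) {
        List<Integer> buffer = new ArrayList<>();
        for (Integer x : arrivals) {
            buffer.add(x);  // memory grows with the number of elements in the window
        }
        return buffer;      // what process(...) would receive as its Iterable
    }

    // reduce + process: fold each element into one accumulator as it arrives.
    public static List<Integer> reduceIncrementally(List<Integer> arrivals,
                                                    BinaryOperator<Integer> reducer) {
        Integer acc = null; // the only value retained between arrivals
        for (Integer x : arrivals) {
            acc = (acc == null) ? x : reducer.apply(acc, x);
        }
        return acc == null ? List.of() : List.of(acc); // process(...) sees a single element
    }

    public static void main(String[] args) {
        List<Integer> arrivals = List.of(3, 1, 4, 1, 5);
        System.out.println(bufferAll(arrivals).size());                  // 5
        System.out.println(reduceIncrementally(arrivals, Integer::sum)); // [14]
    }
}
```

Both paths produce the same total once it is summed, but the buffered path holds all five elements at once. The incremental path never holds more than the accumulator and the incoming element.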
However, always consider your use case. If the size of the data and the size of the window are small, the extra efficiency may not be worth the increase in code complexity.
As always, you can run the tests to verify your application is behaving as expected.
mvn clean test
Execute each of the jobs.
Note: Feel free to use the start-datagen.sh, start-flightimporter.sh, and start-userstatistics.sh shell scripts here.
Inspect the messages in Confluent Cloud.
As before, you should begin to see messages appear after about one minute. The difference is that the totals in these messages should grow over time. The numbers in the first minute may be low, but they should increase with each successive window.
Note: You can enter a single email address into the filter box to watch how that user's totals grow over time. Pick any email address you can see. Be aware that the results may not be displayed in order, so you'll need to compare timestamps.
This brings us to the end of this exercise.