
How To Manage Stateful Streams with Apache Flink and Java

Managing state efficiently in a distributed streaming pipeline can be difficult. Thankfully, Apache Flink® has you covered. In this video, Wade Waldron will demonstrate how to use Flink's Keyed State feature to manage the state in a distributed cluster.

10 min

Wade Waldron

Principal Software Practice Lead


Take the Building Apache Flink® Applications in Java course now


How To Manage Stateful Streams with Apache Flink and Java

Intro

Hi, I'm Wade from Confluent. One challenge when working with distributed systems is reducing network traffic. Each network hop consumes valuable resources, which can reduce performance and increase costs. Flink uses keyed state to guarantee that data related to a specific key is kept local to the node doing the processing, thereby reducing network hops. This allows Flink to perform internal optimizations, such as in-memory caching, that might otherwise be difficult. I want to show an example of how to build a Flink job that leverages keyed state. The job I'm building will collect flight data for individual users and perform aggregations on that data. Specifically, I will calculate details like how many hours a user has spent on flights and how many flights they have taken. These results will be cumulative and reported every minute, mainly because reporting every hour would make for a really long demo. So let's take a look at some code.

Data Classes

The input to the job is a flight data class with various fields for key details about the flight. You can see the structure of the class here. I won't bother showing the code, but if you're interested, check out the Building Apache Flink Applications in Java course on Confluent Developer, where you'll get a chance to build this yourself. The output for the job is the user statistics class. It includes an email address, a total flight duration, and a number of flights. Again, you can see the structure of the class here, but in this case I do want to look at the merge function. It takes an incoming user statistics object and merges it with the current object. I have an assertion to make sure the email addresses are the same so that I don't accidentally merge unrelated records. I add together the flight durations and numbers of flights and drop them into a new object. Then I return the result. Now let's outline a couple more classes.
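For reference, here's a rough sketch of what that merge function might look like. The field names and types are assumptions based on the description above, not the exact course code.

```java
import java.time.Duration;

// A rough sketch of the output class; field names and types are assumptions.
public class UserStatistics {
    private String emailAddress;
    private Duration totalFlightDuration;
    private long numberOfFlights;

    // Flink's built-in POJO serialization needs a no-arg constructor
    // and getters/setters (mostly omitted here for brevity).
    public UserStatistics() {}

    public UserStatistics(String emailAddress, Duration totalFlightDuration, long numberOfFlights) {
        this.emailAddress = emailAddress;
        this.totalFlightDuration = totalFlightDuration;
        this.numberOfFlights = numberOfFlights;
    }

    public String getEmailAddress() {
        return emailAddress;
    }

    public UserStatistics merge(UserStatistics that) {
        // Guard against accidentally merging statistics for two different users.
        assert this.emailAddress.equals(that.emailAddress);

        // Add the flight durations and flight counts together and return a new object.
        return new UserStatistics(
            this.emailAddress,
            this.totalFlightDuration.plus(that.totalFlightDuration),
            this.numberOfFlights + that.numberOfFlights
        );
    }
}
```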

The Kafka Source

The flight data source class includes a create function that will return our Flink data source, in this case a Kafka topic. There's a link below to a video where I go through this code in more detail.
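As a rough sketch, a create function like that could look something like the following, assuming JSON-encoded records and a topic named flightdata. The topic name and the way the consumer Properties are supplied are assumptions for illustration, not the exact course code.

```java
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;

import java.util.Properties;

public class FlightDataSource {
    // FlightData is the input class described in the Data Classes section.
    public static KafkaSource<FlightData> create(Properties consumerConfig) {
        return KafkaSource.<FlightData>builder()
            .setProperties(consumerConfig)                       // bootstrap servers, credentials, etc.
            .setTopics("flightdata")                             // assumed topic name
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new JsonDeserializationSchema<>(FlightData.class))
            .build();
    }
}
```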

The Kafka Sink

The user statistics sink includes a similar create function to return a Kafka sink. Once again, check below for a link to a video showing this code in detail.

Okay, that's enough of the basics. I'm going to create a class called Process User Statistics Function. Its job is to roll up the one-minute time windows into a cumulative report. It will extend Process Window Function. It takes the User Statistics as both an input and an output. You might be wondering why it isn't using Flight Data as an input. I'll cover that in a bit, but basically I perform that conversion earlier in the pipeline. I also need a string for the key and a time window, since this is a windowed function. Next, I create a descriptor.
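Backing up to the sink for a moment, a rough sketch of its create function might look like this. The topic name, the JSON serialization setup, and the JavaTimeModule registration (so the Duration field serializes cleanly) are assumptions for illustration.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.json.JsonSerializationSchema;

import java.util.Properties;

public class UserStatisticsSink {
    // Topic name and Properties handling are assumptions for illustration.
    public static KafkaSink<UserStatistics> create(Properties producerConfig) {
        return KafkaSink.<UserStatistics>builder()
            .setKafkaProducerConfig(producerConfig)
            .setRecordSerializer(
                KafkaRecordSerializationSchema.<UserStatistics>builder()
                    .setTopic("userstatistics")                  // assumed topic name
                    // Register the JavaTimeModule so java.time.Duration serializes to JSON.
                    .setValueSerializationSchema(new JsonSerializationSchema<>(
                        () -> new ObjectMapper().registerModule(new JavaTimeModule())))
                    .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    }
}
```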

Flink State Descriptors

It provides details about the state. I'm using Value State, but there are other options, such as Map State. The descriptor is initialized in the open method. I pass it a name, which is an identifier used by Flink to help locate the data. I also provide details on how to serialize the state. I just need a reference to the class because I'm using Flink's built-in serialization. Next, I implement the process function. It includes a parameter I named Email Address. This key, along with the descriptor, is used to locate the state. At least, kind of. I'll come back to that in a moment. The context is an object that contains a variety of information, including the state store. The iterable of user statistics is the input to the function, and finally, the collector provides the output.
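Putting those pieces together, the skeleton of the class might look roughly like this. It's a sketch following the description above, not the exact course code, and the state name is an assumption.

```java
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// UserStatistics is both the input and the output, the key is the email address (a String),
// and TimeWindow is needed because this is a windowed function.
public class ProcessUserStatisticsFunction
        extends ProcessWindowFunction<UserStatistics, UserStatistics, String, TimeWindow> {

    private ValueStateDescriptor<UserStatistics> stateDescriptor;

    @Override
    public void open(Configuration parameters) throws Exception {
        // The name identifies the state, and the class reference lets Flink use
        // its built-in serialization for it.
        stateDescriptor = new ValueStateDescriptor<>("User Statistics", UserStatistics.class);
        super.open(parameters);
    }

    @Override
    public void process(String emailAddress,
                        Context context,
                        Iterable<UserStatistics> statsList,
                        Collector<UserStatistics> collector) throws Exception {
        // The body is filled in over the next two sections.
    }
}
```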

Retrieving Flink State

Inside the function, I use the context to access the global state and then call get state with the descriptor. I also need to pull the user statistics out of the state. This will represent the data that has been accumulated prior to the current window, so I've named it Accumulated Stats. Notice that when accessing the state I haven't actually provided the key. Flink provides it implicitly so that when I pull from the global state, it has already been assigned. The window will include a set of records provided in the iterable.
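In code, that part of the process body might look something like this, continuing the sketch above.

```java
// Inside process(): look up the keyed state using the descriptor. The key itself
// (the email address) is supplied implicitly by Flink.
ValueState<UserStatistics> state = context.globalState().getState(stateDescriptor);

// The statistics accumulated in windows prior to this one (null the first time we see a key).
UserStatistics accumulatedStats = state.value();
```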

Updating Flink State

I'll loop through them and merge them with the Accumulated Stats. If this is the first time I've encountered the email address, the Accumulated Stats will be null. In that case, I use the New Stats. Otherwise, I merge the Accumulated Stats with the New Stats. Eventually, I'll work through all of the items in the list and have a new value. I call the update function on the state to register the new value. Then I call collect on the collector to push the value into the output of the function.

Now I can go put this to use. I'm creating a class named User Statistics Job. It's the entry point for my application, so it contains a relatively simple Java main method. I start with a Stream Execution Environment acting as the engine for my streaming job. I make use of the Flight Data Source to create my source and the User Statistics Sink to create my sink.
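Before moving on to the job, here's roughly what the rest of the process body described above might look like, continuing the earlier sketch.

```java
// Merge each record in this window into the accumulated statistics.
for (UserStatistics newStats : statsList) {
    accumulatedStats = (accumulatedStats == null)
            ? newStats                          // first time we've seen this email address
            : accumulatedStats.merge(newStats); // otherwise merge with what we already had
}

// Register the new value in keyed state, then push it to the function's output.
state.update(accumulatedStats);
collector.collect(accumulatedStats);
```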

Building the Stream

Then I wire everything together. I'll be providing names and UIDs at critical portions of my stream to help identify those sections. Now, using the Flight Data Stream, I call the map function and pass it the constructor for the User Statistics Object. This converts each of my Flight Data Objects into a User Statistics Object. The flight data is larger than the user statistics, so by converting early in the process, I reduce the size of the data being passed around. This is important because in the next step the objects are serialized and sent over the network.

Next, I use Key By to partition the stream using the email address. Flink uses the key to redistribute the stream across the cluster. Did I mention that Flink operates as a cluster? By the way, Flink operates as a cluster. This allows for parallel processing of the data at the cost of network hops. With the records partitioned, I divide the data into one-minute tumbling windows. Now I get to put my processing function to use. I call the reduce method and pass it the user statistics merge function. Every time the stream encounters a new record, it will merge it with the previous one. Then I give the reduce function an instance of the Process User Statistics Function.

Here's where things get a little complicated. Why does reduce need both the merge function and the process function? The reduce function operates on a window, but it isn't really stateful. Each time we call the function, it merges the current and previous results. At the end of the window, it throws everything away. That means it won't accumulate data over multiple windows, which is why I need the Process User Statistics Function. But couldn't I just call process and give it the Process User Statistics Function? Actually, that would work. However, there's an efficiency issue. When the function is called, all of the records for the window are loaded into the iterable. Depending on the size of the window and the number of records, this could create memory issues. To avoid this, I use the reduce function to accumulate statistics within a window, which only requires two records at a time. Then I use the Process User Statistics Function to combine windows in a stateful fashion. Because the records have already been reduced, the iterable only contains a single record. This is a more efficient implementation, even if it looks a little odd.

Now there are a few more things I need to do. I'm going to send the results to the sink, and finally I execute the job. Let's try it out.
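For reference, the wiring described above might look roughly like this in the main method. The watermark strategy, processing-time windows, operator names, and the way the Kafka Properties are populated are all assumptions for illustration rather than the exact course code.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Properties;

public class UserStatisticsJob {
    public static void main(String[] args) throws Exception {
        // The engine for the streaming job.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // How these Properties get populated (bootstrap servers, credentials, etc.)
        // is an assumption here.
        Properties consumerConfig = new Properties();
        Properties producerConfig = new Properties();

        DataStream<FlightData> flightDataStream = env
            .fromSource(FlightDataSource.create(consumerConfig),
                        WatermarkStrategy.noWatermarks(),
                        "flightdata_source");

        flightDataStream
            // Convert FlightData to the smaller UserStatistics early, assuming a
            // UserStatistics(FlightData) constructor as described above.
            .map(UserStatistics::new)
            .name("to_user_statistics").uid("to_user_statistics")
            // Partition the stream by email address so keyed state stays local to each key's node.
            .keyBy(UserStatistics::getEmailAddress)
            // One-minute tumbling windows (processing time here; the course may use event time).
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            // Reduce within the window two records at a time, then roll the window's result
            // into the cumulative totals with the stateful process function.
            .reduce(UserStatistics::merge, new ProcessUserStatisticsFunction())
            .name("reduce_user_statistics").uid("reduce_user_statistics")
            // Send the cumulative results to the Kafka sink.
            .sinkTo(UserStatisticsSink.create(producerConfig))
            .name("userstatistics_sink").uid("userstatistics_sink");

        env.execute("UserStatisticsJob");
    }
}
```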

Compiling and Running

The first step is to compile my job with Maven, and through the magic of television, it's done. Now let's execute it with Flink. I can take a quick look in the Flink dashboard to see that the job is running.

Verifying it Works

But there won't be a lot of detail here. If I really want to see what it's doing, I need to jump into Confluent Cloud, which is where my Kafka topics live. I can open up the user statistics topic and then navigate to the messages tab. It's going to take some time because I only emit new results every minute, but if I wait long enough, I should see some results. Now, I can't guarantee everything is working because I'm only looking at one minute's worth of data. I need to watch it for a longer period of time and look for a specific email address. I want to see the total time and number of flights continue to grow. That was a lot of code for a short period of time.

Next Steps

I hope you were able to follow everything. If you want a deeper look at anything, check out the course Building Apache Flink Applications in Java on Confluent Developer. In that course, I'll walk you through a lot more of the theory behind the code, as well as give you more time to absorb it, and even better, you'll get a chance to write all of the code yourself. I'd be interested to hear what you think of these coding-style videos. Would you like to see more of them? Drop a comment below to let me know. Don't forget to like, share, and subscribe for more awesome content. And thanks for watching.