course: Apache Flink® 101

Stateful Stream Processing with Flink SQL

5 min
David Anderson

Software Practice Lead

Overview

Flink SQL is organized around providing alternatives to queries where the runtime might have to maintain an ever-growing amount of state in order to provide correct results. This video explains which queries are potentially problematic, and why.

Topics:

  • Stateless Queries
  • Materializing Queries
  • Temporal Queries

Stateful Stream Processing with Flink SQL

Hey, this is David Anderson from Confluent, here to talk about stateful stream processing and how that fits into Flink SQL.

Earlier in the course, we looked at this example, which is counting events for each color, except for the orange events, which are filtered out first. The filtering step of this pipeline doesn't need any state. In general, the only stream processing operations that can be performed without any state are those that apply simple filters or transformations that can be executed solely on the basis of the contents of the event being processed.

Counting, on the other hand, is a good example of an operation that requires keeping some state. Because we have grouped the events by color, one parallel instance of the counting operator will see all of the events for any given color. Flink manages this kind of state in a distributed key-value store, with each parallel instance managing the state for some of the keys or, in this example, the current counts for some of the colors.

In order to achieve high performance, Flink keeps its state local to each processing node. And in order to provide fault tolerance, Flink periodically checkpoints this state by copying it to a remote, durable object store like S3. I will explain how those checkpoints work in more detail later in the course.

By contrast, traditional applications have a stateless compute layer connected across the network to a remote database. Using this more traditional approach for stream processing would not scale as well as what Flink is doing, nor would it be able to offer the low latency that many Flink applications expect.

As I mentioned before, SQL filters and projections are stateless. This makes them easy for Flink to implement, and there's nothing here to worry about. On the other hand, aggregations and joins are potentially dangerous. We call these operations materializing operations because they need to maintain, at all times, an internal copy of the data related to the query.
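
The pipeline described above (drop the orange events, then count the rest per color) can be sketched in plain Python. This is only a toy model, not Flink code: the names `PARALLELISM`, `is_not_orange`, `instance_for`, and `count_by_color` are hypothetical, and the CRC32-based routing merely stands in for whatever key partitioning the runtime actually uses. The point is that the filter needs no memory, while each counting instance owns a local map for its share of the keys.

```python
from collections import defaultdict
from zlib import crc32

PARALLELISM = 2  # hypothetical number of parallel counting instances


def is_not_orange(event):
    """Stateless filter: the decision depends only on the event itself."""
    return event["color"] != "orange"


def instance_for(color):
    """Route a key to one instance, so every event of a color lands in one place.

    Assumption: a simple deterministic hash; real key partitioning differs.
    """
    return crc32(color.encode("utf-8")) % PARALLELISM


def count_by_color(events):
    # One local map from color to count per parallel instance.
    local_state = [defaultdict(int) for _ in range(PARALLELISM)]
    for event in events:
        if not is_not_orange(event):
            continue  # filtered out before it ever reaches the stateful step
        color = event["color"]
        local_state[instance_for(color)][color] += 1
    return local_state


colors = ["blue", "orange", "green", "blue", "orange", "blue"]
events = [{"color": c} for c in colors]
state = count_by_color(events)

# Merge the per-instance maps only to inspect the overall result.
merged = {color: n for shard in state for color, n in shard.items()}
print(merged)
```

Each color shows up in exactly one of the per-instance maps, which is what lets every instance keep its state local.
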
In this case, that internal data structure is a map from colors to counters, where Flink SQL keeps track of how many events of each color have been processed so far. As new events arrive, that internal map is updated. The reason why this can be problematic is that this internal map from colors to counters must be maintained indefinitely, and how much data is involved depends on how many different colors there are. If the things being counted only come in two colors, then Flink will only need to store two counters. On the other hand, if there are millions of colors, then this query will eventually need to store millions of counters.

To be clear, having Flink manage millions or even billions of counters isn't necessarily a problem. And of course, in many real-world applications, the state being accumulated will be something larger than a counter. But so long as there is a clear upper limit to how much state is involved, and you've allocated enough storage for it, this will just work. But if you have an unbounded requirement for storage, then no matter how much storage is available, if the query is left running long enough, it will eventually overflow the data store. In that case, the Flink job will eventually fail.

We just walked through an example where we looked at the storage requirements for handling unbounded streaming queries that do aggregations using GROUP BY. Streaming joins face the same issue. Flink must store indefinitely every record ever processed for both sides of the join. That's because any previously processed record could potentially be needed as part of the join result for a newly arriving record.

Let me add one final clarification. The state retention problem I've been talking about is only relevant when Flink is operating in streaming mode. When Flink is used for batch processing, the runtime is much more efficient and these challenges can be avoided.
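
To make the join case concrete, here is a minimal sketch of an inner equi-join over two streams with no time constraint. It is a toy model under stated assumptions, not how Flink implements joins: the class `UnboundedJoin` and its methods are hypothetical names. What it illustrates is that every row from either side has to be kept, because a matching row for it might still arrive later.

```python
from collections import defaultdict


class UnboundedJoin:
    """Toy inner equi-join over two streams, with no temporal constraint."""

    def __init__(self):
        # Both sides are retained forever, keyed by the join key.
        self.left = defaultdict(list)
        self.right = defaultdict(list)

    def on_left(self, key, row):
        self.left[key].append(row)
        # Join the new row against everything seen so far on the other side.
        return [(row, other) for other in self.right[key]]

    def on_right(self, key, row):
        self.right[key].append(row)
        return [(other, row) for other in self.left[key]]

    def stored_rows(self):
        left_rows = sum(len(rows) for rows in self.left.values())
        right_rows = sum(len(rows) for rows in self.right.values())
        return left_rows + right_rows


join = UnboundedJoin()
first = join.on_left("o1", {"order": "o1"})    # nothing to match yet
second = join.on_right("o1", {"payment": 10})  # matches the stored order
third = join.on_right("o1", {"payment": 5})    # a later row still matches it
print(join.stored_rows())
```

Nothing in this sketch ever removes a row, so `stored_rows()` only grows as the streams keep producing input.
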
One way to avoid the state retention problem we just talked about is to impose temporal constraints on your aggregations and joins. This third family of SQL operators also maintains state in a materialized view, but that state has a predictable, finite lifetime. In other words, every piece of state that these temporal operators create will later be freed. This makes these temporal operations safer to use than the fully materializing operations I showed you before.

The query I've included here as an example is counting events by color in windows that are 10 minutes long. Yes, there's a lot going on in this query, and I'm not gonna try to unpack it all here. For more information on how windows work in Flink SQL, please check out the hands-on exercises. But the intuition is that the counters being maintained by this query are only needed until a window has ended. Here the windows are 10 minutes long. Once this 10-minute interval has expired, a window will produce its results and clear its state. This puts an upper bound on how much state Flink must maintain to handle this query, and that's going to be related to the number of distinct colors that can be processed for a window that is 10 minutes long.

In addition to time-based window aggregations, Flink has a few other temporal operations. All of these temporal operations work the same way. They're all waiting for a specific time interval to elapse, after which they will produce a result and clear the state they were keeping.

As you can see, when you are doing stream processing, state and time go hand in hand. In very particular situations, such as windowing, Flink is able to free up internal state based on the passage of time. In a later video, I'll do a deep dive into this temporal dimension.

If you aren't already on Confluent Developer, head there now using the link in the video description to access other courses, hands-on exercises, and many other resources for continuing your learning journey.
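
The 10-minute windowed count described in this section can be sketched as follows. This is a simplified model rather than the query from the video: `TumblingWindowCounter`, `on_event`, and `advance_time` are hypothetical names, timestamps are plain milliseconds, and `advance_time` is an assumed stand-in for however the runtime learns that time has moved forward. Late events are not handled.

```python
from collections import defaultdict

WINDOW_MS = 10 * 60 * 1000  # 10-minute windows, as in the example discussed above


class TumblingWindowCounter:
    """Counts events per color in fixed, non-overlapping windows."""

    def __init__(self):
        # window start (ms) -> {color: count}
        self.windows = defaultdict(lambda: defaultdict(int))

    def on_event(self, color, timestamp_ms):
        window_start = timestamp_ms - (timestamp_ms % WINDOW_MS)
        self.windows[window_start][color] += 1

    def advance_time(self, now_ms):
        """Emit and free every window whose end is at or before now_ms."""
        results = []
        for start in sorted(self.windows):
            if start + WINDOW_MS <= now_ms:
                counts = self.windows.pop(start)  # state for this window is cleared
                results.extend((start, color, n) for color, n in sorted(counts.items()))
        return results


counter = TumblingWindowCounter()
counter.on_event("blue", 60_000)
counter.on_event("blue", 120_000)
counter.on_event("green", 540_000)
counter.on_event("blue", 660_000)  # belongs to the second window

emitted = counter.advance_time(600_000)  # the first window has just ended
print(emitted)
print(len(counter.windows))
```

After the first window is emitted, only the still-open window remains in memory, so the retained state is bounded by the number of distinct colors seen within one window.
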