Flink jobs can measure time using either the system clock (processing time) or timestamps in the events (event time). Operations that use event time need some way to determine how long to wait for the stream to become complete before taking an action, such as closing a window. This video explains how Flink uses watermarks to address this need.
Hey, David from Confluent here. Now I'm going to tell you about one of my favorite parts of Flink. Time is a crucial aspect of many stream processing applications, especially those doing analytics or pattern matching. For example, you might be looking for accounts that have attempted to log in five or more times in the past minute. Use cases like this need support for both state, which is used for counting, and time.

Flink distinguishes between event time and processing time, or as it is sometimes called, wall-clock time. As an event is being processed, both of these notions of time are relevant. Event time is the time when the event was originally created or recorded, while processing time is the time, later on, when the event is being processed. Event time is generally preferred because it is consistent: working with it should produce deterministic results. But processing time can be simpler to work with, and it is sometimes chosen for that reason.

When working in event time, it is possible for event streams to be perfectly ordered by time, but very often event streams are only approximately ordered by their timestamps. This can happen whenever events can take different paths and experience different delays. It is especially common in situations involving mobile devices, as in this example, where the event that occurred a few seconds later, at 14 seconds past 10 o'clock, ends up being processed first because it experiences less delay than the earlier event.

Here you see part of an event stream. This event will be processed next, and these events follow the event at 1:50. If you look closely, you'll see that these events are somewhat out of order. For example, there's an event at 1:55 that will be processed after the event at 1:56.

Processing out-of-order event streams like this one can be challenging. Imagine a Flink window, specifically a window from one to two o'clock. The plan is that this window should count the events that belong to this hour-long time interval, based on their timestamps. The events in this stream have been passing through this window one by one. This processing has been going on since one o'clock, and we are now approaching the end of the hour. How should this window decide that it has seen every event that belongs to the hour in question? It's not as though the window can peek ahead in the stream and know for sure that it's safe to produce the final count for the hour ending at two o'clock. So we need some sort of heuristic the window can use to decide when it is probably complete, meaning that no further events from before two o'clock are likely to exist. This decision boils down to making an assumption about how out of order the stream actually is.

Flink's watermarks embody this assumption about the degree of out-of-orderness. In this example, I have created a couple of watermarks based on the assumption that this stream is, at most, five minutes out of order. Flink's watermark generator inserts watermarks into the event streams, and these watermarks flow with your data records. Each watermark carries a timestamp and marks a specific position in the stream. These timestamps are computed by subtracting the out-of-orderness estimate from the largest timestamp seen so far. So in the case of the watermark at 1:45, this was computed by taking the timestamp of the previously processed event with the largest timestamp, which was 1:50, and subtracting five minutes from that.
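In the DataStream API, this out-of-orderness assumption is expressed as a watermark strategy. Here's a minimal sketch of what the five-minute version might look like; the Event type and its getTimestampMillis() accessor are hypothetical stand-ins for your own record type:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Assume the stream is at most five minutes out of order.
// "Event" and getTimestampMillis() are hypothetical stand-ins for your record type.
WatermarkStrategy<Event> strategy =
    WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(5))
        // Tell Flink where each event's timestamp lives.
        .withTimestampAssigner((event, previousTimestamp) -> event.getTimestampMillis());
```

With a Kafka source, you would typically pass this strategy to env.fromSource(kafkaSource, strategy, "events"), so that the watermark generator runs inside the source, as described next.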
Back to the example. The watermark at 1:45 tells us something about the completeness of this stream with respect to time. Specifically, it is an assertion that the stream is now complete up to, but not including, 1:45, and no earlier events are expected to follow. Earlier events are still possible, but they are not expected. If they do occur, they violate our assumption that the stream is no more than five minutes out of order, and such events are considered late. Depending on what you are doing, late events might be ignored.

We started this discussion of watermarks with a question about how long our window operator should continue processing the stream before closing the window that ends at two o'clock. We can now answer that question: the window should continue until it sees a watermark at or beyond two o'clock. Flink's window operator relies on watermarks to know when to produce results, and these watermarks come from a watermark generator that runs inside Flink's Kafka consumer.

I'm about to walk you through an example of how this works. It will get a bit complicated, but these details are important, because if the watermarks aren't working correctly, the application may fail to produce any results. What you are seeing here is a Kafka source followed by a window. Both the source and the window have a parallelism of two, and the window is counting events by color, which is why the sources are connected to the windows by a network shuffle that implements this re-partitioning. In this example, each instance of the Kafka source operator is reading from two Kafka partitions. We need this much parallelism in our example to fully explore what happens when Flink generates and propagates watermarks.

Here is a sample event, with a key of A and a timestamp at 10 minutes after one o'clock. And here is another event on the other partition connected to this same source. After processing these two events, our window has recorded one event for A during the window from one to two o'clock, and one event for C.

By default, every 200 milliseconds, the Kafka source is asked to generate a new watermark. The source does this by first computing the watermark for each partition independently. Using the same five-minute out-of-orderness estimate we used before, this produces a watermark at 1:05 for partition 0, since the largest timestamp seen so far on partition 0 is 1:10. Using the same logic, the watermark for partition 2 is 1:30. The watermark the Kafka source produces is then the minimum of these per-partition watermarks.

Now why does this make sense? The watermark this Kafka source produces should carry a timestamp that reflects how complete the stream is so far. The stream from the uppermost Kafka source includes events from both partition 0 and partition 2, so it can be no more complete than the further behind of these two partitions, which is partition 0. And although partition 0 has seen an event with a timestamp of 1:10, it reports its watermark as 1:05 because it is allowing for events to be up to five minutes out of order. After generating this watermark, the source sends it downstream to both window operators. I am building up toward explaining how Flink computes the watermark at each instance of the window operator; that watermark will need to somehow reflect the watermarks coming in from each of the two sources.
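To make that per-partition arithmetic concrete, here's a toy model of the bookkeeping. To be clear, this is a sketch for illustration, not Flink's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of per-partition watermarking -- not Flink's real internals.
public class PerPartitionWatermarks {

    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();
    private final long outOfOrdernessMillis;

    public PerPartitionWatermarks(long outOfOrdernessMillis, int... partitions) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        for (int partition : partitions) {
            // Before a partition has seen any events, its maximum timestamp
            // is a very large negative value -- the source of those
            // crazy-looking initial watermarks mentioned later.
            maxTimestampPerPartition.put(partition, Long.MIN_VALUE);
        }
    }

    public void onEvent(int partition, long timestampMillis) {
        maxTimestampPerPartition.merge(partition, timestampMillis, Math::max);
    }

    // The source's watermark is the minimum of the per-partition watermarks:
    // the stream is only as complete as its furthest-behind partition.
    // (Flink's real formula also subtracts one extra millisecond; see below.)
    public long currentWatermark() {
        return maxTimestampPerPartition.values().stream()
                .mapToLong(max -> max == Long.MIN_VALUE
                        ? Long.MIN_VALUE // no events yet: keep the sentinel
                        : max - outOfOrdernessMillis)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

With a five-minute bound, a source reading partitions 0 and 2 that has seen maximum timestamps of 1:10 and 1:35 reports min(1:05, 1:30) = 1:05, matching the walkthrough above, where partition 0 holds the source's watermark back.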
Turning our attention now to the other source: so far, all this source has consumed is an event at 1:18 from partition 3, which has been counted in the appropriate window. Based on this event, the source has generated a watermark at 1:13 for partition 3. But lacking any events for partition 1, that partition doesn't yet have a useful watermark. As a result, when asked for a watermark, the source will send this information downstream.

Now we are in a position to answer the question: what is the current watermark at the window? The answer is that the current watermark for any operator with multiple inputs is the minimum of the incoming watermarks. In this case, the window operators don't yet have a useful watermark, because no events have been processed for partition 1. As soon as partition 1 has an event, the second source can produce a good watermark, which will flow downstream. That watermark is 1:13, because that is the smaller of the two per-partition watermarks in this source. Now the watermarks for both instances of the window operator have advanced, but until these watermarks reach two o'clock, the results accumulating inside the windows won't be emitted.

As detailed as that explanation was, I glossed over a few details that I want to bring up now. First of all, for most use cases, allowing for events to be as much as five minutes late is overly generous. If this example were running in real time, the window ending at two o'clock wouldn't produce its results until five minutes later, at 2:05. That's a long time to wait, and typical worst-case message delivery delays are much shorter than five minutes.

The second point is that the actual formula Flink uses to compute watermarks subtracts an additional millisecond. The true formula takes the maximum timestamp, subtracts the out-of-orderness estimate, and then subtracts one millisecond; this avoids a classic off-by-one error. So rather than showing a watermark of 1:05, for example, I should have displayed a slightly smaller watermark.

And finally, there is always a watermark. The initial watermark, before any events have been processed, is a very large negative value, which is what you'll see if you inspect the watermarks at that point. So whenever you see a crazy-looking watermark like this one, understand that it means the watermark generator hasn't seen any events yet, and the maximum timestamp is still that very small initial value.

I took you through this detailed example so that we can now talk about the most common troubleshooting question on the Flink forums: why isn't my Flink job producing any results? 99 times out of 100, the answer has something to do with watermarking. If you want a quick sanity check to confirm that's the case, simply switch your job from event time to processing time. If the job now produces results, you know the watermarks are the root cause of the problem. Why does this work? When you use processing-time semantics, there's no need for watermarking. The purpose of watermarks is to measure the progress of time when time is based on timestamps in the events. With processing time, time is based on the system time-of-day clock, which is always marching forward on its own, without any need for watermarks.
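That sanity check is typically a one-line change in the DataStream API: swap the event-time window assigner for its processing-time counterpart. Here's a sketch, where colorCounts is a hypothetical DataStream of (color, 1L) pairs with timestamps and watermarks already assigned:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event-time version: a window only fires once the watermark passes its end,
// so broken watermarks mean no output at all.
DataStream<Tuple2<String, Long>> eventTimeCounts =
    colorCounts
        .keyBy(pair -> pair.f0)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

// Processing-time version for the sanity check: the wall clock drives the
// windows, and no watermarks are involved. If this variant produces results,
// watermarking is the root cause.
DataStream<Tuple2<String, Long>> processingTimeCounts =
    colorCounts
        .keyBy(pair -> pair.f0)
        .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
```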
Most often, the underlying watermark problem is something called the idle stream problem. Think back to that lengthy example I just showed you: what if partition 1 had never had any events at all? In that case, the watermarks would never have advanced. Similarly, whenever any partition becomes idle, the watermarks will stall. Events from other partitions may continue to flow, and the windows will continue to collect results, but no results will be reported until the idle partition has new events.

There are a few ways to solve this problem. You can ensure that no source partition is ever idle, either by doing a good job of balancing the partitions or by artificially keeping them active with keep-alive events. Or you can configure Flink to time out idle partitions and exclude them from the watermark calculation, as sketched at the end of this page. The hands-on exercises for this course include an example showing how to do this.

Okay, that was a lot to digest. Let me summarize what you need to know about watermarks. First of all, watermarks solve a problem that only occurs in stream processing, and only if you want to use event-time processing logic. If you are using Flink for batch processing, or if you are using wall-clock time, you don't need watermarks. The problem watermarks solve is letting Flink's event-time-based operations, such as windows, know when the time has come to do their thing.

If your application needs watermarks, you will have to supply each source with a watermarking strategy, and that strategy will be parameterized with an estimate of the out-of-orderness. This estimate is a heuristic; it probably won't be correct 100% of the time. If you want to be conservative and try to prevent having any late events, you can configure a very long out-of-orderness interval, but this comes at the expense of additional latency. And still, no matter how long you arrange for Flink to wait, some events could occasionally arrive after this out-of-orderness interval has expired. On the other hand, if you want to minimize latency, you can configure your watermarking to wait only a short time for out-of-order events, but this will probably result in more events arriving late. In this way, watermarks give you control over the trade-off between completeness and latency.

Flink SQL drops late events, but the DataStream API gives you more control over how late events are handled. If you are able to avoid late events, you can expect event-time processing to produce deterministic, reproducible results. And finally, as we've seen, sources that might become idle need special attention.

If you aren't already on Confluent Developer, head there now using the link in the video description to access other courses, hands-on exercises, and many other resources for continuing your learning journey.
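As referenced above, here is what timing out idle partitions might look like in code. This is a minimal sketch extending the earlier watermark strategy; Event and getTimestampMillis() remain hypothetical names, and the one-minute idleness timeout is only an illustrative choice:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<Event> strategy =
    WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(5))
        .withTimestampAssigner((event, previousTimestamp) -> event.getTimestampMillis())
        // If a partition produces no events for one minute, mark it idle so
        // it no longer holds back the downstream watermark.
        .withIdleness(Duration.ofMinutes(1));
```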