course: Building Apache Flink® Applications in Java

Windowing and Watermarks in Flink

7 min
Wade Waldron

Staff Software Practice Lead

Overview

When working with infinite streams of data, some operations require us to split the stream into more manageable chunks. This is done using time windows and it allows us to perform aggregations that would otherwise be impossible. When working with windows, Flink uses something known as a watermark to track time throughout the stream. In this video, you will learn the basic types of windows that can be applied to a stream, as well as how to enable watermarks.

Topics:

  • Window vs WindowAll
  • Tumbling/Sliding/Session Windows
  • Window Join
  • Watermarks
  • Monotonous vs Bounded Out of Orderness
  • Timestamp Assigners
  • Idleness

Code

WindowAll

stream.windowAll(timeWindow);

Window

stream
	.keyBy(record -> record.key)
	.window(timeWindow)

Tumbling Event Time Windows

stream
	.keyBy(record -> record.key)
	.window(
		TumblingEventTimeWindows
			.of(Time.seconds(5))
	)
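
The way a timestamp maps to a tumbling window can be sketched with simple arithmetic. The following is a plain-Java illustration, not Flink code: the class and method names are hypothetical, and it assumes windows are aligned to the epoch with no offset and are half-open, `[start, end)`.

```java
/** Plain-Java sketch of tumbling window assignment; names are hypothetical, not Flink API. */
final class TumblingWindowSketch {

    /** Inclusive start of the epoch-aligned tumbling window that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps too.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 5_000L; // 5 seconds, matching Time.seconds(5) above
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            System.out.printf("%d -> [%d, %d)%n", ts, windowStart(ts, sizeMs), windowEnd(ts, sizeMs));
        }
    }
}
```

Each timestamp lands in exactly one window, which is why tumbling windows never share data points.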

Sliding Event Time Windows

stream
	.keyBy(record -> record.key)
	.window(
		SlidingEventTimeWindows
			.of(Time.seconds(10), Time.seconds(5))
	)
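
With a sliding window, one timestamp belongs to several overlapping windows. A minimal plain-Java sketch of that assignment follows. It is not Flink code: the names are hypothetical, and it assumes epoch-aligned, half-open windows whose starts are multiples of the slide.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of sliding window assignment; names are hypothetical, not Flink API. */
final class SlidingWindowSketch {

    /** Start times of every window [start, start + sizeMs) that contains timestampMs, newest first. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // The most recent window start at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-second windows sliding every 5 seconds, as in the snippet above.
        System.out.println(windowStarts(12_345L, 10_000L, 5_000L));
    }
}
```

With a size of 10 seconds and a slide of 5 seconds, every timestamp falls into two windows, so each event contributes to two consecutive results.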

Session Event Time Windows

stream
	.keyBy(record -> record.key)
	.window(
		EventTimeSessionWindows
			.withGap(Time.minutes(10))
	)
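
Session windows have no fixed size, so the grouping is driven by the gaps between events. The sketch below is plain Java rather than Flink code, with hypothetical names. It assumes each event opens a window `[ts, ts + gap)` that is extended by any later event falling inside it. How an event landing exactly on the window end is treated is an assumption of this sketch and may differ from Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of gap-based session grouping; names are hypothetical, not Flink API. */
final class SessionWindowSketch {

    /** Returns sessions as {start, end} pairs, where end = last event time + gapMs. */
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts < last[1]) {
                // The event arrived before the gap elapsed: extend the open session.
                last[1] = ts + gapMs;
            } else {
                // No open session, or the gap elapsed: start a new one.
                result.add(new long[] {ts, ts + gapMs});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long gapMs = 600_000L; // 10 minutes, matching Time.minutes(10) above
        long[] events = {0L, 60_000L, 120_000L, 900_000L, 960_000L};
        for (long[] session : sessions(events, gapMs)) {
            System.out.println(Arrays.toString(session));
        }
    }
}
```

In this run the first three events form one session and the last two form another, because more than 10 minutes pass between the third and fourth events.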

Window Join

stream1.join(stream2)
	.where(elem1 -> elem1.getKey())
	.equalTo(elem2 -> elem2.getKey())
	.window(timeWindow)
	.apply(new JoinFunction<Input1, Input2, Output>() {
		@Override
		public Output join(Input1 input1, Input2 input2) {
		    ...
		}
	});
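
The pairing behavior of a window join can be illustrated without Flink. The sketch below is plain Java with hypothetical names, including the `Event` type. It assumes tumbling windows and inner-join semantics, where every pair of elements that share a key and a window is combined.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of joining by key and tumbling window; names are hypothetical, not Flink API. */
final class WindowJoinSketch {

    /** Hypothetical event type used only for this illustration. */
    static final class Event {
        final String key;
        final long timestampMs;
        final String value;

        Event(String key, long timestampMs, String value) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Combines every left/right pair that shares both a key and a window. */
    static List<String> join(List<Event> left, List<Event> right, long sizeMs) {
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow =
                    windowStart(l.timestampMs, sizeMs) == windowStart(r.timestampMs, sizeMs);
                if (sameKey && sameWindow) {
                    joined.add(l.value + "+" + r.value);
                }
            }
        }
        return joined;
    }

    /** Small made-up data set: step counts joined with heart rates in 5-second windows. */
    static List<String> demo() {
        List<Event> steps = new ArrayList<>();
        steps.add(new Event("alice", 1_000L, "steps=40"));
        steps.add(new Event("alice", 7_000L, "steps=55"));
        steps.add(new Event("bob", 2_000L, "steps=10"));
        List<Event> heartRates = new ArrayList<>();
        heartRates.add(new Event("alice", 3_000L, "bpm=90"));
        heartRates.add(new Event("bob", 8_000L, "bpm=70"));
        return join(steps, heartRates, 5_000L);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the two "alice" events in the first window are paired. Elements with no match in the same key and window produce no output.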

Monotonous Timestamps

WatermarkStrategy<DataType> watermarkStrategy =
	WatermarkStrategy.forMonotonousTimestamps();

Bounded Out of Orderness

WatermarkStrategy<DataType> watermarkStrategy =
	WatermarkStrategy.forBoundedOutOfOrderness(
		Duration.ofSeconds(10)
	);
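
A bounded-out-of-orderness watermark trails the largest timestamp seen so far by the configured bound, which is where the added latency comes from. The following plain-Java sketch shows the arithmetic. The class and method names are hypothetical, and the rule that a window can fire once the watermark reaches its last included millisecond is stated here as an assumption about event-time triggering, not as a copy of Flink's implementation.

```java
/** Plain-Java sketch of a bounded-out-of-orderness watermark; names are hypothetical, not Flink API. */
final class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Chosen so the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Tracks the largest event timestamp seen so far; older events do not move it back. */
    void onEvent(long timestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, timestampMs);
    }

    /** The watermark claims that no event with a timestamp at or below this value is still expected. */
    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }

    /** Assumption: a window [start, end) may fire once the watermark reaches end - 1. */
    boolean canFire(long windowEndExclusiveMs) {
        return windowEndExclusiveMs - 1 <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(10_000L);
        for (long ts : new long[] {12_000L, 11_000L, 15_000L}) {
            wm.onEvent(ts);
            System.out.printf("event=%d watermark=%d window[0,5000) ready=%b%n",
                ts, wm.currentWatermark(), wm.canFire(5_000L));
        }
    }
}
```

With a 10-second bound, the window covering the first 5 seconds only becomes ready after an event with a timestamp of 15 seconds arrives, so a larger bound tolerates more disorder at the cost of later results.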

Timestamp Assigner

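WatermarkStrategy<DataType> watermarkStrategy =
	WatermarkStrategy.<DataType>forBoundedOutOfOrderness(
		Duration.ofSeconds(10)
	)
	.withTimestampAssigner(
		// Returns the event time in epoch milliseconds; here it keeps the
		// timestamp already attached to the record.
		(event, timestamp) -> timestamp
	);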

Idleness

WatermarkStrategy<DataType> watermarkStrategy =
	WatermarkStrategy.<DataType>forBoundedOutOfOrderness(
		Duration.ofSeconds(10)
	)
	.withTimestampAssigner(
		(event, timestamp) -> timestamp
	)
	.withIdleness(Duration.ofSeconds(10));
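
Why an idle source stalls a pipeline can be seen from how watermarks from several inputs are combined. The sketch below is plain Java with hypothetical names. It assumes the combined watermark is the minimum across inputs that are not marked idle, and that an input counts as idle once no event has arrived for at least the timeout. Both are simplifications for illustration.

```java
import java.util.OptionalLong;

/** Plain-Java sketch of combining watermarks while skipping idle inputs; names are hypothetical. */
final class IdlenessSketch {

    /** Minimum watermark across non-idle inputs, or empty if every input is idle. */
    static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        OptionalLong min = OptionalLong.empty();
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // Idle inputs no longer hold the combined watermark back.
            }
            if (!min.isPresent() || watermarks[i] < min.getAsLong()) {
                min = OptionalLong.of(watermarks[i]);
            }
        }
        return min;
    }

    /** Assumption: an input is idle once no event has arrived for at least idleTimeoutMs. */
    static boolean isIdle(long lastEventAtMs, long nowMs, long idleTimeoutMs) {
        return nowMs - lastEventAtMs >= idleTimeoutMs;
    }

    public static void main(String[] args) {
        long[] watermarks = {20_000L, 3_000L}; // The second input stopped receiving events.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false}));
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, true}));
    }
}
```

While the quiet input is still counted, the combined watermark stays at its old value. Once it is marked idle, the active input alone determines progress.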

Windowing and Watermarks in Flink

Hi, I'm Wade from Confluent. In this video, we'll be discussing how to implement time windows in Flink, and their relationship to watermarks.

Infinite streams of data present us with challenges. For example, calculating the sum of an infinite stream is impossible. Instead, we need to create boundaries in the stream to provide something meaningful. Fitness trackers record biometrics like steps and heart rate. They produce an infinite stream of data. However, we still want to aggregate the data into sums and averages. To accommodate aggregation, we break the metrics into time windows such as how many steps were taken today or in the last hour. These computations separate the infinite stream into measurable time periods.

The DataStream API implements time windows through the use of two operators. The windowAll operator is available on a non-keyed stream. This is a stream where you haven't used the keyBy operator. It takes a timeWindow as a parameter which we'll discuss in more detail in a moment. Now, if we have a keyed stream, then we use the window operator instead. An important consideration is that the windowAll operator is run by a single task without parallelism. However, keyBy partitions the stream, which allows the window operation to be run in parallel.

So, how do we define the timeWindow parameter? Let's keep using our fitness tracker as an example. Trackers usually provide the ability to look back in time. You can see your metrics for the previous day, month, or even longer. These are discrete time intervals with a known start and end point, and there is no overlap. Metrics from today will not share any data points with metrics from yesterday. In Flink, this is known as a Tumbling Time Window. Here, we see a TumblingEventTimeWindow of 5 seconds. This means that the stream will be broken into discrete 5-second chunks. Optionally, the keyed windows can be staggered. This can be useful to ensure that multiple windows are not firing at the exact same time.

But what if we wanted to know how many steps we took in the last 24 hours? This type of window doesn't have a fixed start and end point. Instead, it slides as the current time changes. Furthermore, the windows overlap. If I check the last 24 hours, and then check it again an hour later, the datasets will share 23 hours' worth of data. In Flink, this is known as a Sliding Time Window. To implement a Sliding Time Window, we need to provide the size of the window and the size of the slide. Here, we see a window that is 10 seconds long, with a slide of 5 seconds. In other words, every 5 seconds, this data stream will report the past 10 seconds' worth of data.

Some fitness trackers use metrics along with machine learning to automatically detect certain types of exercise. They do this by separating periods of activity from inactivity. However, in this case, there's no fixed time window. Sometimes, you might be active for 5 minutes, while other times you might be active for an hour. In Flink, this is called a Session Time Window. These windows are based on the active periods. In other words, when we see an event, we open a new window. We terminate the window when we see a gap in the events. In this case, if we don't see any events for 10 minutes, we would end the window.

Now that we have a basic understanding of windowing, it opens up some new operators for us. One of the key ones is a window join. It allows us to take two datastreams and join them by a key and a time window. Once the streams have been joined, we can use the apply operator. It takes matching elements from each stream and applies a function to return an Output type. You can think of this as a database join, but rather than operating on a table, it operates on a datastream.

To determine what data goes into the window, Flink uses watermarks. You can think of them as timestamps that are added into the datastream at certain points. Watermarks flow through the stream and are consumed by each of the operators. They are then forwarded downstream. The purpose of a watermark is to guarantee that the stream is up to date. It indicates that there will be no more events with an earlier timestamp.

Watermarks are defined using a WatermarkStrategy. There are a few predefined strategies, but you can also create your own. The most basic strategy is noWatermarks. It's useful if you don't care about timestamps but is unsuitable for windowing. For windowing, our simplest strategy is MonotonousTimestamps. This tells Flink that the timestamps will be strictly increasing and never out of order. This can be suitable for messages produced by a single-threaded process. However, highly scalable systems often use multiple threads and multiple machines. In this case, the datastream may be partly out of order. The BoundedOutOfOrderness strategy tells Flink the stream is out of order within a certain time constraint. But be careful. BoundedOutOfOrderness introduces latency while the stream waits for the period to elapse. If your system is sensitive to latency, you'll want to keep this value low.

If a window encounters late messages after a watermark, the normal behavior is to drop them. You can set up alternate behaviors such as pushing those late messages to a second datastream.

By default, Flink will look at the timestamp embedded in each message to generate watermarks. However, those timestamps are sometimes unsuitable and are often missing. We can use the TimestampAssigner to customize how Flink calculates the timestamp. It takes a function including the event and the assigned timestamp and returns a different timestamp.

Sometimes you will consume from a source with a tendency to be idle. Unfortunately, idle sources don't generate new watermarks. This can block the watermark computation for the entire pipeline and cause it to stall. You can mitigate this using an idleness period. It provides a time period after which a source will be marked as idle. Idle sources are removed from the watermark computation, allowing the stream to progress.

Dealing with watermarks can be tricky. If you find yourself with a stream that isn't producing results, watermarks are usually to blame. Check out our Flink 101 course for a deeper discussion on watermarks, including how to debug them. In the meantime, let's try putting them to use in an exercise. If you aren't already on Confluent Developer, head there now using the link in the video description to access the rest of this course and its hands-on exercises.
