Wade Waldron

Staff Software Practice Lead

Windowing and Watermarks in Flink

Overview

When working with infinite streams of data, some operations require us to split the stream into more manageable chunks. This is done using time windows, which let us perform aggregations, such as counts or sums, that would otherwise never finish because the stream never ends. To decide when a window is complete, Flink uses something known as a watermark to track the progress of event time through the stream. In this video, you will learn the basic types of windows that can be applied to a stream, as well as how to enable watermarks.

Topics:

  • Window vs WindowAll
  • Tumbling/Sliding/Session Windows
  • Window Join
  • Watermarks
  • Monotonous vs Bounded Out of Orderness
  • Timestamp Assigners
  • Idleness

Code

WindowAll

// Every record shares the same windows, processed by a single non-parallel task
stream.windowAll(timeWindow);

Window

// Records are partitioned by key, and each key gets its own windows
stream
	.keyBy(record -> record.key)
	.window(timeWindow);
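The following plain-Java simulation (not Flink code) shows the difference between the two. It buckets a few records the way each operator would. It assumes 5-second tumbling windows and a hypothetical `Reading` class with a `key` and a timestamp. With `windowAll`, every record shares one set of windows. With `keyBy(...).window(...)`, each key gets its own windows, and that is what allows Flink to spread the work across parallel tasks.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical element type standing in for the stream records (not a Flink class).
final class Reading {
    final String key;
    final long timestampMillis;

    Reading(String key, long timestampMillis) {
        this.key = key;
        this.timestampMillis = timestampMillis;
    }
}

// Plain-Java simulation (not Flink code): how records are bucketed by
// windowAll(...) versus keyBy(...).window(...), using 5-second tumbling windows.
final class WindowGrouping {
    static final long WINDOW_SIZE_MS = 5_000L;

    // Start of the 5 s window that contains the timestamp.
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_SIZE_MS);
    }

    // windowAll: one bucket per window, shared by every key.
    static Map<Long, Integer> countWindowAll(List<Reading> readings) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Reading r : readings) {
            counts.merge(windowStart(r.timestampMillis), 1, Integer::sum);
        }
        return counts;
    }

    // keyBy + window: one bucket per (key, window) pair.
    static Map<String, Integer> countKeyedWindows(List<Reading> readings) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Reading r : readings) {
            counts.merge(r.key + "@" + windowStart(r.timestampMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
                new Reading("a", 1_000L), new Reading("b", 2_000L), new Reading("a", 6_000L));
        System.out.println(countWindowAll(readings));    // {0=2, 5000=1}
        System.out.println(countKeyedWindows(readings)); // {a@0=1, a@5000=1, b@0=1}
    }
}
```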

Tumbling Event Time Windows

// Fixed-size, non-overlapping 5-second windows
stream
	.keyBy(record -> record.key)
	.window(
		TumblingEventTimeWindows
			.of(Time.seconds(5))
	);
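Tumbling windows have a fixed size and never overlap, so each record belongs to exactly one window. The plain-Java sketch below (not Flink code) computes which window a timestamp falls into. It assumes the default alignment to the epoch with no offset. It also shows when such a window can fire: once the watermark reaches the window's last millisecond, `end - 1`.

```java
// Plain-Java sketch (not Flink code) of epoch-aligned tumbling window assignment.
final class TumblingWindowSketch {

    // Start (inclusive) of the tumbling window that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // End (exclusive) of that window.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    // An event-time window is complete once the watermark reaches its last millisecond.
    static boolean isComplete(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L; // Time.seconds(5)
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            System.out.printf("event at %d -> [%d, %d)%n",
                    ts, windowStart(ts, size), windowEnd(ts, size));
        }
        System.out.println(isComplete(5_000L, 4_998L)); // false
        System.out.println(isComplete(5_000L, 4_999L)); // true
    }
}
```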

Sliding Event Time Windows

// 10-second windows that start every 5 seconds, so windows overlap
stream
	.keyBy(record -> record.key)
	.window(
		SlidingEventTimeWindows
			.of(Time.seconds(10), Time.seconds(5))
	);
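Sliding windows have a size (10 seconds here) and a slide (5 seconds), so windows overlap and a record can belong to several of them. In this case that is size / slide = 2. Below is a minimal plain-Java sketch of the assignment. It loosely follows the loop in Flink's `SlidingEventTimeWindows` and assumes a zero offset. It is a simulation, not the Flink class.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink code) of sliding-window assignment.
final class SlidingWindowSketch {

    // Returns every [start, end) window the event belongs to, latest window first.
    static List<long[]> assign(long timestampMillis, long sizeMillis, long slideMillis) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that could contain this timestamp.
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, slideMillis);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= slideMillis) {
            windows.add(new long[] {start, start + sizeMillis});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Time.seconds(10), Time.seconds(5): the event lands in two windows.
        for (long[] w : assign(12_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [10000, 20000) then [5000, 15000)
    }
}
```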

Session Event Time Windows

// A session for a key closes after 10 minutes without records for that key
stream
	.keyBy(record -> record.key)
	.window(
		EventTimeSessionWindows
			.withGap(Time.minutes(10))
	);
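Session windows have no fixed size. Here is a minimal plain-Java sketch for a single key, assuming timestamps in milliseconds. Each record starts its own window `[t, t + gap)`, and windows that overlap or touch are merged. As a result, a session closes only after a full gap with no records. This imitates the merging idea behind Flink's session windows. It is a simulation, not the Flink implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (not Flink code) of session windows for one key.
final class SessionWindowSketch {

    static List<long[]> sessions(long[] timestampsMillis, long gapMillis) {
        long[] sorted = timestampsMillis.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                // The record's window [ts, ts + gap) touches the current session: extend it.
                last[1] = Math.max(last[1], ts + gapMillis);
            } else {
                // First record, or the gap was exceeded: open a new session.
                result.add(new long[] {ts, ts + gapMillis});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long gap = 10 * 60_000L; // Time.minutes(10)
        long[] clicks = {0L, 5 * 60_000L, 30 * 60_000L};
        for (long[] s : sessions(clicks, gap)) {
            System.out.println("session [" + s[0] + ", " + s[1] + ")");
        }
        // prints session [0, 900000) then session [1800000, 2400000)
    }
}
```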

Window Join

stream1.join(stream2)
	.where(elem1 -> elem1.getKey())
	.equalTo(elem2 -> elem2.getKey())
	.window(timeWindow)
	.apply(new JoinFunction<Input1, Input2, Output>() {
		@Override
		public Output join(Input1 input1, Input2 input2) {
		    ...
		}
	});
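In a window join, two elements are paired only if they have the same key and fall into the same window. Each matching pair is passed to the `JoinFunction`, and elements with no partner in that window produce no output. The plain-Java sketch below simulates this with 5-second tumbling windows. The `Elem` class and the string concatenation standing in for `join(...)` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink code) of window-join semantics.
final class WindowJoinSketch {

    // Hypothetical element type: key, event timestamp, payload.
    static final class Elem {
        final String key;
        final long timestampMillis;
        final String value;

        Elem(String key, long timestampMillis, String value) {
            this.key = key;
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Emits one result per pair with equal keys in the same tumbling window.
    static List<String> join(List<Elem> left, List<Elem> right, long windowSizeMillis) {
        List<String> out = new ArrayList<>();
        for (Elem l : left) {
            for (Elem r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.timestampMillis, windowSizeMillis)
                        == windowStart(r.timestampMillis, windowSizeMillis);
                if (sameKey && sameWindow) {
                    out.add(l.value + "+" + r.value); // stands in for JoinFunction.join
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Elem> orders = List.of(
                new Elem("k1", 1_000L, "order1"), new Elem("k1", 7_000L, "order2"));
        List<Elem> payments = List.of(
                new Elem("k1", 3_000L, "pay1"), new Elem("k2", 4_000L, "pay2"));
        System.out.println(join(orders, payments, 5_000L)); // [order1+pay1]
    }
}
```

Here `order2` is dropped because no `k1` payment falls into its window `[5000, 10000)`.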

Monotonous Timestamps

WatermarkStrategy<DataType> watermarkStrategy =
	WatermarkStrategy.forMonotonousTimestamps();
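A watermark with timestamp t tells Flink that no more records with a timestamp of t or earlier are expected. With `forMonotonousTimestamps()`, timestamps are assumed to only ever increase. The watermark can therefore follow the highest timestamp seen so far, minus 1 ms so that records sharing that timestamp are still on time. Below is a minimal plain-Java sketch of that arithmetic. Flink emits watermarks periodically rather than after each event, and this sketch omits that detail.

```java
// Plain-Java sketch (not Flink code) of the watermark arithmetic behind
// WatermarkStrategy.forMonotonousTimestamps().
final class MonotonousWatermarkSketch {
    // Highest timestamp seen so far; MIN_VALUE means "no events yet".
    private long maxTimestamp = Long.MIN_VALUE;

    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    // Watermark = highest timestamp - 1, so equal timestamps are still on time.
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }

    // A record at or below the watermark arrived too late.
    boolean isLate(long timestampMillis) {
        return timestampMillis <= currentWatermark();
    }

    public static void main(String[] args) {
        MonotonousWatermarkSketch wm = new MonotonousWatermarkSketch();
        wm.onEvent(1_000L);
        wm.onEvent(5_000L);
        System.out.println(wm.currentWatermark()); // 4999
        System.out.println(wm.isLate(5_000L));     // false: same timestamp is still allowed
        System.out.println(wm.isLate(4_000L));     // true: out of order, which this strategy assumes never happens
    }
}
```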

Bounded Out of Orderness

WatermarkStrategy<DataType> watermarkStrategy =
	WatermarkStrategy.forBoundedOutOfOrderness(
		Duration.ofSeconds(10)
	);
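`forBoundedOutOfOrderness` relaxes that assumption. Records may arrive up to a fixed delay late, so the watermark trails the highest timestamp seen by that delay (plus 1 ms). The plain-Java sketch below mirrors the arithmetic of Flink's `BoundedOutOfOrdernessWatermarks` and uses the 10-second bound from the snippet above. It is a simulation, not Flink's class.

```java
// Plain-Java sketch (not Flink code) of bounded out-of-orderness watermarks.
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Chosen so the first watermark is exactly Long.MIN_VALUE, without overflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    // Trail the highest timestamp by the allowed delay, minus 1 ms.
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    boolean isLate(long timestampMillis) {
        return timestampMillis <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(10_000L); // Duration.ofSeconds(10)
        wm.onEvent(20_000L);
        System.out.println(wm.currentWatermark()); // 9999
        System.out.println(wm.isLate(15_000L));    // false: 5 s out of order, within the bound
        System.out.println(wm.isLate(9_000L));     // true: more than 10 s behind
    }
}
```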

Timestamp Assigner

WatermarkStrategy<DataType> watermarkStrategy =
	WatermarkStrategy.forBoundedOutOfOrderness(
		Duration.ofSeconds(10)
	)
	.withTimestampAssigner(
		// Keep the timestamp already attached to the record (e.g. by the Kafka source)
		(event, timestamp) -> timestamp
	);
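The timestamp assigner receives each element together with the timestamp already attached to the record, and returns the event time Flink should use. The lambda above keeps the existing record timestamp. If your events carry their own time, you can read it from the payload instead. So that this sketch compiles without Flink, `TimestampAssignerSketch` is a local stand-in with the same shape as Flink's `TimestampAssigner.extractTimestamp(T element, long recordTimestamp)`. The `Event` class and its `createdAtMillis` field are hypothetical.

```java
// Local stand-in with the same shape as Flink's TimestampAssigner (not the Flink type).
@FunctionalInterface
interface TimestampAssignerSketch<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// Hypothetical event type that carries its own creation time.
final class Event {
    final String id;
    final long createdAtMillis;

    Event(String id, long createdAtMillis) {
        this.id = id;
        this.createdAtMillis = createdAtMillis;
    }
}

final class TimestampAssignerDemo {
    // Same as the lambda above: keep the timestamp the source already attached.
    static final TimestampAssignerSketch<Event> FROM_RECORD =
            (event, recordTimestamp) -> recordTimestamp;

    // Alternative: use the event time stored in the payload.
    static final TimestampAssignerSketch<Event> FROM_PAYLOAD =
            (event, recordTimestamp) -> event.createdAtMillis;

    public static void main(String[] args) {
        Event e = new Event("e1", 1_000L);
        long recordTimestamp = 1_250L; // e.g. the Kafka record timestamp
        System.out.println(FROM_RECORD.extractTimestamp(e, recordTimestamp));  // 1250
        System.out.println(FROM_PAYLOAD.extractTimestamp(e, recordTimestamp)); // 1000
    }
}
```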

Idleness

WatermarkStrategy<DataType> watermarkStrategy =
	WatermarkStrategy.forBoundedOutOfOrderness(
		Duration.ofSeconds(10)
	)
	.withTimestampAssigner(
		(event, timestamp) -> timestamp
	)
	// Mark an input idle after 10 seconds without records,
	// so it stops holding back the watermark
	.withIdleness(Duration.ofSeconds(10));
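Idleness matters because an operator that reads from several inputs (for example, several Kafka partitions) takes the minimum of their watermarks. A single input that stops receiving records therefore holds back every window downstream. `withIdleness` marks such an input as idle after the given timeout, so it is ignored until records arrive again. The plain-Java simulation below shows that rule. It assumes two inputs and a 10-second timeout measured in wall-clock time, and it is not Flink's implementation.

```java
// Plain-Java sketch (not Flink code) of combining input watermarks with idleness.
final class IdlenessSketch {

    // Minimum watermark over the inputs that are not idle.
    static long combinedWatermark(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, inputWatermarks[i]);
                anyActive = true;
            }
        }
        // Simplification: report no progress if every input is idle.
        return anyActive ? min : Long.MIN_VALUE;
    }

    // An input counts as idle once it has gone idleTimeoutMillis without records.
    static boolean isIdle(long lastRecordWallClockMillis, long nowMillis, long idleTimeoutMillis) {
        return nowMillis - lastRecordWallClockMillis >= idleTimeoutMillis;
    }

    public static void main(String[] args) {
        long[] watermarks = {59_999L, 4_999L}; // input 1 stopped receiving records
        boolean[] noIdleness = {false, false};
        System.out.println(combinedWatermark(watermarks, noIdleness)); // 4999: stuck

        boolean[] withIdleness = {false, isIdle(0L, 60_000L, 10_000L)}; // Duration.ofSeconds(10)
        System.out.println(combinedWatermark(watermarks, withIdleness)); // 59999
    }
}
```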

Resources

Use the promo code FLINKJAVA101 to get $25 of free Confluent Cloud usage
