An Event Streaming Application can perform continuous aggregation operations like an Event Aggregator. Normally, however, the aggregator will emit "intermediate" processing results. That's because an event stream is potentially infinite, so generally we do not know when the input is considered "complete". So, with few exceptions, there's not really a point where we can have a "final" result. Also, this emit strategy has the benefit of resulting in low latency between the time when a new input event is received and the time when updated processing results are available.
In certain situations, however, we prefer to receive a single, final result rather than multiple intermediate results. For example, we may have to feed aggregation results into a system that does not natively support a streaming approach. Here, we need an aggregator that is able to "suppress" intermediate results so that only a single, final result is produced.
How can an event aggregator provide a final aggregation result, rather than "intermediate" results that keep being updated?
Generally speaking, this is possible only for windowed aggregations. That's because, in this case, the aggregator does know when the input for a given window (e.g., a 5-minute window in order to compute 5-minute averages) is considered complete, and thus it can be configured to suppress "intermediate" results until the window time passes. How do we do this?
First, the input events of the aggregator must be windowed via an Event Grouper, i.e., the events are grouped into "windows" based on their timestamps. Depending on the configured grouping, an event is placed into exactly one window, or it can be placed into multiple overlapping windows. Then, the event aggregator performs its operation on each window.
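To make the grouping step concrete, here is a minimal, library-free sketch of timestamp-based window assignment. The class `WindowAssigner` and its method names are hypothetical and introduced only for illustration; they are not part of Kafka Streams or any other library. The sketch assumes epoch-aligned windows and timestamps in milliseconds. A tumbling window places each event into exactly one window, whereas a hopping window (whose size is larger than its advance interval) places it into several.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; not part of any library.
// All timestamps and durations are in milliseconds since the epoch.
final class WindowAssigner {
    private WindowAssigner() {}

    /** Tumbling windows: returns the start of the single window containing ts. */
    static long tumblingWindowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    /**
     * Hopping windows: returns the starts (ascending) of every window
     * [start, start + sizeMs) that contains ts, where a new window begins
     * every advanceMs. Starts may be negative for timestamps near the epoch.
     */
    static List<Long> hoppingWindowStarts(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long latest = ts - Math.floorMod(ts, advanceMs); // latest start <= ts
        for (long start = latest; start > ts - sizeMs; start -= advanceMs) {
            starts.add(start);
        }
        Collections.reverse(starts);
        return starts;
    }
}
```

With 5-minute tumbling windows, for instance, an event stamped 09:03 is assigned to the window starting at 09:00. With 5-minute windows that advance every minute, the same event belongs to five overlapping windows.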
The aggregator outputs a single, final result for a window only once that window is considered to have "passed" (e.g., a 5-minute window starting at 09:00 and ending at 09:05 has passed once 09:05 is reached). For example, consider an aggregation for an event stream of customer payments, where we want to compute the number of payments per hour. By using a window size of 1 hour, we can emit a final count for the hourly number of payments once the respective 1-hour window closes.
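The following sketch shows one way such an aggregator could behave for the payment-count example. It is an illustration under stated assumptions, not a definitive implementation: `SuppressedWindowCounter` is a hypothetical class, windows are tumbling and epoch-aligned, time advances only with the highest event timestamp seen so far, and events for an already closed window are simply dropped.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, library-free sketch of a suppressed windowed count.
final class SuppressedWindowCounter {
    private final long windowSizeMs;
    // Counts of windows that are still open, keyed by window start (time-ordered).
    private final TreeMap<Long, Long> openCounts = new TreeMap<>();
    // Assumption: "now" is the highest event timestamp observed so far.
    private long streamTimeMs = Long.MIN_VALUE;

    SuppressedWindowCounter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /**
     * Processes one event and returns the final (windowStart, count) results
     * of all windows that closed as a consequence; usually an empty list.
     */
    List<Map.Entry<Long, Long>> onEvent(long eventTimeMs) {
        streamTimeMs = Math.max(streamTimeMs, eventTimeMs);
        long start = eventTimeMs - Math.floorMod(eventTimeMs, windowSizeMs);

        // Count the event only if its window has not closed yet.
        if (start + windowSizeMs > streamTimeMs) {
            openCounts.merge(start, 1L, Long::sum);
        }

        // Emit and forget every window whose end is at or before stream time.
        List<Map.Entry<Long, Long>> finalResults = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it =
            openCounts.headMap(streamTimeMs - windowSizeMs, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            finalResults.add(Map.entry(e.getKey(), e.getValue()));
            it.remove();
        }
        return finalResults;
    }
}
```

With a 1-hour window, three payments inside the first hour produce no output at all; the first payment stamped after the hour boundary triggers a single result carrying the count 3 for that window.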
Ideally, the aggregator is able to handle out-of-order or "late" events, which is a common situation to deal with in an event streaming platform (e.g., an event created at 09:03 arrives only at 09:07). Here, a common technique is to let users define a so-called grace period for windows to give delayed events some extra time to arrive. Events that arrive within the grace period of a window will be processed, whereas any later events will not be processed (e.g., if the grace period is 3 minutes, then the 09:00-09:05 window stays open until 09:08, so the 09:03 event arriving at 09:07 would be included in it; any events arriving at or after 09:08 would be ignored by this window).
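The grace-period rule can be expressed as a small check, sketched below. `GracePeriod`, `closeTime`, and `accepts` are hypothetical names used only for illustration, and the `now` parameter stands for whatever notion of current time the platform uses (Kafka Streams, for instance, advances time based on observed event timestamps rather than the wall clock).

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of any library.
final class GracePeriod {
    private GracePeriod() {}

    /** A window closes once its end time plus the grace period is reached. */
    static Instant closeTime(Instant windowEnd, Duration grace) {
        return windowEnd.plus(grace);
    }

    /** An event for the window is still accepted strictly before the close time. */
    static boolean accepts(Instant windowEnd, Duration grace, Instant now) {
        return now.isBefore(closeTime(windowEnd, grace));
    }
}
```

For a window ending at 09:05 with a 3-minute grace period, `accepts` returns true at 09:07 and false from 09:08 onward, matching the example above.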
Note that the use of a grace period increases the processing latency, because the aggregator has to wait for an additional period of time before it knows the input for a given window is complete and thus before it can output the single, final result for that window.
For Apache Kafka®, the Kafka Streams client library provides a suppress operator in its DSL, which we can apply to windowed aggregations.
In the following example we compute hourly aggregations on a stream of orders, using a grace period of five minutes to wait for any orders arriving with a slight delay. The suppress operator ensures that there's only a single result event for each hourly window.
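    import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
    import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

    KStream<String, OrderEvent> orderStream = builder.stream(...);

    orderStream
        .groupByKey()
        // 1-hour windows that keep accepting delayed orders for 5 more minutes.
        // (Kafka 3.0+ offers TimeWindows.ofSizeAndGrace(size, grace) for this.)
        .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(5)))
        .aggregate(() -> 0.0 /* initial value of `total`, per window */,
                   (key, order, total) -> total + order.getPrice(),
                   Materialized.with(Serdes.String(), Serdes.Double()))
        // Hold back intermediate updates; emit one final result per window.
        .suppress(untilWindowCloses(unbounded()))
        .toStream()
        // Drop the window bounds from the key and keep only the original key.
        .map((windowKey, value) -> KeyValue.pair(windowKey.key(), value))
        .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));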