Event Aggregator

Combining multiple events into a single encompassing event (for example, to compute totals or averages) is a common task in event streaming and streaming analytics.

Problem

How can multiple related events be aggregated to produce a new event?

Solution

We use an Event Grouper followed by an Event Aggregator. The grouper prepares the input events for the subsequent aggregation step, e.g., by grouping the events on the data field by which the aggregation is computed (such as a customer ID), by grouping the events into time windows (such as 5-minute windows), or both. The aggregator then computes the desired aggregation for each group of events, e.g., the average or sum over the events in each 5-minute window.

Implementation

For example, we can use the streaming database ksqlDB and Apache Kafka® to perform an aggregation.

We'll start by creating a stream in ksqlDB called orders, based on an existing Kafka topic of the same name:

CREATE STREAM orders (order_id INT, item_id INT, total_units DOUBLE)
  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='AVRO');

Then we'll create a table containing the aggregated events from that stream:

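-- Grouper: WINDOW TUMBLING and GROUP BY form one group per item per 1-hour window.
-- Aggregator: COUNT and AVG compute the order count and average units of each group.
CREATE TABLE item_stats AS
  SELECT item_id, COUNT(*) AS total_orders, AVG(total_units) AS avg_units
  FROM orders
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY item_id
  EMIT CHANGES;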

This table will be continuously updated whenever new events arrive in the orders stream.
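
To observe those updates, the table can be read back with a query. The following is a minimal sketch rather than part of the original example: it assumes the item_stats table above has been created and that pull queries are available for it, and the item ID 42 is a made-up value.

```sql
-- Push query: subscribe to the table and receive a new row every time
-- the aggregate for any item and window is updated.
SELECT item_id, WINDOWSTART, WINDOWEND, total_orders, avg_units
  FROM item_stats
  EMIT CHANGES;

-- Pull query: look up the current aggregates for a single item
-- (42 is a hypothetical item ID) and return immediately.
SELECT item_id, WINDOWSTART, WINDOWEND, total_orders, avg_units
  FROM item_stats
  WHERE item_id = 42;
```

WINDOWSTART and WINDOWEND identify the 1-hour window that each result row belongs to, so a single item can appear once per window.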

Considerations

  • In event streaming, a key technical challenge is that, with few exceptions, it is generally not possible to tell whether the input data is "complete" at a given point in time. For this reason, stream processing technologies such as the streaming database ksqlDB and the Kafka Streams client library of Apache Kafka employ techniques such as slack time [1], grace periods (see the GRACE PERIOD clause in ksqlDB), or watermarks to define cutoff points after which an Event Processor discards any late-arriving input events. See the Suppressed Event Aggregator pattern for additional information.
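
As an illustration of such a cutoff point, the aggregation from the Implementation section could be given an explicit grace period. This is only a sketch: the table name item_stats_with_grace and the 10-minute value are assumptions chosen for the example, not recommendations.

```sql
-- Same aggregation as item_stats, but each 1-hour window keeps accepting
-- out-of-order events for 10 more minutes (in event time) after it ends.
-- Events that arrive later than that are left out of the aggregation.
CREATE TABLE item_stats_with_grace AS
  SELECT item_id, COUNT(*) AS total_orders, AVG(total_units) AS avg_units
  FROM orders
  WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 10 MINUTES)
  GROUP BY item_id
  EMIT CHANGES;
```

A longer grace period tolerates more delay in the input at the cost of keeping windows open, and their state retained, for longer.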

Footnotes

[1] Slack time: "Beyond Analytics: The Evolution of Stream Processing Systems" (SIGMOD 2020); "Aurora: A New Model and Architecture for Data Stream Management" (VLDB Journal 2003)
