Combining multiple events into a single encompassing event—e.g., to compute totals, averages, etc.—is a common task in event streaming and streaming analytics.
How can multiple related events be aggregated to produce a new event?
We use an Event Grouper followed by an Event Aggregator. The grouper prepares the input events for the subsequent aggregation step, for example by grouping them on the data field by which the aggregation is computed (such as a customer ID), by grouping them into time windows (such as 5-minute windows), or both. The aggregator then computes the desired aggregation for each group of events, for example the average or sum for each 5-minute window.
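As a language-neutral illustration, the two steps described above can be sketched in plain Python. This is a minimal sketch, not how any particular stream processor implements the pattern: the `OrderEvent` type, the `group_events` and `aggregate_groups` helpers, and the sample events are all hypothetical names and data introduced here, and the whole input is processed as one batch for simplicity.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, NamedTuple, Tuple

WINDOW_SIZE_MS = 5 * 60 * 1000  # 5-minute tumbling windows


class OrderEvent(NamedTuple):
    """Hypothetical input event, for illustration only."""
    item_id: int
    total_units: float
    timestamp_ms: int


GroupKey = Tuple[int, int]  # (item_id, window start in ms)


def group_events(events: Iterable[OrderEvent]) -> Dict[GroupKey, List[OrderEvent]]:
    """Event Grouper: bucket events by item ID and by tumbling time window."""
    groups: Dict[GroupKey, List[OrderEvent]] = defaultdict(list)
    for event in events:
        # Round the timestamp down to the start of its window.
        window_start = event.timestamp_ms - (event.timestamp_ms % WINDOW_SIZE_MS)
        groups[(event.item_id, window_start)].append(event)
    return groups


def aggregate_groups(groups: Dict[GroupKey, List[OrderEvent]]) -> Dict[GroupKey, dict]:
    """Event Aggregator: emit one aggregated event per group."""
    aggregated = {}
    for key, members in groups.items():
        total_units = sum(e.total_units for e in members)
        aggregated[key] = {
            "total_orders": len(members),
            "avg_units": total_units / len(members),
        }
    return aggregated


# Made-up sample events: three fall in the first window, one in the second.
events = [
    OrderEvent(item_id=1, total_units=2.0, timestamp_ms=0),
    OrderEvent(item_id=1, total_units=4.0, timestamp_ms=60_000),
    OrderEvent(item_id=2, total_units=10.0, timestamp_ms=120_000),
    OrderEvent(item_id=1, total_units=6.0, timestamp_ms=300_000),
]
stats = aggregate_groups(group_events(events))
```

A real stream processor performs the same two steps incrementally, updating each group's aggregate as new events arrive rather than waiting for a complete batch.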
For example, we can use the streaming database ksqlDB and Apache Kafka® to perform an aggregation.
We'll start by creating a stream in ksqlDB called orders, based on an existing Kafka topic of the same name:
CREATE STREAM orders (
    order_id INT,
    item_id INT,
    total_units DOUBLE
) WITH (
    KAFKA_TOPIC='orders',
    VALUE_FORMAT='AVRO'
);
Then we'll create a table containing the aggregated events from that stream:
CREATE TABLE item_stats AS
    SELECT item_id,
           COUNT(*) AS total_orders,
           AVG(total_units) AS avg_units
    FROM orders
    WINDOW TUMBLING (SIZE 1 HOUR)
    GROUP BY item_id
    EMIT CHANGES;
This table will be continuously updated whenever new events arrive in the orders stream.
Because input events can arrive late, stream processing technologies use techniques such as grace periods (e.g., the GRACE PERIOD clause in ksqlDB) or watermarks to define cutoff points after which an Event Processor will discard any late-arriving input events from its processing. See the Suppressed Event Aggregator pattern for additional information.
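To make the cutoff idea concrete, here is a minimal Python sketch of a windowed counter that accepts late events only within a grace period. It is a simplified model under stated assumptions, not ksqlDB's actual implementation: the `GraceWindowCounter` class, the 10-minute grace value, and the sample timestamps are hypothetical, and "stream time" is assumed to be the highest event timestamp seen so far.

```python
from collections import defaultdict

MINUTE_MS = 60 * 1000
WINDOW_SIZE_MS = 60 * MINUTE_MS  # 1-hour tumbling windows
GRACE_MS = 10 * MINUTE_MS        # assumed grace period, for illustration


class GraceWindowCounter:
    """Counts events per (item_id, window) and drops events that arrive too late."""

    def __init__(self, window_size_ms: int, grace_ms: int) -> None:
        self.window_size_ms = window_size_ms
        self.grace_ms = grace_ms
        self.stream_time_ms = 0  # highest event timestamp observed so far
        self.counts = defaultdict(int)
        self.dropped = 0

    def process(self, item_id: int, timestamp_ms: int) -> bool:
        """Return True if the event was counted, False if it was discarded."""
        self.stream_time_ms = max(self.stream_time_ms, timestamp_ms)
        window_start = timestamp_ms - (timestamp_ms % self.window_size_ms)
        window_end = window_start + self.window_size_ms
        # Cutoff point: the window is closed once stream time has reached
        # the window end plus the grace period.
        if self.stream_time_ms >= window_end + self.grace_ms:
            self.dropped += 1
            return False
        self.counts[(item_id, window_start)] += 1
        return True


counter = GraceWindowCounter(WINDOW_SIZE_MS, GRACE_MS)
results = [
    counter.process(1, 10 * MINUTE_MS),  # on time, first window
    counter.process(1, 65 * MINUTE_MS),  # on time, second window
    counter.process(1, 50 * MINUTE_MS),  # late, but within the grace period
    counter.process(1, 75 * MINUTE_MS),  # advances stream time past the cutoff
    counter.process(1, 55 * MINUTE_MS),  # too late for the first window: discarded
]
```

The trade-off in choosing the grace period is between completeness and latency: a longer grace period admits more late events, but windows stay open (and their results stay provisional) for longer.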