Events in an Event Processing Application can often be very large. We tend to capture data exactly as it arrives, and then process it, rather than processing it first and only storing the results. As a result, the event that we want to consume often contains much more information than we actually need for the task at hand.
For example, we might pull in a product feed from a third-party API and store that data exactly as it was received. Later, we might ask the question, "How many products are in each product category?" and find that every event contains 100 fields, when we're really only interested in counting one. At the very least, this is inefficient: the network, memory, and serialization costs are 100x higher than they need to be. Worse, manually inspecting the data becomes painful, as we hunt through 100 fields to find and check the one that we care about.
Equally important, we may have security and data privacy concerns to address. Imagine that we have a stream of data representing users' personal details and site preferences. If the marketing department wants to get more information about our global customer base, we might be able to share the users' timezone and currency settings, but only those fields.
We need a way to store complete events while giving consumers only a subset of the event fields.
How can I simply consume only a few data items from a large event?
Create an Event Processor that inspects each event, pulls out the fields of interest, and passes new, smaller events downstream for further processing.
As an example, in the streaming database ksqlDB, we can use a SELECT statement to easily transform a rich event stream into a stream of simpler events. Assume that we have an event stream called products, where each event contains a huge number of fields. We are only interested in four fields: product_id, category, sku, and price. We can prune the events down to just those fields with the following query:
CREATE OR REPLACE STREAM product_summaries AS
  SELECT
    product_id,
    category,
    sku,
    price
  FROM products;
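With the slimmed-down stream in place, the motivating question above can be answered directly. As a minimal sketch, assuming the product_summaries stream just defined, a ksqlDB push query that counts products per category might look like this:

SELECT category, COUNT(*) AS product_count
FROM product_summaries
GROUP BY category
EMIT CHANGES;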
Or we can perform an equivalent transformation using the Apache Kafka® client library Kafka Streams, perhaps as part of a larger processing pipeline:
builder.stream("products", Consumed.with(Serdes.Long(), productSerde))
    .mapValues(
        (product) -> {
          ProductSummary summary = new ProductSummary();
          summary.setCategory(product.getCategory());
          summary.setSku(product.getSku());
          summary.setPrice(product.getPrice());
          return summary;
        })
    .to("product_summaries", Produced.with(Serdes.Long(), productSummarySerde));
Since filtering the content creates a new stream, it's worth considering how the new stream will be partitioned, as discussed in the Partitioned Placement pattern. By default, the new stream will inherit the same partitioning key as its source, but we can repartition the data to suit our new use case (for example, by specifying a PARTITION BY clause in ksqlDB). In the example above, our third-party product feed might be partitioned by the vendor's unique product_id, but for this use case, it might make more sense to partition the filtered events by their category.
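As a sketch, assuming the same products stream, the filtered stream could be re-keyed by category like this:

CREATE OR REPLACE STREAM product_summaries_by_category AS
  SELECT
    product_id,
    category,
    sku,
    price
  FROM products
  PARTITION BY category;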
See the ksqlDB documentation for details.