Many simple operations in Apache Flink are linear, allowing them to remain on a single machine without sending data around the cluster. Aggregations, however, break this pattern. They often pull data from multiple Kafka partitions and combine it into a single result. This requires data transfer between Flink instances to ensure it is available where needed.
Although this data transfer introduces additional resource usage, it can be advantageous. In addition to providing a tool to logically group records, the groupBy clause can also redistribute the data across the cluster. This distribution is independent of the partitioning defined by the upstream Kafka topics, allowing it to scale in ways the Kafka partitions would usually prevent. This allows Flink jobs to be more elastic.
However, we do have to be cautious. Some operations require more storage than others. At times, distributing this storage across the cluster may reduce the overall burden on each instance. Other times, it may result in data duplication and could aggravate the problem. Monitoring what state is being stored in the operation and how it is distributed can be important for some jobs.
In this video, we'll walk through some of the more common aggregations in the Flink Table API and see how they are distributed within the cluster. We'll dive into the groupBy clause and see how it impacts the data distribution. Finally, we'll look at the distinct clause to see how its state management differs from more common operations such as sum.
An example of a basic aggregation statement that uses the most common operators such as count, sum, avg, min, and max.
env.from("`vehicles`.`telemetry`.`trips`")
.select(
$("vin").count().as("total_trips"),
$("distance").sum().as("total_distance_driven"),
$("fuel_economy").avg().as("average_fuel_economy"),
$("fuel_economy").min().as("min_fuel_economy"),
$("fuel_economy").max().as("max_fuel_economy")
)
.insertInto("`vehicles`.`telemetry`.`trips_overview`")
.execute();
An example of an aggregation statement that uses groupBy to group the aggregated results according to a single field (vin).
env.from("`vehicles`.`telemetry`.`trips`")
.groupBy(
$("vin")
)
.select(
$("distance").sum().as("total_distance_driven"),
$("distance").avg().as("average_trip_distance"),
$("fuel_economy").avg().as("average_fuel_economy")
)
.insertInto("`vehicles`.`telemetry`.`vehicle_statistics`")
.execute();
An example of a more complex groupBy statement that groups over multiple fields.
env.from("`vehicles`.`telemetry`.`trips`")
.groupBy(
$("make"),
$("model"),
$("year")
)
.select(
$("make"),
$("model"),
$("year"),
$("fuel_economy").avg().as("average_fuel_economy")
)
.insertInto("`vehicles`.`telemetry`.`average_fuel_economy`")
.execute();
An example of using the distinct clause to extract the distinct combinations of vehicle make and model.
env.from("`vehicles`.`telemetry`.`trips`")
.select(
$("make"),
$("model")
)
.distinct()
.insertInto("`vehicles`.`telemetry`.`makes_and_models`")
.execute();
Many simple operations in Apache Flink are fairly linear in nature.
They can remain on a single machine without sending data all over the cluster.
However, aggregation operations often require data to be shared among Flink instances.
This is especially true of the groupBy operation which is designed to distribute records according to specified criteria.
I'm going to take a few minutes and walk through some common aggregation functions in the Flink Table API, and how they impact the distribution of data in the cluster.
Let's start with a use case.
Imagine a vehicle manufacturer with embedded telemetry in their vehicles.
Each time a driver goes on a trip, the vehicle records details such as the distance traveled, the fuel economy, etc.
This data is pushed to a Kafka topic named trips where it will be processed by a set of Flink jobs.
One of the simplest jobs will produce a high-level view of the trips taken by all vehicles in the system.
This simple job will consume from the trips topic, and push the results into a trips_overview topic, as you see here.
The first metric the job will collect is the total number of trips recorded in the system.
This one is easy.
The count operator will calculate the number of records that appear in the query.
Since count skips records where the selected field is null, it only gives the same result on every field if none of them can be null; assuming all fields are required, the count could be applied to any of them with the same result.
However, since we have to pick one, we'll select the vehicle identification number.
Next, we want to know the total distance traveled by all vehicles registered in the system.
For this, we can use a sum of the distances recorded for each trip.
To get a sense of the fuel economy for the vehicles we can take the average of the fuel_economy field.
Meanwhile, a minimum and maximum of the fuel_economy will give us a best and worst case scenario for the efficiency.
These five functions (count, sum, average, min, and max) represent some of the most common aggregations you will find in the wild.
Now, let's zoom in on one of those aggregations to see how Flink handles it under the hood.
We'll look at the sum operation, although the other operations will be similar.
The Flink job is consuming from a Kafka topic named "trips".
This topic will be partitioned to enable scalability in the system.
I'll draw two partitions for simplicity, but there would likely be more.
The sum operator will consume messages from these partitions, as shown here.
Of course, multiple operations are happening, so this would be more complex, but we'll focus on one operator for now.
Having a single instance process the sum wouldn't be great for scalability, so instead, Flink can use multiple instances, each consuming from a different partition.
Each instance will compute the sum of the partition it reads from, but that's not good enough.
An aggregation step will take the results of each sum and combine them into an overall sum.
The aggregation isn't scalable, but thankfully, it's dealing with a smaller set of messages since the sums have already done most of the work.
As a result, scaling the aggregation shouldn't be necessary.
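If you're curious how this local-plus-global split is expressed, open-source Flink exposes it through planner options. The snippet below is only a sketch using open-source option names (the mini-batch settings and table.optimizer.agg-phase-strategy); a managed service may handle this tuning for you.
// Sketch using open-source Flink options; a managed service may set these automatically.
// Assumes env is a TableEnvironment (org.apache.flink.table.api.TableConfig).
TableConfig config = env.getConfig();
// Two-phase (local/global) aggregation in streaming mode relies on mini-batching.
config.set("table.exec.mini-batch.enabled", "true");
config.set("table.exec.mini-batch.allow-latency", "2 s");
config.set("table.exec.mini-batch.size", "1000");
// Ask the planner to split each aggregate into a local pre-sum and a global sum.
config.set("table.optimizer.agg-phase-strategy", "TWO_PHASE");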
Confluent Cloud will automatically scale the operators up or down to meet demand; however, there are limitations to what it can do.
To ensure ordering, only one consumer is allowed per partition.
With two partitions, if we added a third sum instance, it would be idle because all partitions are already being consumed.
However, it's possible to have more partitions than operator instances because each operator can consume multiple partitions.
This won't impact the ordering guarantees from Kafka.
Just be aware that with few partitions, it's easy to end up unbalanced, with some operators doing more work than others.
A best practice is to configure enough partitions to ensure room for scalability without creating too much imbalance.
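For instance, here is a hedged sketch of creating the trips topic with extra partitions using the Kafka AdminClient; the partition count, replication factor, and address are example values only.
// Hypothetical example: create the trips topic with more partitions than the
// job's initial parallelism, leaving headroom for scaling the sum operator.
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (Admin admin = Admin.create(props)) {
    // 6 partitions, replication factor 3 (example values only).
    admin.createTopics(List.of(new NewTopic("trips", 6, (short) 3))).all().get();
}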
One of the benefits of using Flink is its ability to redistribute data to meet the needs of a job.
Some Flink operations can take the incoming partitions and shuffle data to a larger set of consumers.
Let's take a quick look at one of these operations.
The groupBy operator separates the incoming data stream into multiple logical groups based on the criteria provided.
In the example shown, it would create a separate group for each vehicle identification number.
So, trips from vin1 would go into a single group, highlighted in blue.
Trips from vin2 would go into a different group, highlighted in yellow.
Finally, trips from vin3 would go into a final group, highlighted in green.
From there, we can apply aggregations as we did before, except now, they apply to individual groups rather than the entire stream.
In this example, we create a separate set of statistics for each vehicle based on the vehicle identifier, including:
the total distance,
average distance,
and average fuel economy.
These metrics could be used to populate widgets in the companion application for a user's vehicle.
But how does this impact the scalability of the Flink job?
Normally, the number of consumers is limited by the number of partitions.
However, if that consumer is a groupBy operator, it provides additional flexibility.
The logical streams created by the groupBy operator can be redistributed in new ways.
This allows the system to scale downstream operators beyond the limits created by the Kafka partitions.
Furthermore, while the previous example needed a final aggregation to combine the results from multiple streams, here, it's unnecessary because we aren't looking for a global sum.
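In open-source Flink terms, this is why the keyed part of the job can be given more parallelism than the topic has partitions. A minimal sketch, using an open-source option name (Confluent Cloud handles this scaling automatically):
// Sketch: with 2 Kafka partitions, only 2 source subtasks stay busy, but the
// aggregation after groupBy re-hashes records by key, so it can run wider.
env.getConfig().set("table.exec.resource.default-parallelism", "8");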
Now, the groupBy isn't limited to a single field.
In this example, we group over a combination of the make, model, and year.
This lets us produce a stream that shows the average fuel economy for each type of vehicle.
Let's consider one more operation.
Imagine we wanted a list of every combination of make and model in the trips.
We can do that using the distinct operation shown here.
Like a global sum, the distinct operation requires everything to be funneled through a single operator instance.
However, similar to the sum, we can scale this by performing individual distinct operations on each partition and then aggregating them in a final step.
But, there is a difference.
Distinct has to remember every value it has seen.
Those values are stored in each operator instance, including the final aggregation step.
For a small data set with limited options, such as the make and model of our vehicles, this might not matter.
However, if the potential number of distinct values was high, such as for vehicle identification numbers, it could result in significant storage.
And this gets worse as we add more fields to the operation because it creates a combinatorial explosion.
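To make that concrete, here is a plain-Java sketch (illustrative only, not Flink internals) of what the operators effectively have to remember:
// Each partition-level distinct keeps every value it has seen, and the final
// aggregation keeps the union, so the values end up stored more than once.
Set<String> partition1 = new HashSet<>(Set.of("Ford|Focus", "Toyota|Camry"));
Set<String> partition2 = new HashSet<>(Set.of("Toyota|Camry", "Honda|Civic"));
Set<String> global = new HashSet<>(partition1);
global.addAll(partition2);
System.out.println(global.size()); // 3 distinct make/model combinations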
The lesson is to be careful with stateful operations in Flink.
Some operations, such as count, sum, and average, are usually safe and can be performed with minimal storage.
Others, such as distinct, can require significantly more storage, depending on the data and how they are configured.
So pay attention when using these operations.
Understanding how they work can help ensure your state doesn't grow unexpectedly.
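One hedged mitigation in open-source Flink is state time-to-live, which lets idle state expire; managed services typically expose an equivalent setting.
// Sketch: drop state entries that have not been read or updated for 30 days so
// a distinct over high-cardinality values (like VINs) cannot grow without bound.
// Note that an expired key will be treated as brand new if it appears again.
env.getConfig().set("table.exec.state.ttl", "30 d");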
Aggregation in Flink can get much more complicated than we've seen here.
In a future video, we'll cover aggregations that operate over temporal windows.
So keep your eyes peeled for that one.
In the meantime, drop me a comment if you have any questions, or let me know what you thought of this video.
And thanks for watching.