In this tutorial, you will learn how to perform an aggregation over each row's preceding rows. For example, imagine a table of online orders: every time a new order comes in, you want the rolling average price over that order and the five before it. This is called an over window aggregation, in contrast to a group window aggregation, which calculates one aggregate per group (or per group / window pair) rather than one result per input row.
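To make the contrast concrete, here is a minimal Table API sketch of the two styles. The orders variable, the customer_id column, and the one-hour window size are illustrative assumptions, not part of this tutorial's program:

// Group window aggregation: one result row per (group, window) pair.
Table perWindow = orders
    .window(Tumble.over(lit(1).hours()).on($("$rowtime")).as("w"))
    .groupBy($("w"), $("customer_id"))
    .select($("customer_id"), $("price").avg().as("avg_price"));

// Over window aggregation: one result row per input row, aggregating
// over that row's preceding rows.
Table perRow = orders
    .window(Over.orderBy($("$rowtime")).preceding(rowInterval(5L)).as("w"))
    .select($("price"), $("price").avg().over($("w")).as("rolling_avg_price"));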
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
If you already have the Confluent Cloud resources required to populate a Table API client configuration file (e.g., from running a different tutorial), create or copy the properties file as documented here to over-aggregations/flink_table_api_java/src/main/resources/cloud.properties within the top-level tutorials directory, and then skip to the next step.
If you still need to create the Confluent Cloud infrastructure for this tutorial, the confluent-flink-quickstart CLI plugin creates the resources that you need to get started with Confluent Cloud for Apache Flink. Install it by running:
confluent plugin install confluent-flink-quickstart
Run the plugin as follows to create the Confluent Cloud resources needed for this tutorial and generate a Table API client configuration file. Note that you may specify a different cloud provider (gcp or azure) or region. You can find the supported regions for a given cloud provider by running confluent flink region list --cloud <CLOUD>.
confluent flink quickstart \
--name flink_table_api_tutorials \
--max-cfu 10 \
--region us-east-1 \
--cloud aws \
--table-api-client-config-file ./over-aggregations/flink_table_api_java/src/main/resources/cloud.properties
The plugin should complete in under a minute and will generate a properties file as documented here.
Before digging into Java source code, first check out the two dependencies required to use the Flink Table API for Confluent Cloud. These are defined in the dependencies section of the over-aggregations/flink_table_api_java/build.gradle file. (For Maven, see the analogous pom.xml snippet here.)
Take a look at the source code in over-aggregations/flink_table_api_java/FlinkTableApiOveraggregations.java. These two lines instantiate a table environment for executing Table API programs against Confluent Cloud:
// Requires org.apache.flink.table.api.EnvironmentSettings,
// org.apache.flink.table.api.TableEnvironment, and io.confluent.flink.plugin.ConfluentSettings
EnvironmentSettings envSettings = ConfluentSettings.fromResource("/cloud.properties");
TableEnvironment tableEnv = TableEnvironment.create(envSettings);
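If you want to verify connectivity before running the full program, one lightweight option (a sketch, not part of the tutorial's source) is to list the catalogs visible to the environment, which should include the examples catalog used below:

// Optional sanity check: print the catalogs visible to this table environment.
// Requires java.util.Arrays.
Arrays.stream(tableEnv.listCatalogs()).forEach(System.out::println);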
Let's aggregate one of Confluent Cloud's example tables. You can find these tables in the read-only marketplace database of the examples catalog. The source code in this example uses the Table API's Table.window method and the Over helper class to define the over window. The aggregation is a simple average over the current row's price and the 5 preceding prices (a 6-row window), ordered by the $rowtime system column.
// Requires these static imports from org.apache.flink.table.api.Expressions:
// $, lit, rowInterval, and CURRENT_ROW
TableResult tableResult = tableEnv.from("examples.marketplace.orders")
    .window(
        Over.orderBy($("$rowtime"))
            .preceding(rowInterval(5L))
            .following(CURRENT_ROW)
            .as("window"))
    .select(
        $("price"),
        $("price")
            .avg()
            .over($("window"))
            .round(lit(2))
            .as("rolling_avg_price")
    )
    .execute();
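As a variation, Over also supports partitioning, so that each partition gets its own rolling window. For example, a per-customer rolling average might swap in the following window definition (a sketch; it assumes the orders table has a customer_id column):

// Sketch: per-customer rolling average window (customer_id column assumed).
Over.partitionBy($("customer_id"))
    .orderBy($("$rowtime"))
    .preceding(rowInterval(5L))
    .following(CURRENT_ROW)
    .as("window")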
Given the table result, we can then materialize (in memory) the rows in the resulting stream by calling ConfluentTools.collectMaterialized or ConfluentTools.printMaterialized. This line materializes and prints 10 rows from the table result:
ConfluentTools.printMaterialized(tableResult, 10);
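The companion ConfluentTools.collectMaterialized method returns the materialized rows rather than printing them, which is handy for tests or further processing. A sketch:

// Materialize 10 rows into a list for programmatic inspection.
// Requires java.util.List and org.apache.flink.types.Row.
List<Row> rows = ConfluentTools.collectMaterialized(tableResult, 10);
rows.forEach(row -> System.out.println(row.getField("rolling_avg_price")));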
Alternatively, we can use the Table API's TableResult interface directly to collect rows. For example, to print the next row's over window aggregation:
// Requires org.apache.flink.types.Row and org.apache.flink.util.CloseableIterator
try (CloseableIterator<Row> it = tableResult.collect()) {
    if (it.hasNext()) {
        Row row = it.next();
        System.out.println(row.getField("price"));
        System.out.println(row.getField("rolling_avg_price"));
    }
}
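Because the underlying stream is unbounded, collect() will keep waiting for more rows; a common pattern is to read a fixed number of rows and then let the try-with-resources block close the iterator. A sketch:

// Sketch: print a fixed number of rows from the unbounded result, then stop.
int remaining = 5;
try (CloseableIterator<Row> it = tableResult.collect()) {
    while (remaining-- > 0 && it.hasNext()) {
        Row row = it.next();
        System.out.println(row.getField("price") + " -> " + row.getField("rolling_avg_price"));
    }
}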
You can run the example program directly in your IDE by opening the Gradle project located at over-aggregations/flink_table_api_java/, or via the command line from the top-level tutorials directory:
./gradlew over-aggregations:flink_table_api_java:run
The program will output 10 rows materialized via printMaterialized, followed by one additional order's price and its rolling average over the previous 5 rows.
+-------+-------------------+
| price | rolling_avg_price |
+-------+-------------------+
| 50.91 | 50.91 |
| 20.94 | 35.93 |
| 21.7 | 31.18 |
| 30.42 | 30.99 |
| 75.44 | 39.88 |
| 78.68 | 46.35 |
| 65.96 | 48.86 |
| 85.6 | 59.63 |
| 47.66 | 63.96 |
| 60.38 | 68.95 |
+-------+-------------------+
10 rows in set
82.97
70.21
When you are done, be sure to clean up any Confluent Cloud resources created for this tutorial. Since all of the resources live in a single Confluent Cloud environment, you can simply delete the environment, which removes most of the resources created for this tutorial (e.g., the Kafka cluster and Flink compute pool). Run the following command in your terminal to get the environment ID, of the form env-123456, corresponding to the environment named flink_table_api_tutorials_environment:
confluent environment list
Delete the environment:
confluent environment delete <ENVIRONMENT_ID>
Next, delete the Flink API key. This API key isn't associated with the deleted environment, so it must be deleted separately. Find the key:
confluent api-key list --resource flink --current-user
And then copy the 16-character alphanumeric key and delete it:
confluent api-key delete <KEY>
Finally, for the sake of housekeeping, delete the Table API client configuration file:
rm over-aggregations/flink_table_api_java/src/main/resources/cloud.properties