Course: Apache Flink® Table API: Processing Data Streams in Java

Exercise: Aggregating Data Over Time with the Flink Table API

20 min
Wade Waldron

Staff Software Practice Lead

Aggregating Data Over Time with the Flink Table API

Up to this point, we've been looking at event transformations where a single event comes in and a single event goes out. While these transformations are useful, we often want to do something more complex. Analytics require us to take multiple events that occur over some span of time (minutes, hours, days) and aggregate the results.

Flink handles this through its window API. In this exercise, we will use Flink windows to aggregate data and push the results to a new topic.

Stage the exercise

Stage the exercise by executing:

./exercise.sh stage 04

Customer Purchases During Period

Often, when users make a purchase through the online platform, they will make a second purchase a short time later. Essentially, after making their first purchase, they realize there was something else they needed. Treating these purchases as independent can result in multiple shipments and multiple deliveries. This is wasteful. We would like to be able to roll those purchases into a single event, so they can be shipped together.

To solve this problem, we'll use a tumbling window. This separates our data into discrete windows of a fixed length. The business would set the window size to 1 day to match their shipping schedule, i.e., all orders are shipped the next business day (if possible). However, for testing purposes, we'll keep the window size small (seconds to minutes) because we don't want to wait 24 hours to see results.

It's important to recognize a potential flaw with tumbling windows: two purchases can happen within minutes of each other but still fall into different windows. For example, if the window were one hour, one event could occur at the 59-minute mark and a second event 2 minutes later. These would land in separate windows and wouldn't be combined into a single shipment. However, if we align the size of the window with the shipping policy of the business (next business day), we can avoid this problem.

When building windows, be careful to ensure that the type and length of the window align with the needs of the business.

Create the Table

Let's start by creating the destination table that will hold our data.

In OrderService.java, add a constructor parameter named ordersForPeriodTableName and use it to populate a field. In Marketplace.java, populate this parameter with the value:

`flink-table-api-java`.`marketplace`.`customer-orders-collected-for-period`
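
For reference, the constructor change could look like the sketch below. The env field and the trimmed-down parameter list are assumptions for illustration; keep whatever parameters your OrderService constructor already has.

    public class OrderService {
        private final TableEnvironment env;             // assumed existing field
        private final String ordersForPeriodTableName;  // new field

        public OrderService(TableEnvironment env, String ordersForPeriodTableName) {
            this.env = env;
            this.ordersForPeriodTableName = ordersForPeriodTableName;
        }
    }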

Define a method in the OrderService named createOrdersForPeriodTable that returns a TableResult. Implement it to create the following table:

CREATE TABLE IF NOT EXISTS `TABLE NAME` (
	`customer_id` INT NOT NULL,
	`window_start` TIMESTAMP(3) NOT NULL,
	`window_end` TIMESTAMP(3) NOT NULL,
	`period_in_seconds` BIGINT NOT NULL,
	`product_ids` MULTISET<STRING NOT NULL> NOT NULL
) WITH (
	'kafka.retention.time' = '1 h',
	'scan.startup.mode' = 'earliest-offset'
);

NOTE

Don't forget to replace TABLE NAME with the correct name.
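
Below is a minimal sketch of the method, assuming the env and ordersForPeriodTableName fields from the constructor sketch above. Since the field value already includes its backticks, it can be interpolated directly into the DDL passed to TableEnvironment.executeSql:

    public TableResult createOrdersForPeriodTable() {
        // Build the DDL as a string and interpolate the fully qualified table name.
        return env.executeSql(
            "CREATE TABLE IF NOT EXISTS " + ordersForPeriodTableName + " (" +
                "`customer_id` INT NOT NULL," +
                "`window_start` TIMESTAMP(3) NOT NULL," +
                "`window_end` TIMESTAMP(3) NOT NULL," +
                "`period_in_seconds` BIGINT NOT NULL," +
                "`product_ids` MULTISET<STRING NOT NULL> NOT NULL" +
            ") WITH (" +
                "'kafka.retention.time' = '1 h'," +
                "'scan.startup.mode' = 'earliest-offset'" +
            ")"
        );
    }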

Now that the table exists, the next step is to populate it with data.

  • Create a new method in the OrderService named streamOrdersForPeriod. It should take a single parameter of type java.time.Duration and return a TableResult. (A sketch combining the steps below appears after this list.)

  • Set the catalog to examples.

  • Set the database to marketplace.

  • From the orders table, create a tumbling window lasting a number of seconds equal to the duration passed into the method.

    Hint

    You can define a 5-second tumbling window as follows:

    Tumble
        .over(lit(5).seconds())
        .on($("$rowtime"))
        .as("window")

    And you can get the number of seconds in the period using:

    period.toSeconds()

  • Group all records according to the customer_id and the window.

    Hint

    A windowed group by clause looks like:

    .groupBy(
        $("fieldToGroupBy"),
        $("nameOfTheWindow")
    )

  • Take a look at the CREATE TABLE statement and use it to determine what fields you need for your select clause.

    • You can group records such as the product_ids using $("fieldName").collect().
    • The start of the window can be accessed using $("nameOfTheWindow").start(). How would you get the end of the window?
    • You can use the same literal (lit) from the tumbling window to populate the period_in_seconds column.

    Hint

    You will have to use the as method to rename the column.

  • Insert the results into the ordersForPeriodTableName table defined above.
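
Putting the hints together, one possible shape of the finished method is sketched below. It assumes the orders table exposes customer_id and product_id columns, and it relies on static imports of $ and lit from org.apache.flink.table.api.Expressions plus org.apache.flink.table.api.Tumble. Treat it as a guide, not the definitive solution.

    public TableResult streamOrdersForPeriod(Duration period) {
        env.useCatalog("examples");
        env.useDatabase("marketplace");

        return env.from("orders")
            // Bucket the stream into fixed-size windows of the requested length.
            .window(
                Tumble
                    .over(lit(period.toSeconds()).seconds())
                    .on($("$rowtime"))
                    .as("window")
            )
            // One group per customer per window.
            .groupBy(
                $("customer_id"),
                $("window")
            )
            .select(
                $("customer_id"),
                $("window").start().as("window_start"),
                $("window").end().as("window_end"),
                lit(period.toSeconds()).as("period_in_seconds"),
                // Collect all products purchased in the window into a multiset.
                $("product_id").collect().as("product_ids")
            )
            // Write the aggregated rows to the destination table.
            .executeInsert(ordersForPeriodTableName);
    }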

Don't forget to run the tests.

Update the marketplace

Now we need to run the pipeline.

Update Marketplace.java.

  • Call createOrdersForPeriodTable.

  • Call streamOrdersForPeriod, giving it a value of one minute (a sketch of both calls appears after this list).

    NOTE

    One minute isn't realistic. In production, you'd want this to be as long as 24 hours. But we don't really want to wait 24 hours to see results, so we'll start small.

  • Run the Marketplace.
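
If it helps, the wiring might look like the following minimal sketch. The orderService reference is an assumption for illustration; use however Marketplace.java already constructs its OrderService, and import java.time.Duration.

    // Create the destination table, then start the windowed streaming job.
    orderService.createOrdersForPeriodTable();
    orderService.streamOrdersForPeriod(Duration.ofMinutes(1));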

Verify the results

Let's verify that the stream is working.

  • Open Confluent Cloud, navigate to the customer-orders-collected-for-period topic, and view the messages.
  • Remember, you will need to wait for the queries to start and then at least 1 minute for the window to elapse.

WARNING

Don't forget to check your running Flink statements and terminate any you don't want to keep running.

Finish

This brings us to the end of this exercise.

Use the promo codes FLINKTABLEAPIJAVA & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.
