Up to this point, we've been looking at event transformations where a single event comes in and a single event goes out. While these transformations are useful, we often want to do something more complex. Analytics require us to take multiple events that occur over some span of time (minutes, hours, days) and aggregate the results.
Flink handles this through its window API. In this exercise we will leverage Flink windows to aggregate data and push the results to a new topic.
Stage the exercise by executing:
./exercise.sh stage 04
Often, when users make a purchase through the online platform, they will make a second purchase a short time later. Essentially, after making their first purchase, they realize there was something else they needed. Treating these purchases as independent can result in multiple shipments and multiple deliveries. This is wasteful. We would like to be able to roll those purchases into a single event, so they can be shipped together.
To solve this problem, we'll use a tumbling window. This will separate our data into discrete windows of a specific length. The business would set the window size to one day to match its shipping schedule (i.e., all orders ship the next business day, if possible). However, for testing purposes, we'll keep the window size small (seconds to minutes) because we don't want to wait 24 hours to see results.
It's important to recognize a potential flaw with tumbling windows: two purchases can happen within minutes of each other but fall into different windows. For example, if the window were one hour, one event could arrive at the 59-minute mark and a second event two minutes later. These would fall into separate windows and wouldn't be combined into a single shipment. However, if we align the size of the window with the shipping policy of the business (next business day), we can avoid this problem.
When building windows, be careful to ensure that the type and length of the window align with the needs of the business.
Let's start by creating the destination table that will hold our data.
In OrderService.java, add a constructor parameter named ordersForPeriodTableName and use it to populate a field. In Marketplace.java, populate this field with the value:
`flink-table-api-java`.`marketplace`.`customer-orders-collected-for-period`
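A minimal sketch of what this could look like (assuming OrderService already receives a TableEnvironment; the rest of the class is elided):

import org.apache.flink.table.api.TableEnvironment;

public class OrderService {
    private final TableEnvironment env;
    private final String ordersForPeriodTableName;

    // Store the new parameter in a field so later methods can reference the table.
    public OrderService(TableEnvironment env, String ordersForPeriodTableName) {
        this.env = env;
        this.ordersForPeriodTableName = ordersForPeriodTableName;
    }
}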
Define a method in the OrderService named createOrdersForPeriodTable that returns a TableResult. Implement it to create the following table:
CREATE TABLE IF NOT EXISTS `TABLE NAME` (
    `customer_id` INT NOT NULL,
    `window_start` TIMESTAMP(3) NOT NULL,
    `window_end` TIMESTAMP(3) NOT NULL,
    `period_in_seconds` BIGINT NOT NULL,
    `product_ids` MULTISET<STRING NOT NULL> NOT NULL
) WITH (
    'kafka.retention.time' = '1 h',
    'scan.startup.mode' = 'earliest-offset'
);
Don't forget to replace TABLE NAME with the correct name.
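A sketch of one possible implementation (it assumes the env and ordersForPeriodTableName fields from above, and that the field holds the fully qualified, backtick-quoted name):

public TableResult createOrdersForPeriodTable() {
    // ordersForPeriodTableName already includes the catalog and database.
    return env.executeSql(
        "CREATE TABLE IF NOT EXISTS " + ordersForPeriodTableName + " ("
            + " `customer_id` INT NOT NULL,"
            + " `window_start` TIMESTAMP(3) NOT NULL,"
            + " `window_end` TIMESTAMP(3) NOT NULL,"
            + " `period_in_seconds` BIGINT NOT NULL,"
            + " `product_ids` MULTISET<STRING NOT NULL> NOT NULL"
            + ") WITH ("
            + " 'kafka.retention.time' = '1 h',"
            + " 'scan.startup.mode' = 'earliest-offset'"
            + ")"
    );
}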
Now that the table exists, the next step is to populate it with data.
Create a new method in the OrderService named streamOrdersForPeriod. It should take a single parameter of type java.time.Duration and return a TableResult.
Set the catalog to examples.
Set the database to marketplace.
From the orders table, create a tumbling window lasting a number of seconds equal to the duration passed into the method.
You can define a 5-second tumbling window as follows:
Tumble
    .over(lit(5).seconds())
    .on($("$rowtime"))
    .as("window")
You can get the number of seconds in the period using:
period.toSeconds()
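To make the window length follow the Duration passed into the method, rather than a hard-coded 5 seconds, you can combine the two snippets above (a sketch):

Tumble
    .over(lit(period.toSeconds()).seconds())
    .on($("$rowtime"))
    .as("window")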
Group all records according to the customer_id and the window.
A windowed group by clause looks like:
.groupBy(
    $("fieldToGroupBy"),
    $("nameOfTheWindow")
)
Take a look at the CREATE TABLE statement and use it to determine what fields you need for your select clause.
You will have to use the as method to rename the columns to match the destination table.
Insert the results into the ordersForPeriodTableName table defined above.
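Putting the pieces together, one possible sketch of the full method follows. Column names such as product_id are assumptions about the orders table schema; check the actual schema, and let the tests be the final word.

// Assumes: import java.time.Duration;
//          import org.apache.flink.table.api.*;
//          import static org.apache.flink.table.api.Expressions.*;
public TableResult streamOrdersForPeriod(Duration period) {
    env.useCatalog("examples");
    env.useDatabase("marketplace");

    return env.from("orders")
        .window(
            Tumble
                .over(lit(period.toSeconds()).seconds())
                .on($("$rowtime"))
                .as("window"))
        .groupBy(
            $("customer_id"),
            $("window"))
        .select(
            $("customer_id"),
            $("window").start().as("window_start"),
            $("window").end().as("window_end"),
            lit(period.toSeconds()).as("period_in_seconds"),
            // COLLECT aggregates the product ids for the window into a multiset.
            $("product_id").collect().as("product_ids"))
        // The destination name is fully qualified, so the current catalog doesn't matter here.
        .insertInto(ordersForPeriodTableName)
        .execute();
}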
Don't forget to run the tests.
Now we need to run the pipeline.
Update Marketplace.java.
Call createOrdersForPeriodTable.
Call streamOrdersForPeriod giving it a value of one minute.
One minute isn't realistic. For production, you'd want this to be as much as 24 hours. But, we don't really want to wait 24 hours to see results, so we'll start small.
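For example (variable names are assumptions about how Marketplace.java is wired up):

// In Marketplace.java, after constructing the OrderService:
orderService.createOrdersForPeriodTable();
orderService.streamOrdersForPeriod(Duration.ofMinutes(1));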
Run the Marketplace.
Let's verify that the stream is working.
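One quick way to spot-check the output (a sketch, assuming access to the same TableEnvironment) is to query the destination table and print a few rows:

// Prints windowed results as they arrive; cancel once you've seen enough rows.
env.executeSql("SELECT * FROM " + ordersForPeriodTableName).print();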
Don't forget to check your running Flink statements and terminate any you don't want to keep running.
This brings us to the end of this exercise.