Up to this point, we've focused on building Flink queries that send their results to standard output. But that's not where Flink shines. Flink's real power is in building streaming pipelines where data flows from one or more locations to another (such as between Apache Kafka topics).
In this exercise, we'll expand on our previous work and build a full streaming pipeline.
Stage the exercise by executing:
./exercise.sh stage 03
We want to create a topic that contains all orders that qualify for free shipping (i.e. orders totaling more than fifty dollars).
The data in the new topic will use the following structure:
{
  "order_id": "order-8675309",
  "details": {
    "customer_id": 42,
    "product_id": "product-24601",
    "price": 51.0
  }
}
In OrderService.java, update the constructor to take an additional String parameter named freeShippingTableName and use it to populate a private class field. Then update Marketplace.java to populate this parameter with the value:
`flink-table-api-java`.`marketplace`.`order-qualified-for-free-shipping`
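To make the wiring concrete, here is a minimal sketch of what the updated constructor might look like. The TableEnvironment field and the other parameter names are assumptions carried over from the earlier exercises, so adapt them to whatever your OrderService already uses:

import org.apache.flink.table.api.TableEnvironment;

public class OrderService {
    // Assumed existing fields from earlier exercises.
    private final TableEnvironment env;
    private final String ordersTableName;
    // New field populated from the new constructor parameter.
    private final String freeShippingTableName;

    public OrderService(
        TableEnvironment env,
        String ordersTableName,
        String freeShippingTableName
    ) {
        this.env = env;
        this.ordersTableName = ordersTableName;
        this.freeShippingTableName = freeShippingTableName;
    }
}

In Marketplace.java, the new constructor argument would then be the fully qualified free shipping table name shown above.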
Implement a new method in the OrderService named createFreeShippingTable as follows:
CREATE TABLE IF NOT EXISTS `TABLE NAME` (
  `order_id` STRING NOT NULL,
  `details` ROW (
    `customer_id` INT NOT NULL,
    `product_id` STRING NOT NULL,
    `price` DOUBLE NOT NULL
  ) NOT NULL
) WITH (
  'kafka.retention.time' = '1 h',
  'scan.startup.mode' = 'earliest-offset'
);
NOTE: Replace `TABLE NAME` with the value of the freeShippingTableName field.
NOTE: We have set the scan.startup.mode to earliest-offset which will force queries on the table to read all records from the beginning of the topic. We've set the kafka.retention.time to 1 hour to minimize the use of resources.
Executing this method will automatically create the Flink table, the corresponding Kafka topic, and the message schema.
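As a rough sketch, the method could simply hand the DDL to executeSql. This assumes the TableEnvironment is stored in a field named env, that the table name comes from the freeShippingTableName field, and that you return the resulting TableResult (import org.apache.flink.table.api.TableResult); adjust the names to match your class:

public TableResult createFreeShippingTable() {
    // Build the DDL around the configured table name and execute it.
    return env.executeSql(
        "CREATE TABLE IF NOT EXISTS " + freeShippingTableName + " (" +
        "  `order_id` STRING NOT NULL," +
        "  `details` ROW (" +
        "    `customer_id` INT NOT NULL," +
        "    `product_id` STRING NOT NULL," +
        "    `price` DOUBLE NOT NULL" +
        "  ) NOT NULL" +
        ") WITH (" +
        "  'kafka.retention.time' = '1 h'," +
        "  'scan.startup.mode' = 'earliest-offset'" +
        ")"
    );
}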
You won't be able to run the tests just yet.
Next, create the pipeline that pulls data from the orders table and pushes it to the free shipping table.
Still inside OrderService.java, create a new method named streamOrdersOver50Dollars as follows:
Copy the method signature and contents from the ordersOver50Dollars method.
Modify the select statement as follows:
Select the order_id.
Nest the customer_id, product_id, and price fields inside a row object named details.
You can nest fields inside an object as follows:
row(
  $("inner_field_1"),
  $("inner_field_2")
).as("outer_field")
Because the price is now nested inside the details object, the where clause won't work as is. The easiest way to fix this is to move the where clause before the select statement. This will ensure it executes on the data before it gets nested.
Use the insertInto method to push the data into the free shipping table you defined above.
The insertInto method requires the fully qualified table name. You can construct this using the following format:
`Environment`.`Kafka_cluster`.`Table_name`
You will need backticks around the names if they include non-standard characters such as dashes. If you have been following the instructions, the freeShippingTableName field already contains the correct, fully qualified value.
Return the results of executing the statement.
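Putting those steps together, a sketch of the finished method might look like the following. It assumes static imports of $ and row from org.apache.flink.table.api.Expressions, plus the env, ordersTableName, and freeShippingTableName fields used above; treat the exact names as placeholders rather than the required solution:

public TableResult streamOrdersOver50Dollars() {
    return env.from(ordersTableName)
        // Filter first, while `price` is still a top-level field.
        .where($("price").isGreater(50))
        // Then nest the remaining fields inside a `details` row.
        .select(
            $("order_id"),
            row(
                $("customer_id"),
                $("product_id"),
                $("price")
            ).as("details")
        )
        // Push the results into the free shipping table created earlier.
        .insertInto(freeShippingTableName)
        .execute();
}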
You should now be able to run the tests.
Now we need to run the pipeline.
Update Marketplace.java to call your new createFreeShippingTable and streamOrdersOver50Dollars methods.
NOTE: You should have already updated the constructor for the OrderService (See instructions above).
WARNING: Because this is an unbounded stream, the Flink statement will continue to run (and continue to use resources) even after you terminate the application.
You can programmatically terminate a job by executing:
tableResult.getJobClient().get().cancel();
It can also be terminated from the Confluent Cloud UI.
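Here is a minimal sketch of the relevant Marketplace.java changes, assuming an orderService constructed with the new table name as described above; again, the variable names are placeholders:

// Create the Flink table, Kafka topic, and schema (no-op if they already exist).
orderService.createFreeShippingTable();

// Start the unbounded streaming pipeline.
TableResult result = orderService.streamOrdersOver50Dollars();

// Optionally cancel the job when you are done with it, since the statement
// keeps running (and consuming resources) until it is explicitly stopped:
// result.getJobClient().get().cancel();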
Let's verify that the stream is working.
NOTE: It might take a little bit of time for the Flink statements to finish executing. If you don't see the topic, or don't see the messages, give it a bit of time and try again.
WARNING: Don't forget to check your running Flink statements and terminate any you don't want to keep running.
This brings us to the end of this exercise.