course: Apache Flink® Table API: Processing Data Streams in Java

Exercise: Building a Data Streaming Pipeline with the Flink Table API

20 min
Wade Waldron

Staff Software Practice Lead


Up to this point, we've focused on building Flink queries that send their results to standard output. But that's not really where Flink shines. Flink's real power is in building streaming pipelines where data flows from one location (or more) to another (such as between Apache Kafka topics).

In this exercise, we'll expand on our previous work and build a full streaming pipeline.

Stage the exercise

Stage the exercise by executing:

./exercise.sh stage 03

Creating a table

We want to create a topic that contains all orders that qualify for free shipping (i.e., orders over fifty dollars).

The data in the new topic will use the following structure:

{
	"order_id":"order-8675309",
	"details":{
		"customer_id":42,
		"product_id":"product-24601",
		"price":51.0
	}
}

In OrderService.java, update the constructor to take an additional String parameter named freeShippingTableName and use it to populate a private class field. Then update Marketplace.java to pass this parameter the value:

`flink-table-api-java`.`marketplace`.`order-qualified-for-free-shipping`
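
For reference, the constructor change might look roughly like the following sketch. The env and ordersTableName members are assumptions carried over from the earlier exercises; only the freeShippingTableName parameter and field are new:

import org.apache.flink.table.api.TableEnvironment;

public class OrderService {
    private final TableEnvironment env;          // assumed existing field
    private final String ordersTableName;        // assumed existing field
    private final String freeShippingTableName;  // new field

    public OrderService(
            TableEnvironment env,                // assumed existing parameter
            String ordersTableName,              // assumed existing parameter
            String freeShippingTableName) {      // new parameter
        this.env = env;
        this.ordersTableName = ordersTableName;
        this.freeShippingTableName = freeShippingTableName;
    }
}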

Implement a new method in OrderService named createFreeShippingTable as follows:

  • The method should take no parameters and return a TableResult.
  • Use env.executeSql to execute the following SQL statement and return the result.
CREATE TABLE IF NOT EXISTS `TABLE NAME` (
   `order_id` STRING NOT NULL,
   `details` ROW (
          `customer_id` INT NOT NULL,
          `product_id` STRING NOT NULL,
          `price` DOUBLE NOT NULL   
   ) NOT NULL
) WITH (
   'kafka.retention.time' = '1 h',
   'scan.startup.mode' = 'earliest-offset'
);

NOTE

Replace `TABLE NAME` with the value of the freeShippingTableName field.

NOTE

We set scan.startup.mode to earliest-offset, which forces queries on the table to read all records from the beginning of the topic. We set kafka.retention.time to 1 hour to minimize resource usage.

Executing this method will automatically create the Flink table, the corresponding Kafka topic, and the message schema.
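
For reference, a minimal sketch of createFreeShippingTable might look like this, assuming the table environment is held in an env field as in the earlier exercises. The SQL is the statement above with the freeShippingTableName field substituted for `TABLE NAME` (the field value already includes its own backticks):

public TableResult createFreeShippingTable() {
    // Build the CREATE TABLE statement using the fully qualified table name.
    return env.executeSql(
        "CREATE TABLE IF NOT EXISTS " + freeShippingTableName + " (" +
        "  `order_id` STRING NOT NULL," +
        "  `details` ROW (" +
        "    `customer_id` INT NOT NULL," +
        "    `product_id` STRING NOT NULL," +
        "    `price` DOUBLE NOT NULL" +
        "  ) NOT NULL" +
        ") WITH (" +
        "  'kafka.retention.time' = '1 h'," +
        "  'scan.startup.mode' = 'earliest-offset'" +
        ")"
    );
}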

You won't be able to run the tests just yet.

Creating a pipeline

Next, create the pipeline that pulls data from the orders table and pushes it to the free shipping table.

Still inside OrderService.java, create a new method named streamOrdersOver50Dollars as follows (a complete sketch appears after this list):

  • Copy the method signature and contents from the ordersOver50Dollars method.

  • Modify the select statement as follows:

    • Select the order_id.

    • Nest the customer_id, product_id, and price fields inside a row object named details.

      Hint

      You can nest fields inside an object as follows:

      row(
          $("inner_field_1"),
          $("inner_field_2")
      ).as("outer_field")

  • Because the price is now nested inside the details object, the where clause won't work as is. The easiest fix is to move the where clause before the select statement so the filter runs while price is still a top-level field.

  • Use the insertInto method to push the data into the free shipping table you defined above.

    Hint

    The insertInto method requires the fully qualified table name. You can construct this using the following format:

    `Environment`.`Kafka_cluster`.`Table_name`

    You will need backticks around names that include non-standard characters such as dashes. If you have been following the instructions, the freeShippingTableName field already contains the correct, fully qualified name.

  • Return the results of executing the statement.
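
Putting the bullets above together, a sketch of streamOrdersOver50Dollars might look like the following. The ordersTableName field and the exact filter expression are assumptions carried over from the earlier ordersOver50Dollars method, so adjust them to match your existing code:

// At the top of OrderService.java:
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public TableResult streamOrdersOver50Dollars() {
    return env.from(ordersTableName)
        // Filter first, while `price` is still a top-level field.
        .where($("price").isGreater(50))
        .select(
            $("order_id"),
            // Nest the remaining fields inside a `details` row.
            row(
                $("customer_id"),
                $("product_id"),
                $("price")
            ).as("details")
        )
        // Push the results into the free shipping table created above.
        .insertInto(freeShippingTableName)
        .execute();
}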

You should now be able to run the tests.

Update the marketplace

Now we need to run the pipeline.

Update Marketplace.java as follows (see the sketch after this list).

NOTE

You should have already updated the constructor for the OrderService (see the instructions above).

  • Call the createFreeShippingTable method to ensure the table has been created.
  • Call the streamOrdersOver50Dollars method.
    • Don't bother printing the results. The data will be streamed into Kafka.
  • Run the Marketplace.
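
For reference, the relevant portion of Marketplace.java might end up looking roughly like this sketch. The env variable and the other constructor arguments are assumptions carried over from the previous exercises; the free shipping table name is the value given earlier:

OrderService orderService = new OrderService(
    env,              // assumed existing arguments from the earlier exercises
    ordersTableName,
    "`flink-table-api-java`.`marketplace`.`order-qualified-for-free-shipping`"
);

// Make sure the table (and its Kafka topic) exist before streaming into it.
orderService.createFreeShippingTable();

// Start the streaming insert. There is no need to print the results;
// the data is streamed directly into Kafka.
orderService.streamOrdersOver50Dollars();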

DANGER

Because this is an unbounded stream, the Flink statement will continue to run even after you terminate the application, and it will continue to consume resources.

You can programmatically terminate a job by executing:

tableResult.getJobClient().get().cancel();

It can also be terminated from the Confluent Cloud UI.

Verify the results

Let's verify that the stream is working.

  • Open Confluent Cloud, navigate to the marketplace Kafka cluster, select topics, select the order-qualified-for-free-shipping topic, and select the messages tab.
  • You should be able to see messages streaming into this topic.

NOTE

It might take a little time for the Flink statements to finish executing. If you don't see the topic or the messages, give it a moment and try again.

WARNING

Don't forget to check your running Flink statements and terminate any you don't want to keep running.

Finish

This brings us to the end of this exercise.

Use the promo codes FLINKTABLEAPIJAVA and CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.
