Staff Software Practice Lead
In this exercise, we will execute some Flink Table API queries using the built-in Marketplace tables in Confluent Cloud.
We'll start with basic select statements to set a foundation for future exercises.
Stage the exercise by executing:
./exercise.sh stage 02
You should now see CustomerService.java and OrderService.java in your marketplace package, along with a series of tests for each.
The tests have been broken into two types: Unit Tests and Integration Tests.
The Unit Tests use mocks to replace the Flink functionality and verify behavior. These tests are there to help guide you through your implementation. They are not intended to be examples of how you should test a real application. This style of test is too tied to the individual implementations and could hinder refactoring in a production system.
The Integration Tests will connect to Confluent Cloud and execute the resulting SQL. Because the Integration Tests connect to Confluent Cloud, they can be slow. You may want to limit how often you run them.
The integration tests perform network requests that can sometimes take longer than expected, so they may time out. If this happens, just run the test again.
You can run specific tests as needed in your IDE, or you can use the following Maven commands:
Run a specific test:
mvn test -Dtest=<ClassName>#<TestName>
Run all unit tests:
mvn clean test -Dgroups=UnitTest
Run all integration tests:
mvn clean test -Dgroups=IntegrationTest
Run both unit and integration tests:
mvn clean test
Currently, the tests will fail because the methods haven't been implemented.
While working through the exercise, it can be helpful to periodically run the tests to measure progress.
These tests make assumptions about the structure of your code. The instructions will guide you, but if you encounter issues, it can be valuable to look at the tests to see what they are expecting and modify your code to match.
The first task is to implement the CustomerService.allCustomers method.
This is an easy-to-understand query. We are trying to obtain the details for all of the customers.
Very simple queries like this can be useful when debugging an application to learn more about the shape of the data.
More importantly, this creates a foundation for future work.
It's important to remember that the data in the table is streaming and unbounded. Once the query is executed, it will run until it is terminated.
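If it helps to have a mental model, an unbounded table behaves a bit like an infinite `java.util.stream.Stream`: a terminal operation over it never finishes on its own. The sketch below is a plain-Java analogy, not Flink code. The `customerIds` generator is hypothetical and stands in for a continuously growing table; only the `limit` call lets the pipeline complete. In the same way, iterating over `TableResult.collect()` for an unbounded query should simply keep waiting for new rows.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class UnboundedAnalogy {
    // Hypothetical unbounded source: an endless, ordered sequence of customer IDs.
    static Stream<Long> customerIds() {
        return Stream.iterate(1000L, id -> id + 1);
    }

    // Without limit(n), collect(...) would never return, much like an
    // unbounded query that keeps running until it is terminated.
    static List<Long> firstN(int n) {
        return customerIds().limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstN(3)); // prints [1000, 1001, 1002]
    }
}
```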
Implement the CustomerService.allCustomers method as follows:
Select all fields from all customers (use the customersTableName parameter for the table name).
You can implement a basic select statement as follows:
env.from("TABLE NAME")
.select($("*"))
.execute();
Execute and return the result.
Run the following tests to verify your solution:
CustomerServiceTest.allCustomers_shouldSelectAllFields()
CustomerServiceIntegrationTest.allCustomers_shouldReturnTheDetailsOfAllCustomers()
You can run the tests through your IDE, or you can run an individual test using Maven as follows:
mvn test -Dtest=CustomerServiceTest#allCustomers_shouldSelectAllFields
For a detailed look at what functions are available in the Confluent Flink Table API, refer to the Confluent Flink Table API Documentation.
Next, we can try the query in the Marketplace.java class.
Modify Marketplace.java to create an instance of the CustomerService, pointing it at the table:
`examples`.`marketplace`.`customers`
Then execute the allCustomers method and print the results. If you run the Marketplace now, it should print Customer records until you terminate the application.
Executing the query in the marketplace generates a corresponding SQL query in Confluent Cloud. We can inspect that query to see what it looks like.
It won't be a direct match to the code that you wrote. The records have a hidden $rowtime field. Don't worry about that for now. We will return to it later in the course.
Next, we will implement the CustomerService.allCustomerAddresses method. The shipping department will require addresses so they know where to deliver the orders. However, they don't need other customer details. Here, we will create a query that gives them only the details they need.
Implement the CustomerService.allCustomerAddresses method as follows:
The $("fieldName") syntax is used to build what is known as an API Expression. These API Expressions will get more complex throughout the course.
You can specify $("fieldName") multiple times, separated by a comma.
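To make the effect of selecting specific fields concrete, here is a plain-Java model of a projection, not Table API code. Selecting a subset of fields keeps every row but drops columns. The `Customer` and `CustomerAddress` records and all of their field names are hypothetical; check the customers table schema (or the tests) for the actual columns the shipping department needs.

```java
import java.util.List;
import java.util.stream.Collectors;

class ProjectionModel {
    // Hypothetical full customer row; the real columns come from the customers table.
    record Customer(int customerId, String name, String email,
                    String address, String postcode, String city) {}

    // Hypothetical projected row: only the fields needed for shipping.
    record CustomerAddress(int customerId, String address, String postcode, String city) {}

    // One output row per input row, but with fewer fields: the analogue of
    // selecting a few named fields instead of $("*").
    static List<CustomerAddress> allCustomerAddresses(List<Customer> customers) {
        return customers.stream()
                .map(c -> new CustomerAddress(c.customerId(), c.address(), c.postcode(), c.city()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Customer> customers = List.of(
                new Customer(3001, "Ada", "ada@example.com", "1 Main St", "12345", "Springfield"),
                new Customer(3002, "Bo", "bo@example.com", "2 Oak Ave", "67890", "Shelbyville"));
        allCustomerAddresses(customers).forEach(System.out::println);
    }
}
```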
Run the following tests to verify your solution:
CustomerServiceTest.allCustomerAddresses_shouldSelectOnlyTheRelevantFields()
CustomerServiceIntegrationTest.allCustomerAddresses_shouldReturnTheAddressesOfAllCustomers()
You can also print the results inside the Marketplace.java file. However, if you try to print results from multiple unbounded queries, only the first one will succeed. The others will be stuck waiting for a query that never finishes. You can still execute multiple queries; you just can't print their results from the same thread.
Have a look at the query in Confluent Cloud. Is it what you expected?
The previous query filtered specific columns or fields from the records. Now, we'll implement a query that filters specific records.
We will implement the OrderService.ordersOver50Dollars method. The eCommerce site has a policy that grants free shipping on all orders over 50 dollars. We want to determine which orders qualify for free shipping.
Implement the OrderService.ordersOver50Dollars method as follows:
Select all fields from the orders table.
Use a where clause to check if the price field isGreaterOrEqual to 50.
The `where` method takes an API Expression (e.g. $("fieldName")) as a parameter. You can call additional methods, such as the isGreaterOrEqual method, on those expressions.
$("fieldName").isGreaterOrEqual(value)
Execute and return the results.
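Where the address query kept every row but fewer fields, a `where` clause keeps whole rows and drops the ones that fail the predicate. The plain-Java model below (not Table API code) shows that `isGreaterOrEqual(50)` includes an order priced at exactly 50, which matches the integration test's name (`...50DollarsOrMore`). The `Order` record and its fields are hypothetical, and the price is modeled as a `double`.

```java
import java.util.List;
import java.util.stream.Collectors;

class FilterModel {
    // Hypothetical order row; the real columns come from the orders table.
    record Order(String orderId, int customerId, String productId, double price) {}

    // Keeps whole rows whose price is >= 50, the analogue of
    // where($("price").isGreaterOrEqual(50)). Orders of exactly 50 qualify.
    static List<Order> ordersOver50Dollars(List<Order> orders) {
        return orders.stream()
                .filter(o -> o.price() >= 50)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order("o-1", 3001, "p-1", 49.99),
                new Order("o-2", 3002, "p-2", 50.00),
                new Order("o-3", 3001, "p-3", 75.50));
        // Prints the o-2 and o-3 records; o-1 is filtered out.
        ordersOver50Dollars(orders).forEach(System.out::println);
    }
}
```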
Run the following tests:
OrderServiceTest.ordersOver50Dollars_shouldSelectOrdersWhereThePriceIsGreaterThan50()
OrderServiceIntegrationTest.ordersOver50Dollars_shouldOnlyReturnOrdersWithAPriceOf50DollarsOrMore()
Create an instance of the OrderService in Marketplace.java pointing it at the table:
`examples`.`marketplace`.`orders`
Execute the new method to see the results.
The final task will be a little more difficult. We will implement the OrderService.pricesWithTax method. It will compute a new price after applying a tax rate (e.g. Tax Rate = 1.15). This seems simple enough, but there is a hidden issue in the data.
Implement the OrderService.pricesWithTax method as follows:
Using API Expressions, you can obtain the value of the same field multiple times but apply a different transformation to each. For example, you can write:
$("my_string")
.as("original_string"),
$("my_string")
.upperCase()
.as("uppercase_string")
Run only the following test:
OrderServiceIntegrationTest.pricesWithTax_shouldReturnTheCorrectPrices()
mvn test -Dtest=OrderServiceIntegrationTest#pricesWithTax_shouldReturnTheCorrectPrices
The test should fail. Inspecting the output should reveal that the expected and actual amounts differ by exactly one cent. But why?
Let's take a moment to try and understand what happened.
Now let's see what happens if we run a slightly modified version of the query.
Many of the price_with_tax entries have a long sequence of trailing decimal places (e.g. 99999...). However, if you do the calculation by hand, you won't get those long trailing decimal places. This is a clue to the problem.
Let's have a look at how the orders table is defined.
Create a new statement using the + button beside your existing statement. Use the following query:
SHOW CREATE TABLE examples.marketplace.orders;
Run the query and inspect the results.
Observe the data type for the price field. It is listed as a DOUBLE.
Many decimal values, such as 0.1 or 1.15, cannot be represented exactly as binary floats or doubles. As a result, small errors can creep into floating-point arithmetic. These errors are especially problematic when dealing with currency, where they surface as rounding errors.
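The effect is easy to reproduce outside Flink. The plain-Java sketch below uses a hypothetical price of 10.10 and the 1.15 tax rate from earlier. `new BigDecimal(double)` exposes the exact binary value a double actually holds, so you can see that 1.15 is stored as slightly less than 1.15. Multiplying in double arithmetic yields a value just under 11.615, which rounds half-up to 11.61, whereas exact decimal arithmetic yields 11.6150, which rounds to 11.62: a one-cent difference of the kind the test reported. Half-up rounding is an assumption chosen for illustration, not necessarily what Flink or the tests use.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class DoubleVsDecimal {
    // Price with tax computed in binary floating point, then rounded to cents.
    static BigDecimal withTaxUsingDouble(double price, double taxRate) {
        double product = price * taxRate;          // inexact binary result
        return new BigDecimal(product)             // exact value held by that double
                .setScale(2, RoundingMode.HALF_UP);
    }

    // The same calculation in decimal arithmetic, then rounded to cents.
    static BigDecimal withTaxUsingDecimal(String price, String taxRate) {
        return new BigDecimal(price)
                .multiply(new BigDecimal(taxRate)) // exact: 10.10 * 1.15 = 11.6150
                .setScale(2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        System.out.println(new BigDecimal(1.15));                 // slightly less than 1.15
        System.out.println(withTaxUsingDouble(10.10, 1.15));      // 11.61
        System.out.println(withTaxUsingDecimal("10.10", "1.15")); // 11.62
    }
}
```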
To fix this, we need to stop using the DOUBLE type and instead use DECIMAL(10, 2), which is better suited to working with currency. We can't change the source table, but that doesn't mean we can't fix the problem.
The result has 4 decimal places rather than 2 (remember, the price is a DECIMAL(10, 2)). But why? The price has 2 decimal places and the tax rate has 2 decimal places. When you multiply them, 4 decimal places are required to retain full precision.
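The scale rule can be checked with `java.math.BigDecimal`, whose `multiply` method returns a result whose scale is the sum of the operands' scales. This plain-Java sketch uses a hypothetical price of 19.99 to show why a two-decimal price times a two-decimal tax rate produces four decimal places.

```java
import java.math.BigDecimal;

class DecimalScale {
    // Multiplies a two-decimal price by a two-decimal tax rate.
    static BigDecimal priceWithTax(BigDecimal price, BigDecimal taxRate) {
        return price.multiply(taxRate); // scale = price.scale() + taxRate.scale()
    }

    public static void main(String[] args) {
        BigDecimal price = new BigDecimal("19.99");  // scale 2
        BigDecimal taxRate = new BigDecimal("1.15"); // scale 2
        BigDecimal result = priceWithTax(price, taxRate);
        System.out.println(result + " (scale " + result.scale() + ")"); // 22.9885 (scale 4)
    }
}
```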
Next, translate this solution into the OrderService.pricesWithTax method as follows:
Use the cast method on the API Expression to convert the price field to a DataTypes.DECIMAL(10, 2) prior to calculating the price_with_tax.
For completeness, let's apply the same cast to the original_price, even though it isn't affected by this problem. This will ensure that the prices are all listed with two decimal places.
The cast method is part of the API expression, similar to the as method or the round method.
Be careful. The location of the cast is important. Should it come before, or after the multiplication?
Run all the tests and verify they pass.
Execute the method inside Marketplace.java to see the results.
One of the important things we learned in this exercise is how to use Confluent Cloud to debug Table API queries.
The Flink Statement browser allows us to collect details about the SQL statements that are generated by the Table API. Meanwhile, the Stream Processing workspaces allow us to manually edit and execute SQL statements to help locate problems and their solutions.
These are valuable tools you can use throughout the rest of the course whenever you find yourself getting stuck on an exercise. If you aren't sure what's going on, look at the SQL statement in Confluent Cloud. See if you can modify it to work there, then go back to the code and try to replicate that change.
This brings us to the end of this exercise.