course: Apache Flink® 101

The Query Profiler (Confluent Cloud)

20 min
David Anderson

Principal Software Practice Lead

Using the Query Profiler (Confluent Cloud)

If you've been through the video about the Flink Runtime, or the hands-on exercise about the Flink Web UI, you will have learned about the roles of the Job Manager and Task Managers in a Flink cluster.

Confluent Cloud for Apache Flink is a serverless offering, with no need to provision or manage the underlying infrastructure. Details about the Job Managers and Task Managers are not exposed.

However, this cloud-native environment does include a suite of tools for observing and monitoring the stream processing workloads you have running in Confluent Cloud. This exercise introduces these tools in the context of a specific query, but please explore further on your own. For inspiration, you will find additional example queries here in this course and in the course on Flink SQL.

The query

This query is a temporal join between an immutable, append-only fact table of orders, and an updating (versioned) reference table of customer information:

SELECT
  orders.`$rowtime`,
  orders.price,
  customers.postcode
FROM examples.marketplace.orders
JOIN examples.marketplace.customers
FOR SYSTEM_TIME AS OF orders.`$rowtime`
ON orders.customer_id = customers.customer_id;

This join enriches each order with information from the version of the customer record that was in effect at the time of the order. It will emit exactly one result for each incoming order, containing the time and price of the order, along with the customer's postcode at the time of the order.

This would be a useful building block, for example, in analyzing changes in revenue per postcode over time, or variations in average order prices across postcodes.
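
As a rough sketch of that first idea (not part of this exercise), the temporal join can feed a downstream aggregation that maintains a continuously updating revenue total per postcode:

-- Enrich each order with the customer's postcode (as above), then aggregate.
-- The result is an updating table, with one row per postcode.
SELECT
  customers.postcode,
  SUM(orders.price) AS total_revenue
FROM examples.marketplace.orders
JOIN examples.marketplace.customers
FOR SYSTEM_TIME AS OF orders.`$rowtime`
ON orders.customer_id = customers.customer_id
GROUP BY customers.postcode;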

The setup

If you haven't already done so, you can begin this hands-on exercise by following the instructions for getting started with Confluent Cloud.

You will be using the Query Profiler to observe the behavior of this query as it runs on Confluent Cloud. But first, let's do a static analysis of this temporal join.

Static analysis

You can use either the Confluent Flink shell (confluent flink shell) or the browser-based SQL Workspace (at confluent.cloud) for this step, but since the Query Profiler is only available in the browser, you might want to work in the SQL Workspace from the start.

Execute the following, which puts "EXPLAIN" in front of the query to be analyzed:

EXPLAIN
SELECT
  orders.`$rowtime`,
  orders.price,
  customers.postcode
FROM examples.marketplace.orders
JOIN examples.marketplace.customers
FOR SYSTEM_TIME AS OF orders.`$rowtime`
ON orders.customer_id = customers.customer_id;

The output is a static analysis of the execution plan for this query. It shows the topology of the job graph through which the data will flow, annotated with notes about the nature of the stream produced by each operation in the graph and its expected state size, reported as low, medium, or high.

== Physical Plan ==

StreamSink [12]
  +- StreamCalc [11]
    +- StreamTemporalJoin [10]
      +- StreamExchange [3]
      :  +- StreamCalc [2]
      :    +- StreamTableSourceScan [1]
      +- StreamExchange [9]
        +- StreamCalc [8]
          +- StreamChangelogNormalize [7]
            +- StreamExchange [6]
              +- StreamCalc [5]
                +- StreamTableSourceScan [4]

This will be easier to understand when you compare this textual representation of the job graph to the visualization displayed by the Query Profiler.

Run the query

Now it's time to start the query:

SELECT
  orders.`$rowtime`,
  orders.price,
  customers.postcode
FROM examples.marketplace.orders
JOIN examples.marketplace.customers
FOR SYSTEM_TIME AS OF orders.`$rowtime`
ON orders.customer_id = customers.customer_id;

Observe the running query

If you've been using the Flink shell up to this point, now you'll have to log into Confluent Cloud in your web browser.

From the Confluent Cloud home page, click on "Compute pools" in the resource overview, and select the "flink101" compute pool.

Scrolling down, you will find a table showing all of the SQL statements you have run. Find the one corresponding to the query you just started (it will be at the top), and click on it.

You are now at the "Activity" tab for your query. Go ahead and explore the other tabs: "Logs", "Query profiler", and "Settings".

The Query Profiler visualizes the topology of the job graph, with each task shown as a box displaying metrics about the current behavior of that task. (A task is a chain of operators executed together in the same thread.)

This is the task where the two streams are brought together, and the join occurs:

temporal join in job graph

If you click on this task, a panel will open in the sidebar where you can inspect the metrics, both at the level of the task as a whole and for each of the three operators comprising this task:

  • TemporalJoin
  • Calc (this operator handles projections and filtering, i.e., the SELECT and WHERE clauses; see the sketch after this list)
  • ConstraintEnforcer (responsible for any NULL or type length constraints)
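
To see the Calc operator at work, you could run a variation of the query with a filter added. This is just an illustrative sketch (the threshold is arbitrary, and exactly where the planner places the predicate depends on its optimizations):

SELECT
  orders.`$rowtime`,
  orders.price,
  customers.postcode
FROM examples.marketplace.orders
JOIN examples.marketplace.customers
FOR SYSTEM_TIME AS OF orders.`$rowtime`
ON orders.customer_id = customers.customer_id
-- This predicate is evaluated by a Calc operator; the planner will typically
-- push it down toward the orders source rather than apply it after the join.
WHERE orders.price > 50;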

You can open up each of these metrics, and observe their behavior over time. Note that some of the metrics are only available at the task level, such as busyness, which is shown here:

busyness graph in query profiler

At any moment, a Flink task is in one of three states: idle, busy, or backpressured. (Backpressured means that the task is unable to send output downstream because the downstream task is busy.) For this query, the throughput is low enough (3000 events/minute from each of the sources) that all of the tasks are idle most of the time.

On Confluent Cloud, the Flink runtime includes an elastic autoscaler, called Autopilot. Autopilot automatically scales resources up and down as required to meet the demands of your workload. Resource consumption is measured (and billed) in terms of a logical unit, the CFU. You can observe the number of CFUs being consumed by each statement in several places in Confluent Cloud, including on the Activity tab for the statement.

Using the Query Profiler for debugging

The Query Profiler is especially useful for debugging misbehaving jobs. Common problems such as data skew, idle sources, and backpressure can be readily observed.
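
For example, data skew tends to show up as uneven busyness across the parallel subtasks of a keyed operation. A hypothetical query like the one below (not part of the exercise) could be used to experiment; whether it actually exhibits skew depends on how orders are distributed across customers:

-- Continuous aggregation keyed by customer_id. If a few customers place far
-- more orders than the rest, the subtasks handling those keys will be busier.
SELECT
  customer_id,
  COUNT(*) AS order_count,
  SUM(price) AS total_spend
FROM examples.marketplace.orders
GROUP BY customer_id;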

Deep dive

The course on Apache Flink SQL goes deeper into the topics touched on here.

For more about the behavior of streaming joins, watch this video next:

To better understand the details of what's going on inside the SQL runtime, watch these videos:

Resources

Do you have questions or comments? Join us in the #confluent-developer community Slack channel to engage in discussions with the creators of this content.
