In this hands-on exercise you'll learn how to use Flink's Web UI to inspect and understand what's happening inside Flink while it executes a SQL query. This exercise relies on the Docker-based environment described elsewhere.
Running docker compose run sql-client will get you into the Flink SQL CLI, where you can create a table:
CREATE TABLE `pageviews` (
`url` STRING,
`ts` TIMESTAMP(3)
)
WITH (
'connector' = 'faker',
'rows-per-second' = '100',
'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);
and start an unbounded streaming query:
SELECT COUNT(*) FROM pageviews;
Using your web browser, visit http://localhost:8081, which will bring up the Flink Web UI. Please explore to your heart's content, but be sure to click on the relevant job in the list of Running Jobs (there's likely only one, and it will show the query as the Job Name).
What you are seeing now is a periodically updating view of the Flink job created to run your query. In the center of the screen you should see the job graph for this job, which looks something like this:
This job graph shows a processing pipeline with two stages, called tasks. Each task in a Flink job consists of one or more operators that are directly connected, or chained together, and in each parallel instance (or subtask), those chained operators run in the same thread.
For example, the task shown on the right side of this job graph is an operator chain consisting of three operators: a GroupAggregate, a ConstraintEnforcer, and a Sink.
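By default the playground may run this job with a parallelism of 1, in which case each task has just one subtask. If you'd like to see several subtasks per task, one option (a sketch; this assumes the Task Managers in the Docker environment have enough free slots, and your defaults may differ) is to set the default parallelism in the SQL Client before starting the query:
-- raise the default parallelism so tasks run as multiple parallel subtasks
SET 'parallelism.default' = '2';
With this in place, the source task will run with two subtasks, while the global COUNT(*) aggregation still runs as a single subtask, since all of the records have to be brought together in one place.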
The Web UI displays a few task-level metrics for each task: backpressure, busyness, and data skew.
Within each subtask, backpressure is reported as the percentage of time that the subtask was unable to send output downstream because the downstream subtask had fallen behind, and (temporarily) couldn't receive any more records. At the task level, what's reported is "backpressured (max)", which is the maximum backpressure across all of the parallel subtasks for the reporting period.
The busy metric reported by each subtask is the percentage of time spent doing useful work (as opposed to being either idle or backpressured). Once again, busyness is aggregated at the task level as the maximum across all of the parallel subtasks. In the example above, the source task at the left is shown in red because it is busy 100% of the time. The reason for this has to do with how flink-faker is implemented. In general, tasks that are always busy are a concern, but here it's okay.
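If you want to see what a less busy source looks like, one option (a sketch, using Flink's dynamic table options hint, which is enabled by default in recent Flink versions; the rate chosen here is arbitrary) is to re-run the query with a much lower rate for the faker source:
-- override the source rate for this query only, via a dynamic table options hint
SELECT COUNT(*) FROM pageviews /*+ OPTIONS('rows-per-second' = '10') */;
With only 10 rows per second to generate, the source task should spend most of its time idle rather than busy.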
Data skew measures the degree of variation in the number of records processed per second by each of the parallel subtasks. This will be 0% if all of the subtasks handled precisely the same number of records during the reporting interval; 100% indicates maximum skew. See FLIP-418 for the details of how data skew is calculated.
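The COUNT(*) job above doesn't give this metric much to work with, since the global aggregation runs in a single subtask. If you'd like to watch how records are distributed across parallel subtasks, you could try a keyed aggregation instead (a sketch; how much skew you'll actually see depends on the parallelism, and on how the generated URLs happen to hash across the subtasks):
-- a keyed aggregation: records are partitioned by url, so the aggregating
-- subtasks may receive noticeably different numbers of records
SELECT url, COUNT(*) AS views
FROM pageviews
GROUP BY url;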
The reason these task-level metrics are based on the worst-case values across all of the subtasks is that it only takes one overwhelmed instance to significantly degrade the performance of the entire cluster.
In the table below the job graph you will see some I/O metrics for each of the tasks. Note that these metrics only report internal network communication happening within Flink -- the source task will not report on traffic from an external service into Flink, nor will a sink report on communication from Flink to the outside world.
You can click on either of the rows in the table, or either of the two colored boxes in the job graph, to drill into and inspect each task in the job graph. Per-subtask metrics and historical values are available there.
If you now stop the query (in the SQL Client), you'll see that this job is canceled. You can click on Overview in the sidebar of the Web UI to return to the top-level view that shows both running and completed jobs.
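You can also inspect and stop jobs without leaving the SQL Client (a sketch; these statements assume a reasonably recent Flink version that supports job statements, and the job id below is just a placeholder to replace with one from your own cluster):
-- list the jobs known to the cluster, along with their status
SHOW JOBS;
-- stop a specific job, using an id taken from the SHOW JOBS output
STOP JOB '<job id>';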
If you return now to the SQL Client, and enter EXPLAIN SELECT COUNT(*) FROM pageviews;, you'll see output that contains the execution plan for this query:
== Optimized Execution Plan ==
GroupAggregate(select=[COUNT(*) AS EXPR$0])
+- Exchange(distribution=[single])
   +- Calc(select=[0 AS $f0])
      +- TableSourceScan(table=[[default_catalog, default_database, pageviews]], fields=[url, ts])
...
This may appear a bit overwhelming, but it directly relates to what you saw in the job graph -- the Exchange in the query plan marks the boundary between the two tasks. One task scans the source and applies the Calc (which projects a placeholder constant, since COUNT(*) doesn't need any particular column), and the other task computes the grouped aggregation, with the Exchange gathering all of the records into that single aggregating instance.
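For comparison, you might also explain the keyed aggregation used earlier (a sketch; the exact plan text depends on your Flink version). Instead of sending everything to a single instance, the plan for this query should show an Exchange that hash-partitions the records by url before they reach the aggregation:
-- compare the Exchange in this plan with the distribution=[single] Exchange above
EXPLAIN SELECT url, COUNT(*) FROM pageviews GROUP BY url;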
In the left sidebar of the Web UI there are entries for the Job Manager and the Task Managers. Clicking on these will give you access to information about how these services are configured, their current metrics, logs, etc.
This Web UI is especially useful for debugging misbehaving jobs. Common problems such as data skew, idle sources, and backpressure can be readily observed.
In later exercises we'll return to the Web UI to examine checkpoints and watermarks.