EXPLAIN is a feature of Flink SQL that presents the execution plan for SQL statements. The output of EXPLAIN makes it clear which operators will be used for execution, and how much state they can be expected to use. The video presents several examples, and talks about how to interpret the results. You will also learn about changes you can make to address problems with SQL statements that aren't well-behaved.
While you're working with Flink SQL, you may sometimes find yourself needing to diagnose problems with Flink SQL statements that are misbehaving. I'm David Anderson from Confluent, and in this video, I'm going to show you how to use the EXPLAIN feature of Flink SQL for troubleshooting. And in fact, EXPLAIN is useful for more than just troubleshooting: you can learn to anticipate and prevent problems. My real goal with this video is to help keep you out of trouble.

So what can go wrong? Well, one of the most common problems with Flink SQL is SQL statements that produce no results at all. Of course, this can be frustrating, but I'm here to tell you that the answer is almost always watermarks: something's going wrong with your watermarks. If you want to learn more about watermarks, I'll put some links in the description below; that's not the topic of this video. What I'm interested in here is helping you work with queries that are producing results, but doing so slowly. Perhaps over time they're using more and more resources, and in general they're falling behind; they're not keeping up with the input topics. If this is the case, the problem probably has something to do with how much state the Flink runtime is having to use to execute your statement. Why is that the case? That's what we're going to get into in this video. I'm going to show you several examples of how to use EXPLAIN to diagnose which part of your query is causing the problem, and I'll also show you how to fix it.

Let's start with a very simple example. This example is filtering an orders topic. You can use EXPLAIN to understand what this query is doing. This is just a command you can access in your SQL CLI or in the workspace: you put the word EXPLAIN in front of your SELECT or INSERT statement. With open source Apache Flink, the output will look something like this. This is a lot to try to digest. The SQL planner works in a series of stages, and each stage shows some output here to help you understand how the planner is dealing with this query. As the user trying to troubleshoot or understand how a query is being executed, it's the final stage of this output that's the most interesting, so you should focus your attention there.

If you're working with Apache Flink SQL in Confluent Cloud, you're going to get output that looks a bit different. Some of the details have been left out, it's all formatted differently, and some additional information has been added in, but the gist of it remains the same: you should focus your attention on the physical plan. This shows you the topology, or job graph, that Flink is going to use to execute your SQL statement. In this particular example, there are three stages to this pipeline. There's a source and a sink, which is the case for all Flink jobs; they all have sources and sinks. In between the source and sink is the business logic, and in this case the business logic is all wrapped up in a single node called stream calc. You'll see stream calc in your job graph whenever Flink needs to do a SELECT or a WHERE, so this is where the projections and filtering are happening.

Below the physical plan is another section labeled physical details. One of the most important parts of this output shows how much state each of the nodes is likely to use. Now, this is not a report based on watching the execution of the job; it's simply a static analysis based on an understanding of how much state each of these operators is likely to need.
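To make this concrete, here is a minimal sketch of the kind of filtering statement being discussed; the table and column names are illustrative rather than taken from the video:

-- Prefixing a SELECT (or INSERT) statement with EXPLAIN prints the plan
-- instead of executing the query.
EXPLAIN
SELECT order_id, customer_id, price
FROM orders
WHERE price > 100;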
This state size information isn't going to be available if you're using open source Flink; the EXPLAIN there doesn't include it. But you can do this analysis on your own using what I'm going to show you in this video.

So now let's look at another, more interesting example. This one's just a bit more complex. This query plan has a couple of additional elements: a stream exchange and a stream group aggregate. What's going on in this part of the query? The first thing you need to understand is that stream exchange is not an operator. Instead, it describes how the two operators it connects are actually connected together. To understand what's going on, let's look first at the earlier example. In that earlier example, it's trivial for Flink to split the workload. That job is embarrassingly parallel; it's trivial to parallelize because the business logic is stateless. However, in the case of this aggregating query, the job is stateful. What we're doing in this query is counting orders per customer, so we have to store these per-customer counters somewhere. To make that feasible, we have to arrange for every customer's orders to be processed by the same node. As each order is processed, we have to send it to the correct instance of the counting operator, the one that's counting orders for that customer. So every time you see a stream exchange, you should realize that there's an expensive network shuffle happening there. This means that each order, in this case, is going to be serialized, sent over the network, and then deserialized and counted. It also means that the node after the stream exchange is going to be using state, which it will store in Flink's distributed key-value store. So if you look at the physical details for this query, you'll notice that, yes, once again there's a source and a sink, neither of which is using much state, and there's a stream group aggregate node, which is using an amount of state described here as medium.

So how much state is medium? Let's dive into what this means. What I'm going to be doing throughout this video is categorizing each SQL operation according to how much state it might need. The SELECT and WHERE clauses are stateless, so they're completely safe; no need to worry about them. Sources and sinks use a very small amount of state. In the case of Kafka, the source keeps track of the offsets for each partition, and the sink keeps track of the transaction IDs for open transactions. This just isn't much state, and it's a constant amount of state; it just depends on how many partitions there are. But in the case of aggregations, this is where things get interesting. In the query we were just looking at, where we were counting orders per customer, we're going to have as many counters as there are customers. If your business is small and you just have the same old customers year after year, this isn't going to be very much state. If, on the other hand, most of your customers place a few orders and go, and you have a lot of churn, then over time you're going to accumulate a lot of customers. Another operation that doesn't require any state at all is the union operator; this is just merging two streams together. And another operation that needs a potentially interesting amount of state is deduplication.
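For reference, the per-customer counting query being described might look something like this sketch (the schema is assumed):

-- Counting orders per customer. The GROUP BY forces a network shuffle (the stream
-- exchange in the plan) and a stateful stream group aggregate that keeps one
-- counter per customer_id.
EXPLAIN
SELECT customer_id, COUNT(*) AS order_count
FROM orders
GROUP BY customer_id;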
Now keep in mind that your goal with a Flink SQL statement is to create a Flink job that can run indefinitely. It should be able to run potentially forever. So if the nature of what you're doing means there will be more and more state over time, you're eventually going to have a problem. What we want to do is categorize these operations according to whether the amount of state they're going to need is bounded or unbounded.

So here's an example of a join. We've got our orders. Each order has a customer involved, and we want to enrich each order with the email address of the customer who placed it. What we're interested in is the current email address for each of these customers. Consider, for example, a case where an item the customer ordered has been recalled and we need to send them an email now about a past order. I'm imagining in this example that the orders table is an append-only table, meaning that the orders are immutable, and we're joining it with the customers table. The customers table, in this case, is an updating table, receiving updates from customers as they change their address, email address, and so on. Now we need to shuffle each of these streams by customer ID so that we can join them together. Every order and every customer update for a given customer are going to be processed by the same instance of the join.

So here's what that looks like. We've got our orders and customers flowing into a join operator. This join operator is keeping some state; we'll talk about that in a minute. And the join is emitting results that combine information from both streams. Zooming in a bit, we see the orders and customers. As a new order comes in, we append it to the state we're keeping; we build it into that side of the join. We then probe the other side of the join to find the customer information for the customer placing this order, and then we emit a result. On the other hand, when we get a customer update, we find and update the relevant customer record, we find all of that customer's orders, and we send them downstream to the output.

So how much data are we keeping? On the append side of this join, we're keeping every order, so we're keeping state for every row of this table. On the customers side of the join, the updating side, we're keeping per-key, or per-customer, state; we just have one record for each customer.

So coming back to the diagram we're building up, let's add joins in. What we've just seen is that joins on appending tables need a new category of state: per-row state. This is the worst situation, clearly an unbounded amount of state. There's no a priori limit on how many rows a table might have, so this is potentially a big problem. On the other hand, for the updating tables, we have per-key state, and whether or not this is a problem depends on the cardinality of the key space involved. In other words, how many distinct keys are there? Or in our example, how many customers are there to keep track of? To help with this, Flink SQL has two additional types of join that are bounded by some sort of temporal logic: the so-called temporal joins, as well as interval joins. These need less state than joins on appending tables, so they are preferred if you can use them to meet your business requirements.
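Here is a sketch of the enrichment join being described, plus a temporal-join variant; the table and column names, and the order_time time attribute, are assumptions:

-- Regular join between the append-only orders table and the updating customers table.
-- Both inputs are shuffled by customer_id; the join keeps per-row state for orders
-- and per-key state for customers.
SELECT o.order_id, o.customer_id, c.email
FROM orders AS o
JOIN customers AS c
  ON o.customer_id = c.customer_id;

-- Temporal-join variant: each order is joined with the customer row as of the order's
-- event time, which bounds the state the join needs. This assumes orders has a time
-- attribute (order_time here) and customers has a primary key and a watermark.
SELECT o.order_id, o.customer_id, c.email
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.order_time AS c
  ON o.customer_id = c.customer_id;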
Now let's change gears and talk about changelog processing. This is a topic I introduced in an earlier video; see the link in the description below if you want to catch up on it. But let's have a bit of a recap here.

For some queries, the sources, the sinks, and everything in between are operating in append-only mode. This is the ideal situation: queries that never leave append mode will always stay in the safe zone. However, some queries produce updating streams, and these updating streams come in two flavors, upserting and retracting streams. These operations tend to be more resource intensive, and it turns out Flink sometimes needs to convert between these two representations. Converting an upserting stream to a retracting stream is a process we call changelog normalization, and going in the other direction is something called upsert materialization. Both of these operations are rather state intensive. I'm going to describe how this works now.

So coming back to the query we were looking at earlier, where we were counting orders per customer: in that example, I was assuming that the orders table is append only, that the orders were immutable. In that case, this is what the query plan looks like. However, if we modify the orders table to be an upserting table, the query plan gets more complex. Accommodating this change requires adding another stream exchange and something called changelog normalization. We have to take the upserting stream coming from the orders table and convert it into a retracting stream before it can be aggregated. Why is this? Most of the stateful operations that Flink SQL supports are, in fact, simpler to implement if they can operate on retract streams rather than upsert streams.

To understand why this is easier, let's look at an example. So here's our query, and this is its plan. Now let's talk through a very specific example. The first record we're ingesting is an upsert: we're adding order 1001 for customer 3001. Now it's the job of changelog normalize to turn this upsert into either an update or an insert, depending on what the actual situation is. So changelog normalize has to have enough state to recognize that this is an order it hasn't seen before and turn it into an insert record. The aggregator can then easily consume this record and check to see if it has any orders for customer 3001. It doesn't, so it records that it now has one such order.

Now imagine that the next record is a deletion. Upserting streams rely on having a primary key available, so all we need to do is specify the primary key of the order we're deleting; in this case, it's order 1001. Now, if we were to send this deletion straight into the aggregator, it wouldn't know what to do with it, because it's keeping track of state on a per-customer basis; it doesn't know anything about orders. The aggregator would have to keep a lot of additional state if it were to handle this upsert stream directly. So this is what changelog normalize is doing for us. It's keeping track of the information for each order. It has remembered that order 1001 is for customer 3001, so it takes this upsert record and turns it into a fully specified retraction record that the aggregator can then easily handle, in this case by removing its count for that particular customer.

So what have we just seen? Well, changelog normalize is storing the latest information for each order, and it's using this information to produce a retract stream. And by ingesting a retract stream, the aggregation operator is made rather simpler. So let's build this into our diagram. What we've just seen is that changelog normalization is an operation that needs per-key state, and whether or not this is problematic depends on the size of the key space.
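For context, here is one way an upsert-mode orders table might be declared using the open source upsert-kafka connector; the topic, broker address, and schema are placeholders, and Confluent Cloud expresses the same idea through the table's changelog mode rather than a connector clause. Reading from a table like this produces an upsert stream, which is what causes the aggregation plan to gain a changelog normalize step:

CREATE TABLE orders (
  order_id    BIGINT,
  customer_id BIGINT,
  price       DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);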
Now let's dive into the final piece of the puzzle here: upsert materialization. So what does EXPLAIN tell us about what this particular query is doing? What does this join look like as a query plan? It's a rather complicated looking plan; it's got quite a few steps. Here's what it looks like as a diagram. You'll notice that there are two stream exchanges, just as we expect: both of the input streams are shuffled by customer ID before being joined. And you'll see that there's also changelog normalization happening, to deal with the upserting stream from the customers table.

So how much state is involved? As expected, the sources don't have much state. Both changelog normalize and the join have been characterized here as requiring a medium amount of state. Now, you might argue that the join is actually using more than a medium amount of state, since it's keeping per-row state; just keep that in mind. And then the output of EXPLAIN is telling us that the sink is using a high amount of state. This should surprise you. Why is this the case? We saw earlier that, in general, sinks don't need much state. So what's going on here?

Well, to help understand this, let's look at some of the other information that EXPLAIN is providing. In particular, let's look at the changelog modes at each stage. EXPLAIN is telling us that the changelog mode out of the orders table is an append stream, just as we expect, and we have an upsert stream coming out of the customers table. Nothing surprising here. Changelog normalize is converting that upserting stream to a retract stream. The join is also producing a retract stream, and this is what is coming into the sink. On the other hand, the sink is producing an upsert stream, so the sink is required to convert this retract stream into an upsert stream. This is related to what's going wrong here; it gives us a clue as to why the sink needs so much state. I've said that I want an upsert table, a compacted table where I'm just keeping track of the latest information about each order, including the customer's email address. However, while the sink is going to be keyed by order ID, the join was keyed by customer ID. This is the situation in which an expensive operation has to happen in the sink. The query planner is providing us with a warning here: it's telling us that the primary key doesn't match. For the upsert stream coming out of the sink, we need a stream that's keyed by order ID, but the join was keyed by customer ID. This is exactly the situation in which upsert materialization is required. There's a second warning coming out of the SQL planner; this is basically just another version of the same information. To make a final update to our diagram, we have another category of worrisome state, and this is when we have sinks that require upsert materialization.
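As a sketch of the scenario just described (the sink table orders_with_email and the column names are assumptions), explaining the full INSERT INTO lets the planner see that the sink is keyed by order_id while the join is keyed by customer_id:

-- The hypothetical sink table is an upsert table keyed by order_id, but the join
-- below is keyed by customer_id, so the planner adds upsert materialization at the sink.
EXPLAIN
INSERT INTO orders_with_email
SELECT o.order_id, o.customer_id, c.email
FROM orders AS o
JOIN customers AS c
  ON o.customer_id = c.customer_id;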
Let me present some best practices for helping you avoid the kinds of problems we've talked about here. First of all, one of the best things you can do to stay out of trouble is to stick to append-only operations. You should use things like windows and temporal joins. This isn't a foolproof technique for staying out of trouble, but it will go a long way.

Now let's talk about using state time-to-live to manage state. If your job has any operations that use per-key state with a large and growing key space, or operations that require per-row state, you really need to configure state time-to-live. How do you do this? Well, you can do it globally, for all of the state in your SQL statement, by setting one of these two configuration options; choose which one to use based on whether you're using open source Apache Flink or Confluent Cloud. You also have the option to set query hints on specific tables. Here's an example of how to do that for this particular join. This will set state TTL independently for the customers and orders tables. But note that this won't have any effect on changelog normalize or sink materialization; for those, you will need to set the session-level configuration. Keep in mind that this interval is based on processing time, not the event-based timestamps in your data. So if it says two hours, that means two hours of time while the job is running.

Another thing you want to do is avoid sink materialization. For starters, you should always use EXPLAIN with a statement that includes the final table, as in this example, where we are explaining an INSERT INTO and specifying the table being inserted into. If EXPLAIN doesn't know that the result of the query is going into an upsert table with a specific key, it can't detect that upsert materialization might be needed. You also want to take a careful look at the output of EXPLAIN, which will tell you when sink materialization is happening. In the case of open source Apache Flink, it will include an annotation on the sink saying that upsert materialize is true. And in the case of the Confluent Cloud version of EXPLAIN, it will tell you that the sink is using a high amount of state.
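For reference, here is a sketch of the state TTL settings and hint just described; the option names are the ones that would usually apply here (worth verifying against the docs for your platform), and the TTL values, table names, and aliases are illustrative:

-- Global state TTL, applied to all stateful operations in the statement.
-- This is processing time: '2 h' means two hours of wall-clock time while the job runs.
SET 'table.exec.state.ttl' = '2 h';   -- open source Apache Flink
SET 'sql.state-ttl' = '2 h';          -- Confluent Cloud for Apache Flink

-- Per-table state TTL via a query hint on the join. Note that this covers the join
-- state only, not changelog normalize or sink materialization; for those, use the
-- session-level setting above.
SELECT /*+ STATE_TTL('o' = '2h', 'c' = '30d') */
  o.order_id, o.customer_id, c.email
FROM orders AS o
JOIN customers AS c
  ON o.customer_id = c.customer_id;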
So that's it. Thanks for paying attention. And now head on over to Confluent Developer, where you'll find lots more resources for learning about Apache Flink and Apache Kafka.