course: Apache Flink® SQL

Changelog processing and troubleshooting errors about change updates

10 min
David Anderson

Software Practice Lead

Changelog processing is a core aspect of both the planning and execution phases of the Flink SQL runtime.

This is an important concept, because the changelog modes of your sources and sinks need to correspond to what your queries will accept and produce.

It's also the case that some queries you might want to use aren't (currently) allowed, due to restrictions on how SQL operations can be combined, based on whether they can process update streams.

This video explains how changelog processing is handled by Flink SQL, its impact on your queries, how to handle the errors you might see, and some best practices.

Topics:

  • Stream-table duality
  • Changelog modes
  • Append-only vs. updating streams
  • Understanding errors related to changelog modes
  • Advantages of using CREATE TABLE AS
  • Best practices

PRESENTER NOTES FOR: “15-changelog”

In an earlier video, I made the case for thinking of Flink SQL as a query processor.

In this video, I'm going to dive a bit deeper into the planning and execution phases of query processing in Flink SQL, and explain how they are organized around changelog processing.

Why is this interesting? Well, it turns out that seemingly subtle changes to a query can have significant impacts on the query plan, which in turn can have a large impact on the resource requirements -- and ultimately the cost -- of its execution.

And sometimes the planner won't be able to handle what you've asked it to do.

This rather innocent-looking query is trying to match a pattern against the customers table, and the planner is saying that can't be done.

The error message says "Match Recognize doesn't support consuming the update and delete changes produced by ChangelogNormalize", whatever that means.

As you experiment with Flink SQL, you may encounter errors like this one. In this particular case, this match_recognize query isn't inherently impossible to execute. Instead, this error is simply the result of a missing feature -- Flink SQL is, after all, an open source project, and the vision it embodies has yet to be fully realized.

On the other hand, the planner is also throwing the same exception while trying to plan this straightforward grouped aggregation.

In this case, the error is coming from the equivalent of trying to fit a square peg into a round hole -- I've set up a situation that doesn't quite make sense.

To understand these errors, we need to step back and examine what it means that internally, Flink SQL is always processing changelog streams.

Along the way I'll share some best practices that will help keep you out of trouble.

Whenever you use CREATE TABLE to describe a source for Flink SQL, that source is backed by a stream of records that describe changes to a table. That stream of changes is a changelog stream. If you are familiar with change data capture streams, or database write-ahead logs, this is the same concept.

We often use the term stream-table duality to talk about what's going on here. We can convert a changelog stream into a table by applying the changes in the stream to some storage where we want to hold a materialized table -- perhaps in a database, key/value store, or data lake.

Changelog streams come in a few different flavors.

The simplest changelog streams contain nothing but insert events, such as this append-only stream of orders.

As new messages arrive from the orders stream, they describe new orders that should be appended to the orders table.

It's important to note that the orders in this table are immutable; they can't be changed in any way.

By contrast, the records in this customers table will change over time. For example, this customer has moved, and is about to update their postcode.

There are two different ways that streams with updates can be handled by Flink SQL. One of these is with a retracting stream, as shown here. Retracting streams represent updates as a pair of events in the changelog stream, shown here as -U followed by +U.

The -U retracts the old record

and the +U inserts an updated record to take its place.

There's another type of updating stream, which is an upserting stream. Upserting streams always have a primary key, which is used to uniquely identify the row being updated.

If the table doesn't yet have a row for that key, then an upsert acts as an insert;

otherwise, an upsert modifies the existing row, in place. That's about to happen here, as this customer updates their postcode.

In total, there are 4 different types of changes that can appear in a changelog stream.

Insert is the default mode used by source tables. The short name for this mode is +I.

Update before, or -U, represents the retraction of a previously emitted result. This is only used by retracting streams.

+U is known as update-after. In a retracting stream, this event supplies the updated information for the record that has just been retracted. In upsert mode, a +U inserts or updates the record identified by the upsert key.

And, finally, there's also a way to represent deletions.
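
To make the two encodings concrete, here is a minimal Python sketch (a toy model, not Flink code) that applies a retracting changelog and an upserting changelog to an in-memory table. The function names, the tuple layout of the rows, and the sample customers are all assumptions made up for illustration; the short names +I, -U, +U, and -D are the four change types just described.

```python
from collections import Counter


def apply_retract_changelog(events):
    """Materialize a table from a retracting changelog.

    No primary key is needed: every -U and -D carries the full row
    that should be removed.
    """
    table = Counter()  # multiset of rows: row -> number of copies
    for kind, row in events:
        if kind in ("+I", "+U"):
            table[row] += 1
        elif kind in ("-U", "-D"):
            if table[row] <= 0:
                raise ValueError(f"row is not in the table: {row}")
            table[row] -= 1
        else:
            raise ValueError(f"unknown change type: {kind}")
    return sorted(table.elements())


def apply_upsert_changelog(events):
    """Materialize a table from an upserting changelog.

    Assumption of this sketch: the first field of each row is the
    primary key (the upsert key).
    """
    table = {}  # upsert key -> current row
    for kind, row in events:
        key = row[0]
        if kind == "+U":
            table[key] = row      # insert, or overwrite in place
        elif kind == "-D":
            table.pop(key, None)  # delete by key
        else:
            raise ValueError(f"unexpected change type for upsert: {kind}")
    return sorted(table.values())


# Hypothetical sample data: customer 1 changes postcode, customer 2 is deleted.
retract_events = [
    ("+I", (1, "Alice", "10115")),
    ("+I", (2, "Bob", "80331")),
    ("-U", (1, "Alice", "10115")),
    ("+U", (1, "Alice", "20095")),
    ("-D", (2, "Bob", "80331")),
]
upsert_events = [
    ("+U", (1, "Alice", "10115")),
    ("+U", (2, "Bob", "80331")),
    ("+U", (1, "Alice", "20095")),
    ("-D", (2, None, None)),
]

print(apply_retract_changelog(retract_events))
print(apply_upsert_changelog(upsert_events))
```

Both calls end up with the same single-row table. The difference is in how much each event has to carry: the retracting form spells out the old row, while the upserting form relies on the key to find it.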

This table summarizes what you've just seen.

Flink SQL has two categories of changelog streams -- and two corresponding categories of tables -- namely append-only, and updating.

Sometimes the term insert-only is used instead of append-only, but this is the same thing.

When it comes to updating streams, they come in two flavors: retracting and upserting.

Retracting streams use the full set of changelog modes, and express updates as a combination of -U and +U. This retracting changelog mode is necessary in cases where there is no primary key. And it turns out that retracting streams are easier for Flink to handle, due to the distributed nature of Flink's runtime. As a matter of fact, Flink SQL always converts upsert sources into retract streams.

On the other hand, upserting streams don't use retractions, or inserts. These streams leverage the table's primary key, which we'll call the upsert key.

By the way, these concepts aren't new. In the world of change data capture, the terms full and partial image are used to describe these two different ways to encode updates.

Now let's talk about how the Flink SQL planner knows which changelog mode is being used at each stage of the stream processing pipeline.

The sources you create declare what types of changes they will produce. In some cases, this is declared explicitly, while other times this happens implicitly. With the Confluent connector, append mode is the default, but you can explicitly set the mode to append, upsert, or retract.

The open source Kafka connector always operates in append mode, and the open source upsert-kafka connector always produces upsert streams.

The changelog streams produced by the sources are processed by pipelines composed of a series of operators. Many of the SQL operators are limited in the types of changes they can accept and produce.

But the planner / optimizer has enough information available to it to be able to track the changelog modes and primary keys throughout the pipeline.

Finally, the sinks at the end of the pipeline declare the types of changes they can handle.

Now we have established enough shared vocabulary that I can explain what's happening with the errors I showed at the beginning.

The first half of the explanation is this: some SQL operators always produce updating streams. This is an inherent truth about these operators -- it's in the nature of what they do. This includes sources that include updates and deletions, such as a CDC stream in Debezium format coming from a relational database. And it also includes operations like grouped aggregations, and some JOINs.
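
To see why a grouped aggregation can't avoid producing updates, here is a minimal Python sketch (again a toy model, not Flink code) of a running count per key. The function name `count_by_key` and the sample products are assumptions for illustration. The first time a key appears the result is a plain insert, but every later input row for that key has to replace a count that was already emitted.

```python
def count_by_key(keys):
    """Emit the retracting changelog of a running COUNT(*) grouped by key."""
    counts = {}     # key -> count emitted so far
    changelog = []
    for key in keys:
        old = counts.get(key)
        new = (old or 0) + 1
        counts[key] = new
        if old is None:
            # First result for this key: nothing to retract.
            changelog.append(("+I", (key, new)))
        else:
            # A result was already emitted, so take it back and replace it.
            changelog.append(("-U", (key, old)))
            changelog.append(("+U", (key, new)))
    return changelog


# Hypothetical append-only input: one product name per order.
for change in count_by_key(["apple", "pear", "apple"]):
    print(change)
```

The input here is append-only, yet the output contains -U and +U events as soon as any key repeats.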

Now for the second half of the explanation.

We just saw that some SQL operators insist on producing updates. Unfortunately, some SQL operations can't handle updates in their input. This means that the operations on the left cannot be composed with the operations on the right.

In the case of append-only sinks, it makes sense that they can't accept updating streams. For the rest of these operations, the fact that they can't handle updates is simply a limitation of the current implementation. Like any successful open source project, Apache Flink is gradually improving. Over time, I expect that the Flink community will extend at least some of these operators so they can handle changelogs with updates.
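
The composition rule can be sketched as a small check over a pipeline. This is a toy model in Python, not how the Flink planner is implemented: the `Stage` class, its two flags, and `check_pipeline` are hypothetical names, and the flag values below are assumptions chosen to mirror the two failing queries from the beginning of this video.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Stage:
    name: str
    accepts_updates: bool         # can it consume -U / +U / -D changes?
    forces_updates: bool = False  # does it always emit an updating stream?


def check_pipeline(stages):
    """Return the changelog category at the end of the pipeline, or raise."""
    updating = False  # nothing has been produced before the first stage
    for stage in stages:
        if updating and not stage.accepts_updates:
            raise ValueError(f"{stage.name} can't consume update changes")
        # Toy rule: once the stream is updating, it stays updating.
        updating = updating or stage.forces_updates
    return "updating" if updating else "append-only"


append_source = Stage("append-only source", accepts_updates=False)
upsert_source = Stage("upsert source", accepts_updates=False, forces_updates=True)
group_agg = Stage("grouped aggregation", accepts_updates=True, forces_updates=True)
match_recognize = Stage("MATCH_RECOGNIZE", accepts_updates=False)
append_sink = Stage("append-only sink", accepts_updates=False)
upsert_sink = Stage("upsert sink", accepts_updates=True)

for pipeline in (
    [upsert_source, match_recognize, append_sink],
    [append_source, group_agg, append_sink],
    [append_source, group_agg, upsert_sink],
):
    names = " -> ".join(stage.name for stage in pipeline)
    try:
        print(f"{names}: ok ({check_pipeline(pipeline)})")
    except ValueError as err:
        print(f"{names}: rejected ({err})")
```

The first two pipelines are rejected and the third is accepted. The real planner tracks much more than a single flag (including which change types and which keys are in play), so treat this only as a way to picture the rule.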

This, then, is why the Flink SQL planner currently can't handle this particular match_recognize query: the input table is an upsert table, and match_recognize doesn't accept updating streams.

And what is this changelog normalize that was mentioned in the error message?

Changelog normalization is the process of transforming an upsert stream to a retract stream. This is something that the Flink SQL runtime does internally -- whenever it ingests an upserting stream, it transforms it to a retracting stream. This is an implementation detail that has found its way into an error message; it's not particularly important.
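
If you're curious what that transformation involves, here is a minimal Python sketch of the idea. It is not Flink's actual ChangelogNormalize operator: the function name, the event layout, and the convention that the first field of each row is the upsert key are all assumptions for illustration.

```python
def changelog_normalize(upsert_events):
    """Turn an upserting changelog into a retracting one.

    Assumption of this sketch: the first field of each row is the upsert key.
    """
    last_row = {}  # upsert key -> most recent row seen for that key
    out = []
    for kind, row in upsert_events:
        key = row[0]
        old = last_row.get(key)
        if kind == "+U":
            if old is None:
                out.append(("+I", row))   # first row for this key
            else:
                out.append(("-U", old))   # retract the previous version
                out.append(("+U", row))   # then supply the new one
            last_row[key] = row
        elif kind == "-D":
            if old is not None:
                out.append(("-D", old))   # delete carries the full old row
                del last_row[key]
        else:
            raise ValueError(f"unexpected change type for upsert: {kind}")
    return out


# Hypothetical sample data: customer 1 changes postcode, then is deleted.
upserts = [
    ("+U", (1, "Alice", "10115")),
    ("+U", (1, "Alice", "20095")),
    ("-D", (1, None, None)),
]
for change in changelog_normalize(upserts):
    print(change)
```

Notice that the sketch has to remember the latest row for every key in order to know what to retract.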

Now for this other example.

This aggregation couldn't be written to the product_counts table. This is happening because the product_counts table was created without mention of its changelog mode, so it defaulted to being an append-only table. It can't handle the updating counters produced by this grouped aggregation.

How can we fix this? The table being used to collect the output should have a primary key, and it should use upsert mode. The easiest way to make this happen is to take advantage of CREATE TABLE AS. When you create a table with CREATE TABLE AS, it leverages the planner to choose the best possible settings for the primary key and the changelog mode. By the way, this is a situation where you could choose to use a compacted topic as the underlying storage for this table -- that's a choice you can make, depending on whether you want to retain the full history of changes, or allow them to be compacted away.

So far we've seen that the Flink SQL runtime uses 3 different types of changelog streams, and that some operators force transitions between these changelog modes.

For some queries, the sources, sinks, and everything in between operate entirely in append mode.

This is ideal. Queries that never leave append mode can be executed very efficiently.

Specifically, what I'm talking about here are the time-based operations, which include windows, over aggregations, temporal and interval joins, and match recognize.

However, as we saw a couple of minutes ago, some queries need to do processing that involves updates.

Earlier we saw that the world of updating streams includes both upserting and retracting streams. These operations tend to be more resource intensive.

Conveniently enough, Flink is able to convert between these two types of updating streams.

However, once you enter this regime of updating streams, it's currently impossible to go back to the more efficient processing that's only possible with append-only streams. You'll notice that the arrow from append-only down to updating only goes in one direction.

However, development is underway to remove this restriction. If this interests you, follow the work being done on FLIP-440, which is about a new type of user-defined function called process table functions. This will be useful for lots of things, including the conversion of updating streams to append-only streams.

To sum things up, here's one more best practice tip, which is about avoiding operations that produce updating streams.

Whenever possible, try to stick to append-only processing. You can do this by limiting yourself to time-based operations.

The reason to prefer these operations is that they are able to more efficiently manage the state they need, thanks to the time-bounded nature of these operations.
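
Here is a minimal Python sketch of that idea, using a tumbling window count as the example. It is a toy model rather than Flink code: the function name and sample events are made up, and it assumes events arrive in timestamp order, so it has no notion of watermarks or late data.

```python
def tumbling_window_counts(events, size):
    """Count events per key in tumbling windows of the given size.

    events: iterable of (timestamp, key) pairs, in timestamp order.
    Returns (window_start, key, count) results, each emitted exactly once.
    """
    open_start = None  # start of the window currently being accumulated
    counts = {}        # state for the open window only
    results = []

    def close_window():
        results.extend((open_start, key, n) for key, n in sorted(counts.items()))
        counts.clear()  # the state for a closed window is dropped

    for ts, key in events:
        start = ts - ts % size
        if open_start is not None and start < open_start:
            raise ValueError("events must arrive in timestamp order")
        if start != open_start:
            if open_start is not None:
                close_window()
            open_start = start
        counts[key] = counts.get(key, 0) + 1
    if counts:
        close_window()  # flush the last window at end of input
    return results


# Hypothetical sample data: (timestamp, product) pairs, windows of size 10.
orders = [(1, "apple"), (3, "pear"), (4, "apple"), (11, "apple"), (25, "pear")]
for result in tumbling_window_counts(orders, size=10):
    print(result)
```

Each result is final when it is emitted, so the output is an append-only stream, and the state never holds more than one window's worth of counters.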

Thanks for watching!

This is just one in a whole series of videos about Flink SQL. And you'll find more resources for learning about Flink and Flink SQL over on the Confluent Developer website. There's a link in the description below.
