course: Apache Flink® SQL

Pattern Recognition and Complex Event Processing with MATCH_RECOGNIZE

19 min
David Anderson

Software Practice Lead


With MATCH_RECOGNIZE, you can use a regular expression, in combination with filters and temporal sorting, to describe how events should be grouped together. This is useful for a wide range of event-driven and analytical use cases.

Code

The video presents a few examples illustrating how MATCH_RECOGNIZE works, and how to use it. The code for those examples is presented here.

These examples rely on the orders table built into Confluent Cloud.

Customers with 2 orders for the same product within the past 60 days

SELECT 
    customer_id, product_id, order1_time, order2_time
FROM 
    examples.marketplace.orders
MATCH_RECOGNIZE (
    PARTITION BY customer_id, product_id
    ORDER BY `$rowtime`
    MEASURES
        O1.`$rowtime` AS order1_time,
        O2.`$rowtime` AS order2_time
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (O1 O2) WITHIN INTERVAL '60' DAYS
    DEFINE
        O1 AS true,
        O2 AS true
);

Customers with larger and larger orders (exactly 3 orders going up)

SELECT *
FROM examples.marketplace.orders  
MATCH_RECOGNIZE (
    PARTITION BY customer_id
    ORDER BY `$rowtime`
    MEASURES
        ARRAY_AGG(UP.price) AS up_prices
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (UP{3})
    DEFINE
        UP AS (COUNT(UP.price) = 1) OR 
              (UP.price > LAST(UP.price, 1))
);

Customers with larger and larger orders (3 or more orders going up, then 1 down)

SELECT *
FROM examples.marketplace.orders 
MATCH_RECOGNIZE (
    PARTITION BY customer_id
    ORDER BY `$rowtime`
    MEASURES
        ARRAY_AGG(UP.price) AS up_prices,
        DOWN.price AS down_price
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (UP{3,} DOWN)
    DEFINE
        UP AS (COUNT(UP.price) = 1) OR 
              (UP.price > LAST(UP.price, 1)),
        DOWN AS DOWN.price < LAST(UP.price, 1)
);

Generating video events

The video also shows an extended example based on a video_events table. It was generated as shown here:

CREATE TABLE video_base_events (
    WATERMARK FOR event_time AS event_time
) AS (
    SELECT
        UUID() AS session_id,
        ARRAY['video_0', 'video_1', 'video_2', 'video_3', 'video_4',
              'video_5', 'video_6', 'video_7', 'video_8', 'video_9']
             [RAND_INTEGER(10)+1] AS video_id,
        RAND_INTEGER(20000) AS user_id,
        RAND() < 0.10 AS converted,
        10+RAND_INTEGER(290)+1 AS duration,
        RAND() < 0.33 AS buffered_1,
        RAND() < 0.16 AS buffered_2,
        RAND() < 0.10 AS buffered_3,
        `$rowtime` AS event_time
    FROM examples.marketplace.orders
);

CREATE TABLE video_events (
    `session_id` STRING,
    `event_type` STRING,
    `event_time` TIMESTAMP_LTZ(3),
    `video_id` STRING,
    `user_id` INT,
    WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' MINUTE
);

INSERT INTO video_events WITH
    start_events AS (
        SELECT
            session_id,
            'start' AS event_type,
            event_time,
            video_id,
            user_id
        FROM video_base_events WHERE converted = TRUE
    ),
    pause_events AS (
        SELECT
            session_id,
            'pause' AS event_type,
            TIMESTAMPADD(SECOND, duration, event_time)
              AS event_time,
            video_id,
            user_id
        FROM video_base_events WHERE converted = TRUE
    ),
    buffering_1_events AS (
        SELECT
            session_id,
            'buffering' AS event_type,
            TIMESTAMPADD(SECOND, RAND_INTEGER(duration), event_time)
              AS event_time,
            video_id,
            user_id
        FROM video_base_events
        WHERE converted = TRUE AND buffered_1 = TRUE
    ),
    buffering_2_events AS (
        SELECT
            session_id,
            'buffering' AS event_type,
            TIMESTAMPADD(SECOND, RAND_INTEGER(duration), event_time)
              AS event_time,
            video_id,
            user_id
        FROM video_base_events
        WHERE converted = TRUE AND buffered_2 = TRUE
    ),
    buffering_3_events AS (
        SELECT
            session_id,
            'buffering' AS event_type,
            TIMESTAMPADD(SECOND, RAND_INTEGER(duration), event_time)
              AS event_time,
            video_id,
            user_id
        FROM video_base_events
        WHERE converted = TRUE AND buffered_3 = TRUE
    )
SELECT * FROM
    (SELECT * FROM start_events) UNION ALL
    (SELECT * FROM pause_events) UNION ALL
    (SELECT * FROM buffering_1_events) UNION ALL
    (SELECT * FROM buffering_2_events) UNION ALL
    (SELECT * FROM buffering_3_events);

Parsing the stream of video events into sessions

SELECT session_id, bufferings, s_time, duration
FROM video_events
MATCH_RECOGNIZE (
    PARTITION BY session_id
    ORDER BY event_time
    MEASURES
        COUNT(B.event_type) AS bufferings,
        TIMESTAMPDIFF(SECOND, S.event_time, P.event_time)
          AS duration,
        MATCH_ROWTIME() AS s_time
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (S B* P)
    DEFINE
        S AS S.event_type = 'start',
        B AS B.event_type = 'buffering',
        P AS P.event_type = 'pause'
);

Putting the video sessions into tumbling windows

This example uses a CTE (Common Table Expression) named bufferings_per_session. If you're not familiar with CTEs, you can learn more about them in the documentation.

WITH bufferings_per_session AS (
    SELECT bufferings, s_time
    FROM video_events
    MATCH_RECOGNIZE (
        PARTITION BY session_id
        ORDER BY event_time
        MEASURES
            COUNT(B.event_type) AS bufferings,
            MATCH_ROWTIME() AS s_time
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (S B* P)
        DEFINE
            S AS S.event_type = 'start',
            B AS B.event_type = 'buffering',
            P AS P.event_type = 'pause'
    )
)
SELECT window_end, AVG(CAST(bufferings AS FLOAT))
                   AS avg_per_session
FROM TABLE(
    TUMBLE(
        DATA => TABLE bufferings_per_session,
        TIMECOL => DESCRIPTOR(s_time),
        SIZE => INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;

Using ARRAY_AGG with ROW and CAST for debugging

This example illustrates the use of ARRAY_AGG together with ROW and CAST to collect information to help with debugging:

SELECT *
FROM examples.marketplace.orders  
MATCH_RECOGNIZE (
    PARTITION BY customer_id
    ORDER BY `$rowtime`
    MEASURES
        ARRAY_AGG(
            CAST(ROW(UP.order_id, UP.price) AS 
                 ROW<order_id STRING, price DECIMAL(10,2)>)
        ) AS details    
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (UP{3})
    DEFINE
        UP AS (COUNT(UP.price) = 1) OR 
              (UP.price > LAST(UP.price, 1))
);


Pattern Recognition and Complex Event Processing with MATCH_RECOGNIZE

Match recognize is a feature of Flink SQL that allows you to describe patterns that are matched against a stream of incoming events. When I first heard about this feature, I thought it sounded rather obscure, but now that I understand it, I find lots of uses for it. Knowing how to use match recognize has been a bit like having a secret superpower!

Match recognize has been part of the SQL standard since 2016, but before I started working with Flink SQL I had never heard of it.

I imagine this is because very few databases have implemented match_recognize. On the other hand, pattern matching, or complex event processing, is a good fit for many stream processing use cases, and given the popularity of SQL for stream processing, match_recognize is becoming a widely used tool in the data streaming engineer's toolbox.

Apache Flink supports most, but not all of the match_recognize standard. And Flink goes beyond the standard in one area, which I'll illustrate with an example.

This video is organized around presenting some examples of how to use match recognize, first for building event-driven applications, and then for computing real-time analytics. I'll close by sharing some hard-earned best practice tips for working with and troubleshooting queries that use match recognize.

The first couple of examples I want to show you involve detecting patterns in this Orders table.

In these examples we'll be using the customer_id, product_id, price, and rowtime fields, which tell us which customer placed the order, what product they purchased, its price, and the time the order was placed.

We're going to start with a relatively straightforward example. The idea is that we want to identify customers who have just placed a second order for a product they have already purchased within the past 60 days.

The business requirement might be that we want to invite them to create a subscription that will order this product on a regular basis -- but let's start by learning to identify the customers who exhibit this pattern.

The overall approach is that we are going to be applying match_recognize to an input table -- in this case, our orders table --

and match_recognize is going to transform the input into a table of results.

Each time the pattern is matched, one row will be appended to the output table.

MATCH_RECOGNIZE works from a description of this transformation, and it consists of two steps: first, a description of what the pattern looks like and how it should be applied to the table,

and secondly, what the output should look like.

In this case, I want output that shows which customer has placed two orders for the same product, what that product is, and when those orders were placed.

The first step in describing the pattern is to determine if, and how, the input should be partitioned.

Partitioning the input table is the same as grouping the table rows by one or more of their fields.

For example, we could partition the orders table by the customer_id. Since this table has orders for two customers, this partitioning yields two partitions, each with two orders.

The patterns you define with MATCH_RECOGNIZE are matched independently in each partition. This means that it's best to partition the table as finely as you can, because each partition will then have fewer irrelevant records that your pattern will have to know how to skip over.

For this example, partitioning the table by both customer and product is ideal, because each partition then has exactly what we need -- just the records for one customer and a specific product.

Now we can start to look at the syntax used by MATCH_RECOGNIZE.

Here's the partitioning we talked about.

And here are the basics of the pattern itself. Patterns are described as regular expressions. Here the pattern is a sequence of two consecutive orders, called O1 and O2.

In general, the WITHIN clause is optional. In this example, it specifies that the entire pattern must be matched by a sequence of events no more than 60 days long.

This time constraint is something the Flink community has added to match_recognize, and is not part of the SQL standard.

The variables used in the regular expression are defined below the pattern spec in a section labeled DEFINE. This section is where you can impose constraints on what these variables can match.

In this example, I'm leaving both O1 and

O2 unconstrained -- they can match anything. Because we have partitioned the input stream by both customer_id and product_id, we know these two orders are already constrained to be for the same customer, and the same product.

For the pattern matching to behave correctly, and deterministically, MATCH_RECOGNIZE always sorts its input stream.

Flink only supports match_recognize in streaming mode, and can only sort the input stream by the time attribute of the input table. In this case, we are using the $rowtime field provided by Confluent Cloud.

Why does match recognize need to sort its input? Well, the patterns being matched describe sequences of events, arranged in time. Since event streams often have their events arrive out of order, we need to somehow guarantee that match recognize will apply its pattern matching logic to the events in the order in which they originally occurred.

The final piece in setting up the input side of match recognize is to define where in the input table the pattern matching engine should begin looking for the next match.

In this case, we want to position the pattern matcher at the order immediately after O1. This means that O2 will be considered as the first order in the next match.

You can make other choices for AFTER MATCH SKIP.

For example, if I use SKIP PAST LAST ROW instead, match recognize will skip ahead, and wait for a row to arrive after O2.

SKIP TO FIRST and SKIP TO LAST allow for more fine-grained positioning within the matched pattern. In my experience, these options are only rarely used.
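
For reference, here is a quick summary of those options, shown as fragments in the context of the PATTERN (O1 O2) example above (only one AFTER MATCH clause can appear in a query; the <variable> placeholders stand for any pattern variable):

AFTER MATCH SKIP TO NEXT ROW          -- resume matching at the row right after O1 (used above)
AFTER MATCH SKIP PAST LAST ROW        -- resume at the first row after the entire match
AFTER MATCH SKIP TO FIRST <variable>  -- resume at the first row mapped to <variable>
AFTER MATCH SKIP TO LAST <variable>   -- resume at the last row mapped to <variable>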

Now it's time to shift gears, and look at how MATCH_RECOGNIZE allows us to describe the output it will produce.

For starters, every row in the output will contain the columns used for partitioning. In this example, that's the customer_id and product_id.

Each output row will also contain the columns described in the MEASURES clause.

Notice how the variables O1 and O2 that were used to define the pattern are available for us to use here in the MEASURES clause.

In this case, I'm using the variables O1 and O2 to define the columns named order1_time and order2_time. Those contain the timestamps of the two matching orders.

We've now fully described both the input and output sides of this pattern, and here's what the complete solution looks like.

The MATCH_RECOGNIZE block in this query holds the pattern definition we've been developing.

MATCH_RECOGNIZE is being applied to the orders table.

And we are selecting specific columns from the table produced by MATCH_RECOGNIZE.

I can hear some of you saying that this feels a bit overblown, and you'd rather use an over window, or a self join, instead.

Those are reasonable alternatives. For comparison, here's what those solutions would look like.
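
The exact queries from the video aren't reproduced here, but as a rough sketch, the same "two orders for the same product within 60 days" requirement could be expressed with an interval self-join or an OVER aggregation along these lines:

-- Sketch 1: interval self-join (pairs each order with a later order
-- for the same customer and product within 60 days)
SELECT
    o1.customer_id,
    o1.product_id,
    o1.`$rowtime` AS order1_time,
    o2.`$rowtime` AS order2_time
FROM examples.marketplace.orders o1
JOIN examples.marketplace.orders o2
  ON o1.customer_id = o2.customer_id
 AND o1.product_id  = o2.product_id
 AND o1.order_id   <> o2.order_id
 AND o2.`$rowtime` BETWEEN o1.`$rowtime`
                       AND o1.`$rowtime` + INTERVAL '60' DAY;

-- Sketch 2: OVER aggregation (emits an order whenever it is at least
-- the second order for that customer and product in the past 60 days)
SELECT customer_id, product_id, `$rowtime` AS order2_time
FROM (
    SELECT
        customer_id, product_id, `$rowtime`,
        COUNT(*) OVER (
            PARTITION BY customer_id, product_id
            ORDER BY `$rowtime`
            RANGE BETWEEN INTERVAL '60' DAY PRECEDING AND CURRENT ROW
        ) AS orders_in_window
    FROM examples.marketplace.orders
)
WHERE orders_in_window >= 2;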

For many simple patterns, you will often be able to find other solutions, and they may be perfectly viable. Knowing how to use match recognize becomes much more of a superpower as the patterns become more complex.

In this case, both of the queries shown here will optimize their state retention. They won't keep any orders more than 60 days old, and the same is true for the solution based on match recognize.

For this use case, the only significant problem I can see is with the JOIN, which can't easily be made to have a time attribute in its output stream. This prevents the join from being used as input to further time-based operations, unless you first materialize its output to Kafka and set up the timestamps and watermarks yourself.

I want to show you one more thing before moving on to another example.

This pattern I've developed is based on partitioning the table by both customer and product.

But what if I had left out the product dimension, and only partitioned the input by customer?

The effect would be that each partition would include all of the orders for that customer. To accommodate this, the pattern would need to be more complex, so that it could skip over any orders for other products that might occur between O1 and O2. This idea is captured in the revised pattern by the variable OTHER, where each instance of OTHER is an order for any product other than the product purchased in O1.

In this pattern, OTHER* is similar to .* in a regular expression.
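
The code section above only includes the version partitioned by both customer and product, but a sketch of this revised pattern might look roughly like the following (the variable name OTHER is just illustrative):

SELECT customer_id, product_id, order1_time, order2_time
FROM examples.marketplace.orders
MATCH_RECOGNIZE (
    PARTITION BY customer_id
    ORDER BY `$rowtime`
    MEASURES
        O1.product_id AS product_id,
        O1.`$rowtime` AS order1_time,
        O2.`$rowtime` AS order2_time
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (O1 OTHER* O2) WITHIN INTERVAL '60' DAYS
    DEFINE
        OTHER AS OTHER.product_id <> O1.product_id,
        O2    AS O2.product_id    =  O1.product_id
);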

In this next example, we are going to look for customers whose orders keep increasing in price, perhaps with the goal in mind of offering them some sort of reward, like an offer for free shipping.

Just like the first example, this pattern is also being applied to the Orders table, but this time it is partitioned by the customer_id. Once again, it is ordered by the system-provided rowtime column.

This is the pattern -- it's a sequence of 3 consecutive orders, ...

... where the price has gone up each time.

The heart of this condition is expressed here, using the LAST function. Here we are comparing UP.price, which is the price of the incoming row, to the price of the previously matched order.

If there were no previous order, LAST would return NULL and this price comparison would not be satisfied.

In general, the LAST function lets you look backwards through the stack of recently matched records, and access the values of their fields. In this case, we are just looking back one record, and checking the price of the last order.

There is also a FIRST function. It is very similar, except that it starts counting from the first matched record, rather than the last.
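
As a minimal illustration, here's a variation of the example above that reports both ends of the run using FIRST and LAST in the MEASURES clause (the column names first_price and last_price are just illustrative):

SELECT *
FROM examples.marketplace.orders
MATCH_RECOGNIZE (
    PARTITION BY customer_id
    ORDER BY `$rowtime`
    MEASURES
        FIRST(UP.price) AS first_price,  -- price of the first matched order
        LAST(UP.price)  AS last_price    -- price of the last matched order
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (UP{3})
    DEFINE
        UP AS (COUNT(UP.price) = 1) OR 
              (UP.price > LAST(UP.price, 1))
);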

LAST is not well defined when the sequence of matched records is empty, so I've used the highlighted condition to bootstrap the pattern matching. The count is equal to one when the first record is being added to the variable UP. Once the count is no longer one, or in other words, once the UP variable is no longer empty, subsequent matching orders will need to have higher and higher prices.

For this example, I've chosen to produce as output an array containing the prices of the 3 matching orders.

I find this array aggregation function really useful.

I find it especially helpful for debugging and verifying the behavior of my queries.

You may be wondering why I had this pattern look for exactly 3 orders with increasingly higher prices. Why not find cases with 3 or more orders with ascending prices?

This is how you would express this -- with a comma after the 3, this pattern will match cases not just with 3, but with 3 or more orders satisfying the conditions.
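
Concretely, the only change relative to the previous query is the quantifier in the PATTERN clause, shown here as a fragment:

    PATTERN (UP{3,})   -- 3 or more orders matching the UP condition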

However, if you try this, you will get the error shown here.

This fails because Flink's implementation of match recognize is designed to produce a final result once the pattern is fully matched. If asked to find 3 or more orders with ever-increasing prices, it always makes sense for the pattern matching algorithm to wait for more orders that might extend the match. If the input were bounded, it could wait until it had processed all of the input and then produce a final result. But the input is an unbounded stream, so a pattern like this can never be fully matched.

One way to fix this is to allow the pattern to match 3 or more orders with increasing prices, followed by an order where the price goes down. Here DOWN is defined as an order whose price is less than the price of the last order in the group of orders matching the UP condition.

These examples illustrate use cases of the kind of complex event processing that match recognize makes possible. Other use cases for complex event processing include fraud and anomaly detection, and business process monitoring. In general, these use cases will trigger some sort of reaction when the stream of incoming events matches some pre-defined pattern.

A complete solution could be built along these lines. Having found a match to one of these patterns, we can then combine the information derived from the pattern with additional contextual information, such as the customer's email address, and write that out to a Kafka topic that serves as input for a microservice. That service can then send an offer to the customer.

There's another category of applications you can build with match recognize, and that involves using pattern matching for computing real-time analytics.

In a previous video I talked about using window functions with Flink SQL to compute time-based analytics. That included session-based analytics, where sessions were defined by periods of inactivity.

In this next example, I want to show you how to use match recognize to compute session-based analytics, where the sessions are not defined by event timing, but are instead demarcated by specific events. Many real world event streams include pairs of events, such as connect and disconnect, or login and logout, that mark the beginning and ending of sessions.

This example is based on a simulation of a video-on-demand streaming service.

This is a very simple simulation, but should be enough to get across the main concepts without getting us bogged down in the details of an imaginary business.

Here are some events from one of the simulated users. We can make better sense of these events if we sort them by time, just as match recognize will do.

Each event in this table belongs to a user, and has a session_id. Each session begins with a start event, and ends with a pause event.

In addition to start and pause events, there is one other event type, which is a buffering event. These buffering events indicate that the user experienced a hiccup in video playback while the playback device waited for the stream to be rebuffered.

This table shows two sessions, the first of which had no buffering events, while the second session had two buffering events. Of course, occasional re-buffering is normal, and can be caused by issues in the users' networks. But widespread service degradation is something the video-on-demand service operator will want to investigate.

This match recognize pattern captures how this stream works:

Each session has a unique session id that we will use to partition the stream; the events should be ordered according to the event_time column; and each session begins with a start event, ends with a pause event, and may have any number of buffering events in between.

The measures section is where we can capture the session-based analytics we are interested in.

The buffering events that match the variable B are a group we can aggregate over. Here I am counting the number of buffering events in each session.

And here I am measuring the duration of each session by computing the difference between the timestamps of the start and pause events, as measured in seconds.

MATCH_ROWTIME is a special built-in function that will return a time attribute for each row in the output. The timestamp of this time attribute will be the timestamp of the last event in the match -- or in other words, the timestamp of the pause event that ends the match. But MATCH_ROWTIME is more than a timestamp -- it is a time attribute, which means that it also has watermarking defined on it. We can use this time attribute to apply additional temporal operations to the output of match recognize, such as windowing. I'll show an example of this in just a moment, but first I want to show you what the output table looks like.

Each row of the output corresponds to one instance of the pattern matching a session. The output is reporting the number of buffering events, the duration of the session, and there's a time attribute called s_time.

Now this rather complicated looking query is using the match recognize logic we've just been looking at to count the number of buffering events in each session, and then this query puts those results into tumbling windows that compute the average number of buffering events in the sessions that ended during each window.

Most of this is the match_recognize query from before.

Here I've used a common table expression to give that match recognize query a name, which is used as the table being fed into the tumbling windows.

And here you can see that the time attribute produced by match_recognize is being used as the time attribute for the tumbling window function.

Now I want to share some things to keep in mind when you're working with match_recognize.

First of all, the input to MATCH_RECOGNIZE must be an append-only table. Using MATCH_RECOGNIZE with an updating table, such as the output of a regular aggregation or a regular join, isn't supported. If you try, it will result in an error.
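
As a sketch of what that restriction means, a query shaped like the one below would be rejected: the inner GROUP BY aggregation produces an updating table, which MATCH_RECOGNIZE cannot consume (and the aggregated timestamp is no longer a time attribute it could sort on). The column names here are just illustrative.

SELECT *
FROM (
    SELECT
        customer_id,
        COUNT(*) AS order_count,
        MAX(`$rowtime`) AS last_order_time
    FROM examples.marketplace.orders
    GROUP BY customer_id    -- regular aggregation => updating table
)
MATCH_RECOGNIZE (
    PARTITION BY customer_id
    ORDER BY last_order_time
    MEASURES A.order_count AS order_count
    PATTERN (A)
    DEFINE A AS A.order_count > 10
);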

The output from MATCH_RECOGNIZE is itself an append-only table. This is convenient, as it means that there are no limitations on how you might further process the results produced by MATCH_RECOGNIZE. For simple patterns, you may be tempted to use a self-join and/or an OVER aggregation instead. The fact that match_recognize can produce time attributes is a good reason to prefer this approach over a self-join.

And finally, because MATCH_RECOGNIZE does sorting, it adds latency to your stream processing pipeline. How much latency it adds will depend on how much out-of-orderness your watermarking strategy can accommodate. It's also the case that late events will be ignored.

As for best practices for working with match_recognize, you need to keep in mind that MATCH_RECOGNIZE keeps state. The Flink SQL runtime has to keep track of every partially matched pattern. Consider, for example, the pattern we looked at first -- the one that looks for customers who have placed two orders for the same product within 60 days. In order to do this, the runtime has to remember every unmatched order from the past 60 days. That may be a lot of state, but without the time constraint, it would be much worse.

This is why it's a best practice to guarantee that every pattern will eventually end. In the first example we did this by constraining the matches to be found within 60 days. In the third example, the one with video playback events, we are confident that every session will end with a pause event, so we don't expect to have any permanently lingering, partially matched sessions.

It is possible for MATCH_RECOGNIZE to behave non-deterministically. This can happen if your input stream has events with the same timestamp. This will make the sorting non-deterministic unless you provide a secondary sort ordering in the ORDER BY clause.
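
For example, in the sessionization query, a secondary sort key could be added after the time attribute to break ties between events that share a timestamp (event_type is just one plausible tie-breaker):

    ORDER BY event_time, event_type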

Some day you may find yourself debugging a MATCH_RECOGNIZE query, which probably means that the query is failing to find any matches. Here are a few ideas that have helped me:

Remember that the sorting MATCH_RECOGNIZE does relies on watermarks. So you'll want to check that watermarks are being produced, and that they are advancing. For more on debugging problems with watermarks, see the resources linked in the description below.
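
One quick way to check this from SQL, assuming your Flink version provides the CURRENT_WATERMARK function, is to watch the watermark advance alongside the event timestamps:

SELECT
    session_id,
    event_time,
    CURRENT_WATERMARK(event_time) AS current_wm  -- NULL until a first watermark has been emitted
FROM video_events;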

Could it be that your pattern can only be satisfied by an event that hasn't arrived? Double check that your pattern is guaranteed to terminate.

My experience is that it's surprisingly easy for patterns to behave in subtly different ways than I expected. What's helped me most in debugging these situations is to simplify the pattern to the point where it does produce output, and to include additional columns in that output that I can use to check my assumptions about what is being matched by the different variables defined in the pattern.

When your pattern includes a variable that might match many rows from the input, the array aggregate function is a convenient way to capture information from those rows for debugging.

Earlier, in the example with prices that go up and up, I used the array aggregation function to verify that the prices of the matched orders are indeed rising.

In practice, I would probably do something more like this. Here each element in the array is actually an embedded row, containing various details of the matching order.

The built-in ROW function constructs a new row, and here I've used a cast so that I can name the fields in these embedded rows.

Thanks for watching!

If you want to experiment on your own, don't worry: you don't have to try to scrape the text for these examples out of this video. The code for these examples, and other related content, is on the Confluent Developer website. You'll find a link in the description below.
