Course: Apache Flink® SQL

Exercise: Stream enrichment

45 min

David Anderson
Software Practice Lead

Enrichment and changelog modes

The module on JOINs presents temporal joins, and says that they are better suited for stream enrichment than regular inner joins.

The video on changelog processing further explains that it's best to stick to append-only processing whenever possible. Efficiency and flexibility are the main reasons for this.

Consider this example:

SELECT orders.*, customers.postcode
FROM examples.marketplace.orders
JOIN examples.marketplace.customers
FOR SYSTEM_TIME AS OF orders.`$rowtime`
ON orders.customer_id = customers.customer_id;

In this example, both the incoming orders and the enriched orders produced by this temporal join are append-only streams, which is great.
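
If you'd like to verify that for yourself, one option (besides watching the changelog in the Flink shell) is to prefix the query with EXPLAIN. This is just a sketch of that idea; the exact output varies by environment, but the plan generally reveals whether the result stream is append-only or updating:

-- A sketch: prefix the enrichment query with EXPLAIN to see its plan.
-- In most Flink SQL environments the output indicates whether the
-- result is append-only or updating.
EXPLAIN
SELECT orders.*, customers.postcode
FROM examples.marketplace.orders
JOIN examples.marketplace.customers
FOR SYSTEM_TIME AS OF orders.`$rowtime`
ON orders.customer_id = customers.customer_id;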

Exercise: enrich the output of MATCH_RECOGNIZE

Take one of the examples from the module on MATCH_RECOGNIZE that uses a pattern over the examples.marketplace.orders table to identify customers, and use a temporal join to enrich its results with those customers' email addresses.

The solution presented below assumes you've started with this example:

SELECT *
FROM examples.marketplace.orders  
MATCH_RECOGNIZE (
    PARTITION BY customer_id
    ORDER BY `$rowtime`
    MEASURES
        ARRAY_AGG(UP.price) AS up_prices
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (UP{3})
    DEFINE
        UP AS (COUNT(UP.price) = 1) OR 
              (UP.price > LAST(UP.price, 1))
);
Basic hint

To make this work, you'll need to arrange for the output of the MATCH_RECOGNIZE query to include a time attribute. There's an example of this in the module on MATCH_RECOGNIZE.

Deeper hint if you are getting an error about mismatched rowtime types

You may run into this error:

ERROR

Event-Time Temporal Table Join requires same rowtime type in left table and versioned table, but the rowtime types are TIMESTAMP(3) *ROWTIME* and TIMESTAMP_LTZ(3) *ROWTIME*.

By default, the MATCH_ROWTIME() function returns a time attribute having the TIMESTAMP(3) type. Since the time attribute of the customers table has the TIMESTAMP_LTZ(3) type, a temporal join between these tables will fail.

The way to fix this is to pass in a timestamp parameter of the desired type. E.g., this will work: MATCH_ROWTIME($rowtime).

Complete solution
WITH customers_up_and_up AS (
    SELECT *
    FROM examples.marketplace.orders  
    MATCH_RECOGNIZE (
        PARTITION BY customer_id
        ORDER BY `$rowtime`
        MEASURES
            SUM(UP.price) AS total_spent,
            MATCH_ROWTIME($rowtime) AS match_time
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (UP{3})
        DEFINE
            UP AS (COUNT(UP.price) = 1) OR 
                (UP.price > LAST(UP.price, 1))
    )
)
SELECT customers.customer_id, total_spent, email
FROM customers_up_and_up
JOIN examples.marketplace.customers
FOR SYSTEM_TIME AS OF customers_up_and_up.match_time
ON customers_up_and_up.customer_id = customers.customer_id;

Enriching with an append-only stream

Using a temporal join requires a versioned, updating table on the right side of the join. In the previous section, the customers table was playing this role.

However, there may be times when you want to do a kind of enrichment, but with different semantics. For example, you may want to bring together two append-only streams in such a way that, at any point in time, the output contains the latest information from both streams.

The documentation for Confluent Cloud for Apache Flink includes a detailed example of how to combine append-only streams in this way.

The query presented there is quite complex; let's break it down.

The overall concept is to union the streams being combined, and then use an OVER aggregation to keep track of the most recent value of each field across the inputs.

We can begin by defining a view that unifies the schemas of the streams being combined:

-- This query combines order and click data, tracking the latest values
-- for each customer's interactions across both datasets
-- First, combine order data and clickstream data into a single structure
-- Note: Fields not present in one source are filled with NULL
CREATE VIEW combined_customer_data AS (
    -- Orders data extended with empty click-related fields
    SELECT
        customer_id,
        order_id,
        product_id,
        price,
        -- click-specific fields set to NULL for order records
        CAST(NULL AS STRING) AS url,
        CAST(NULL AS STRING) AS user_agent,
        CAST(NULL AS INT) AS view_time,
        $rowtime    
    FROM `examples`.`marketplace`.`orders`
    UNION ALL
    -- Click data extended with empty order-related fields
    SELECT
        -- user_id normalized to match customer_id
        user_id AS customer_id,
        -- order-specific fields set to NULL for click records
        CAST(NULL AS STRING) AS order_id,
        CAST(NULL AS STRING) AS product_id,
        CAST(NULL AS DOUBLE) AS price,
        url,
        user_agent,
        view_time,
        $rowtime
    FROM `examples`.`marketplace`.`clicks`
);

Now if you take a look at the contents of this combined stream

SELECT customer_id, price, view_time FROM combined_customer_data;

you will see that it contains some rows that originated with the orders table (these have prices and no view times), and other rows that come from the clicks table (these are the other way around: they have view times and no prices).
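
If you want to look at just one side of the union, you can filter on a field that only that source populates. For example, this small sketch against the view defined above keeps only the rows that came from orders:

-- Only the order-derived rows have a price; the click-derived rows
-- have price set to NULL, so this filter keeps just the order rows.
SELECT customer_id, price, view_time
FROM combined_customer_data
WHERE price IS NOT NULL;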

Now let's put the combined_customer_data view to work, and layer on top of it another view that keeps track of the latest information for each customer:

CREATE VIEW latest_customer_info AS (
    -- For each customer, maintain the latest value for each
    -- field using window functions over the combined dataset
    SELECT
        LAST_VALUE(customer_id) OVER w AS customer_id,
        LAST_VALUE(order_id) OVER w AS order_id,
        LAST_VALUE(product_id) OVER w AS product_id,
        LAST_VALUE(price) OVER w AS price,
        LAST_VALUE(url) OVER w AS url,
        LAST_VALUE(user_agent) OVER w AS user_agent,
        LAST_VALUE(view_time) OVER w AS view_time,
        -- Track the latest event timestamp
        MAX($rowtime) OVER w AS rowtime
    FROM combined_customer_data
    -- Define window for tracking latest values per customer
    WINDOW w AS (
        PARTITION BY customer_id
        ORDER BY $rowtime
        -- Consider all previous events up to the current one
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW 
    )
);

Repeating the previous query against this new view

SELECT customer_id, price, view_time FROM latest_customer_info;

reveals that, over time, more and more customers have values from both the orders and clicks tables. And if you use the option to see the changelog information (by pressing 'm' in the Flink shell), you'll see that, like a temporal join, this approach of combining UNION ALL with an OVER aggregation also produces an append-only stream as its output.

The documentation includes a more in-depth discussion of the requirements, limitations, and merits of this approach.

Materialization

If you write out latest_customer_info to a Kafka topic, you can choose whether to materialize the full history of changes to this view as an append-only stream/topic, or to continuously materialize only the latest information as an upserting stream/compacted topic.

Here's what those two options look like:

CREATE TABLE latest_customer_info_appending AS (
    SELECT * FROM latest_customer_info
);

vs.

CREATE TABLE latest_customer_info_upserting (
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'changelog.mode' = 'upsert',
    'kafka.cleanup-policy' = 'compact'
) AS (
    SELECT * FROM latest_customer_info
);
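
If you'd like to confirm how these tables ended up being defined (for example, that the upserting table really does carry the changelog mode and cleanup policy requested above), one option is to have the Flink shell print the table definition:

-- Inspect the table definition, including the WITH options
-- ('changelog.mode', 'kafka.cleanup-policy') that were applied.
SHOW CREATE TABLE latest_customer_info_upserting;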

Note: If you try this, you will create topics in Confluent Cloud to store this data, as well as continuously running SQL statements that are populating those topics. After you've completed your experiments, you'll want to clean up these resources.

Here's how to clean up from the command line. You can use the Flink shell to drop these tables, which will also delete their associated topics and schemas. For example, for one of the tables:

DROP TABLE latest_customer_info_appending;

To delete the statement that has been populating this topic, you'll need its name. When you used CREATE TABLE AS to create the table, the Flink shell displayed the name of that statement. Alternatively, you can run SHOW JOBS; in the Flink shell to find this information.

Then, using the Confluent CLI (not the Flink shell), you can delete the statement:

❯ confluent flink statement delete \
    cli-2025-02-20-182442-87919c79-2efb-4ed0-8df9-688368a3a614
