Integration Architect (Presenter)
ksqlDB can write data from multiple streams into a single target. This is useful when events for the same logical entity (for example, orders) are written to separate topics, perhaps originating in different Apache Kafka clusters or in different instances of the producing application. As you merge, you can add or modify fields so that attributes such as identifiers remain unique across sources.
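The statements that follow read from two source streams, ORDERS and ORDERS_UK. As a minimal sketch of how such streams might be declared: the topic names, column types, value format, and partition count here are assumptions for illustration, not taken from the original setup. What matters is that both streams expose the same columns with the same types, so that the two queries produce matching schemas.

```sql
-- Hypothetical declarations: topic names, types, format and partition
-- count are assumptions made for illustration only.
CREATE STREAM ORDERS (
    ORDERTIME  BIGINT,
    ORDERID    INT,
    ITEMID     VARCHAR,
    ORDERUNITS DOUBLE,
    ADDRESS    VARCHAR
  ) WITH (
    KAFKA_TOPIC  = 'orders',
    VALUE_FORMAT = 'JSON',
    PARTITIONS   = 1
  );

-- Same columns and types as ORDERS, backed by a separate topic.
CREATE STREAM ORDERS_UK (
    ORDERTIME  BIGINT,
    ORDERID    INT,
    ITEMID     VARCHAR,
    ORDERUNITS DOUBLE,
    ADDRESS    VARCHAR
  ) WITH (
    KAFKA_TOPIC  = 'orders_uk',
    VALUE_FORMAT = 'JSON',
    PARTITIONS   = 1
  );
```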
Taking the first stream as the source, create a target stream, including a hard-coded identifier for the source and a concatenation to ensure that the ID remains unique:
CREATE STREAM ORDERS_COMBINED AS
  SELECT 'US' AS SOURCE,
         CONCAT_WS('-','US',CAST(ORDERID AS VARCHAR)) AS ORDERID,
         ORDERTIME,
         ITEMID,
         ORDERUNITS,
         ADDRESS
    FROM ORDERS
    PARTITION BY CONCAT_WS('-','US',CAST(ORDERID AS VARCHAR));
Now add in additional sources to the same target, using INSERT INTO. This works in the same way as CREATE STREAM … AS SELECT, by reading the output of a continuous SELECT query. The only difference is that an INSERT INTO statement writes to an existing target.
INSERT INTO ORDERS_COMBINED
  SELECT 'UK' AS SOURCE,
         CONCAT_WS('-','UK',CAST(ORDERID AS VARCHAR)) AS ORDERID,
         ORDERTIME,
         ITEMID,
         ORDERUNITS,
         ADDRESS
    FROM ORDERS_UK
    PARTITION BY CONCAT_WS('-','UK',CAST(ORDERID AS VARCHAR));
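To check that both sources are arriving in the combined stream, one option is to run a push query against ORDERS_COMBINED. The sketch below assumes you are working in the ksqlDB CLI and want to read from the start of the topic. The LIMIT value and the ORDER_COUNT alias are arbitrary choices made for illustration.

```sql
-- Read from the beginning of the topic rather than only new events.
SET 'auto.offset.reset' = 'earliest';

-- Sample a few merged rows; ORDERID values should carry the
-- 'US-' or 'UK-' prefix added by CONCAT_WS.
SELECT SOURCE, ORDERID, ITEMID, ORDERUNITS
  FROM ORDERS_COMBINED
  EMIT CHANGES
  LIMIT 10;

-- Running count of merged events per source (runs until cancelled).
SELECT SOURCE, COUNT(*) AS ORDER_COUNT
  FROM ORDERS_COMBINED
  GROUP BY SOURCE
  EMIT CHANGES;
```

If only one value of SOURCE shows up, that suggests the other source stream has not received any events yet, or that its query is not running.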
Hi, I'm Allison Walther with Confluent. Let's cover how to merge two streams in ksqlDB. Sometimes we have separate event streams with data that logically goes together, and ksqlDB makes it easy to combine them. Using INSERT INTO, we can add events from multiple streams into one combined stream. As new events arrive in either source stream, they appear in the combined stream too. That's really all there is to merging. The only thing left is to merge this lesson into an exercise.