Integration Architect (Presenter)
Principal Developer Advocate (Author)
This exercise will demonstrate how to merge two streams in ksqlDB.
First, create your orders_uk stream:
CREATE STREAM orders_uk (ordertime BIGINT, orderid INTEGER, itemid VARCHAR, orderunits INTEGER,
address STRUCT< street VARCHAR, city VARCHAR, state VARCHAR>)
WITH (KAFKA_TOPIC='orders_uk', VALUE_FORMAT='json', PARTITIONS=1);
Click Run query.
Insert data into your orders_uk stream:
INSERT INTO orders_uk VALUES (1620501334477, 65, 'item_7', 5,
STRUCT(street:='234 Thorpe Street', city:='York', state:='England'));
INSERT INTO orders_uk VALUES (1620502553626, 67, 'item_3', 2,
STRUCT(street:='2923 Alexandra Road', city:='Birmingham', state:='England'));
INSERT INTO orders_uk VALUES (1620503110659, 68, 'item_7', 7,
STRUCT(street:='536 Chancery Lane', city:='London', state:='England'));
Click Run query.
Next, create your orders_us stream:
CREATE STREAM orders_us (ordertime BIGINT, orderid INTEGER, itemid VARCHAR, orderunits INTEGER,
address STRUCT< street VARCHAR, city VARCHAR, state VARCHAR>)
WITH (KAFKA_TOPIC='orders_us', VALUE_FORMAT='json', PARTITIONS=1);
Click Run query.
Insert data into your orders_us stream:
INSERT INTO orders_us VALUES (1620501334477, 65, 'item_7', 5,
STRUCT(street:='6743 Lake Street', city:='Los Angeles', state:='California'));
INSERT INTO orders_us VALUES (1620502553626, 67, 'item_3', 2,
STRUCT(street:='2923 Maple Ave', city:='Mountain View', state:='California'));
INSERT INTO orders_us VALUES (1620503110659, 68, 'item_7', 7,
STRUCT(street:='1492 Wandering Way', city:='Berkley', state:='California'));
Click Run query.
First, run a query on your orders_uk stream:
SELECT * FROM orders_uk EMIT CHANGES;
Click Run query.
Scroll down and expand a record. Notice that it's from the UK.
Change your query to use the orders_us stream:
SELECT * FROM orders_us EMIT CHANGES;
Click Run query.
Scroll down to expand another record. Notice that it's from the US.
Merge the orders_us and orders_uk streams into a new stream, orders_combined, using the following two queries:
CREATE STREAM orders_combined AS
SELECT 'US' AS source, ordertime, orderid, itemid, orderunits, address
FROM orders_us;
Click Run query.
INSERT INTO orders_combined
SELECT 'UK' AS source, ordertime, orderid, itemid, orderunits, address
FROM orders_uk;
Click Run query.
Now move to the Flow tab. See how orders_uk and orders_us flow in to create the orders_combined stream.
Click on the orders_combined stream. On the right-hand side, look at the list of records. Notice that the stream contains records from the UK and from the US.
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.