Integration Architect (Presenter)
Principal Developer Advocate (Author)
This exercise will demonstrate how to merge two streams in ksqlDB.
First, create your orders_uk
stream:
CREATE STREAM orders_uk (ordertime BIGINT, orderid INTEGER, itemid VARCHAR, orderunits INTEGER,
address STRUCT< street VARCHAR, city VARCHAR, state VARCHAR>)
WITH (KAFKA_TOPIC='orders_uk', VALUE_FORMAT='json', PARTITIONS=1);
Click Run query.
Insert data into your orders_uk
stream:
INSERT INTO orders_uk VALUES (1620501334477, 65, 'item_7', 5,
STRUCT(street:='234 Thorpe Street', city:='York', state:='England'));
INSERT INTO orders_uk VALUES (1620502553626, 67, 'item_3', 2,
STRUCT(street:='2923 Alexandra Road', city:='Birmingham', state:='England'));
INSERT INTO orders_uk VALUES (1620503110659, 68, 'item_7', 7,
STRUCT(street:='536 Chancery Lane', city:='London', state:='England'));
Click Run query.
Next, create your orders_us
stream:
CREATE STREAM orders_us (ordertime BIGINT, orderid INTEGER, itemid VARCHAR, orderunits INTEGER,
address STRUCT< street VARCHAR, city VARCHAR, state VARCHAR>)
WITH (KAFKA_TOPIC='orders_us', VALUE_FORMAT='json', PARTITIONS=1);
Click Run query.
Insert data into your orders_us
stream:
INSERT INTO orders_us VALUES (1620501334477, 65, 'item_7', 5,
STRUCT(street:='6743 Lake Street', city:='Los Angeles', state:='California'));
INSERT INTO orders_us VALUES (1620502553626, 67, 'item_3', 2,
STRUCT(street:='2923 Maple Ave', city:='Mountain View', state:='California'));
INSERT INTO orders_us VALUES (1620503110659, 68, 'item_7', 7,
STRUCT(street:='1492 Wandering Way', city:='Berkley', state:='California'));
Click Run query.
First, run a query on your orders_uk
stream:
SELECT * FROM orders_uk EMIT CHANGES;
Click Run query.
Scroll down and expand a record. Notice that it's from the UK.
Change your query to use the orders_us
stream:
SELECT * FROM orders_us EMIT CHANGES;
Click Run query.
Scroll down to expand another record. Notice that it's from the US.
Merge the orders_us
and orders_uk
streams into a new stream, orders_combined
, using the following two queries
:
CREATE STREAM orders_combined AS
SELECT 'US' AS source, ordertime, orderid, itemid, orderunits, address
FROM orders_us;
Click Run query.
INSERT INTO orders_combined
SELECT 'UK' AS source, ordertime, orderid, itemid, orderunits, address
FROM orders_uk;
Click Run query.
Now move to the Flow tab. See how orders_uk
and orders_us
flow in to create the orders_combined
stream.
Click on the orders_combined
stream. On the right-hand side, look at the list of records. Notice that the stream contains records from the UK and from the US.
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.