Learn about ksqlDB's materialized views in this exercise.
Create a stream MOVEMENTS:
If you already have a stream named MOVEMENTS and a table called PERSON_STATS from a previous lesson, drop them first:
DROP TABLE PERSON_STATS DELETE TOPIC;
DROP STREAM MOVEMENTS DELETE TOPIC;
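If you are not sure whether those objects exist, one hedged alternative is the IF EXISTS form of DROP, which lets the statement succeed even when the stream or table is missing. This is a sketch of the same two statements, still dropping the table first because it reads from the stream. On older ksqlDB versions you may also need to terminate the persistent query behind PERSON_STATS before the drop succeeds.

```sql
-- Drop the derived table first, then the stream it reads from.
-- IF EXISTS keeps each statement from failing when the object is absent.
DROP TABLE IF EXISTS PERSON_STATS DELETE TOPIC;
DROP STREAM IF EXISTS MOVEMENTS DELETE TOPIC;
```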
Now create the new stream:
CREATE STREAM MOVEMENTS(PERSON VARCHAR KEY, LOCATION VARCHAR)
WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements');
Click Run query.
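To confirm that the stream was registered with the expected columns, one option is to describe it. This is an optional check rather than a step of the exercise itself:

```sql
-- Lists the columns of MOVEMENTS; PERSON should be marked as the key column.
DESCRIBE MOVEMENTS;
```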
Insert data into MOVEMENTS:
INSERT INTO MOVEMENTS VALUES ('Robin', 'York');
INSERT INTO MOVEMENTS VALUES ('Robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('Allison', 'Denver');
INSERT INTO MOVEMENTS VALUES ('Robin', 'Ilkley');
INSERT INTO MOVEMENTS VALUES ('Allison', 'Boulder');
Click Run query.
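To check that the rows landed in the stream, you can read it back with a push query. The sketch below assumes the query starts from the beginning of the topic. In the ksqlDB CLI that is done with the SET statement shown; the Confluent Cloud editor exposes the same auto.offset.reset property as a query setting instead. LIMIT 5 ends the query once five rows have been returned.

```sql
-- ksqlDB CLI: read the topic from its earliest offset rather than new rows only.
SET 'auto.offset.reset' = 'earliest';

-- Push query: emits rows from the stream and stops after five.
SELECT * FROM MOVEMENTS EMIT CHANGES LIMIT 5;
```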
To see the number of movements per person in the stream, enter the following query:
SELECT PERSON, COUNT(*)
FROM MOVEMENTS GROUP BY PERSON EMIT CHANGES;
Click Run query.
Edit the query to count the unique locations that each person has visited:
SELECT PERSON, COUNT_DISTINCT(LOCATION)
FROM MOVEMENTS GROUP BY PERSON EMIT CHANGES;
Click Run query.
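To see which locations are behind each distinct count, a variation on the query can collect them. This sketch uses the COLLECT_SET aggregate function, which gathers the distinct values of a column into an array. The column aliases are illustrative and are not part of the exercise.

```sql
-- UNIQUE_LOCATIONS is the count; LOCATIONS_VISITED lists the values being counted.
SELECT PERSON,
       COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
       COLLECT_SET(LOCATION) AS LOCATIONS_VISITED
FROM MOVEMENTS GROUP BY PERSON EMIT CHANGES;
```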
Now create a table that combines both aggregates, COUNT(*) and COUNT_DISTINCT(LOCATION), for each PERSON, and also records each person's most recent location with LATEST_BY_OFFSET(LOCATION):
CREATE TABLE PERSON_STATS AS
SELECT PERSON,
LATEST_BY_OFFSET(LOCATION) AS LATEST_LOCATION,
COUNT(*) AS LOCATION_CHANGES,
COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS
FROM MOVEMENTS
GROUP BY PERSON
EMIT CHANGES;
Click Run query.
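Because PERSON_STATS is maintained by a persistent query, it keeps updating as new rows arrive in MOVEMENTS. As an optional sketch, you can inspect the table, list the running queries, and then watch the table change with a push query:

```sql
-- Shows the table's columns, including the three aggregate columns.
DESCRIBE PERSON_STATS;

-- Lists persistent queries; the one writing to PERSON_STATS should appear here.
SHOW QUERIES;

-- Push query: emits a row each time a person's aggregates change.
-- It keeps running until you stop it.
SELECT * FROM PERSON_STATS EMIT CHANGES;
```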
Insert more data into your MOVEMENTS stream to make the calculation more interesting:
INSERT INTO MOVEMENTS VALUES('Robin', 'Manchester');
INSERT INTO MOVEMENTS VALUES('Allison', 'Loveland');
INSERT INTO MOVEMENTS VALUES('Robin', 'London');
INSERT INTO MOVEMENTS VALUES('Allison', 'Aspen');
INSERT INTO MOVEMENTS VALUES('Robin', 'Ilkley');
INSERT INTO MOVEMENTS VALUES('Allison', 'Vail');
INSERT INTO MOVEMENTS VALUES('Robin', 'York');
Click Run query.
Query the table for one person to see how many times they have changed location and how many unique locations they have visited:
SELECT * FROM PERSON_STATS WHERE PERSON = 'Allison';
Click Run query.
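The same kind of pull query works for any key in the table. As a sketch, the lookup below fetches Robin's row and names the columns explicitly. Counting the INSERT statements in this exercise by hand gives 7 movements across 5 distinct locations for Robin (latest: York) and 5 movements across 5 distinct locations for Allison (latest: Vail). Expect those values only if each INSERT ran exactly once on a fresh stream and the table's query read the topic from the earliest offset. COUNT_DISTINCT is documented as an estimate, so treat the unique count as expected rather than guaranteed.

```sql
-- Pull query: returns the current row for one key and then terminates.
SELECT PERSON, LATEST_LOCATION, LOCATION_CHANGES, UNIQUE_LOCATIONS
FROM PERSON_STATS
WHERE PERSON = 'Robin';
```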