In this exercise, you will learn how to use lambda functions in ksqlDB queries.
Create a stream named stream1 with exam scores as a MAP field:
CREATE STREAM stream1 (
id INT, name VARCHAR,
exam_scores MAP<STRING, DOUBLE>
) WITH (
kafka_topic = 'topic1', partitions = 1,
value_format = 'json'
);
Click Run query.
Create a stream named transformed using the TRANSFORM function. The first lambda converts each map key to uppercase, and the second rounds each value:
CREATE STREAM transformed
AS SELECT id, name,
TRANSFORM(exam_scores, (k, v) => UCASE(k), (k, v) => ROUND(v)) AS rounded_scores
FROM stream1 EMIT CHANGES;
Click Run query.
Insert some data into stream1:
INSERT INTO stream1 VALUES (1, 'Lisa', MAP('Nov':=93.53, 'Feb':=94.13, 'May':=96.83));
INSERT INTO stream1 VALUES (1, 'Larry', MAP('Nov':=83.53, 'Feb':=84.82, 'May':=85.27));
INSERT INTO stream1 VALUES (1, 'Melissa', MAP('Nov':=97.20, 'Feb':=96.47, 'May':=98.62));
INSERT INTO stream1 VALUES (1, 'Chris', MAP('Nov':=92.78, 'Feb':=91.15, 'May':=93.91));
Click Run query.
Run a query on the transformed stream:
SELECT * FROM transformed EMIT CHANGES;
Click Run query, and then scroll down to see the results.
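As a rough preview of what the transformed stream should contain, the sketch below models the TRANSFORM call in plain Python. It is not ksqlDB code: the helpers transform_map and round_half_up are hypothetical names, and the sketch assumes that ROUND rounds to the nearest integer with halves rounded up. The order of the map entries in the actual ksqlDB output may differ.

```python
import math


def round_half_up(value):
    # Assumption: models ROUND as "nearest integer, halves rounded up".
    return math.floor(value + 0.5)


def transform_map(mapping, key_fn, value_fn):
    # Models TRANSFORM on a map: one lambda produces each new key,
    # a second lambda produces each new value.
    return {key_fn(k, v): value_fn(k, v) for k, v in mapping.items()}


# The same rows that the exercise inserts into stream1.
exam_rows = [
    (1, "Lisa", {"Nov": 93.53, "Feb": 94.13, "May": 96.83}),
    (1, "Larry", {"Nov": 83.53, "Feb": 84.82, "May": 85.27}),
    (1, "Melissa", {"Nov": 97.20, "Feb": 96.47, "May": 98.62}),
    (1, "Chris", {"Nov": 92.78, "Feb": 91.15, "May": 93.91}),
]

transformed_rows = [
    (
        row_id,
        name,
        transform_map(
            scores,
            lambda k, v: k.upper(),         # (k, v) => UCASE(k)
            lambda k, v: round_half_up(v),  # (k, v) => ROUND(v)
        ),
    )
    for row_id, name, scores in exam_rows
]

for row in transformed_rows:
    print(row)
# (1, 'Lisa', {'NOV': 94, 'FEB': 94, 'MAY': 97})
# (1, 'Larry', {'NOV': 84, 'FEB': 85, 'MAY': 85})
# (1, 'Melissa', {'NOV': 97, 'FEB': 96, 'MAY': 99})
# (1, 'Chris', {'NOV': 93, 'FEB': 91, 'MAY': 94})
```

Each row keeps its id and name. Only the map changes, with uppercase keys and whole-number scores.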
Create stream2 with a points ARRAY field:
CREATE STREAM stream2 (
name VARCHAR,
points ARRAY<INTEGER>
) WITH (
kafka_topic = 'topic2', partitions = 1,
value_format = 'json'
);
Click Run query.
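Create a stream named reduced using the REDUCE function. The lambda starts from an initial state of 0 and adds each array element to the running total: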
CREATE STREAM reduced
AS SELECT name,
REDUCE(points, 0, (s, x) => (s + x)) AS total
FROM stream2 EMIT CHANGES;
Click Run query.
Insert some data into stream2:
INSERT INTO stream2 VALUES ('Misty', ARRAY[7, 5, 8, 8, 6]);
INSERT INTO stream2 VALUES ('Marty', ARRAY[3, 5, 4, 6, 4]);
INSERT INTO stream2 VALUES ('Mary', ARRAY[9, 7, 8, 7, 8]);
INSERT INTO stream2 VALUES ('Mickey', ARRAY[8, 6, 8, 7, 5]);
Click Run query.
Run a query on the reduced stream:
SELECT * FROM reduced EMIT CHANGES;
Click Run query, and then scroll down to see the results.
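To check the totals you see against what the lambda should produce, here is a minimal Python model of the REDUCE call. It is a sketch rather than ksqlDB code, and reduce_array is a hypothetical helper name: it folds the array from left to right, starting from the initial state, which is how the (s, x) => (s + x) lambda is read here.

```python
from functools import reduce


def reduce_array(values, initial_state, fn):
    # Models REDUCE on an array: fn(state, element) is applied to each
    # element in turn, and the final state is returned.
    return reduce(fn, values, initial_state)


# The same rows that the exercise inserts into stream2.
points_rows = [
    ("Misty", [7, 5, 8, 8, 6]),
    ("Marty", [3, 5, 4, 6, 4]),
    ("Mary", [9, 7, 8, 7, 8]),
    ("Mickey", [8, 6, 8, 7, 5]),
]

reduced_rows = [
    (name, reduce_array(points, 0, lambda s, x: s + x))  # (s, x) => (s + x)
    for name, points in points_rows
]

for row in reduced_rows:
    print(row)
# ('Misty', 34)
# ('Marty', 22)
# ('Mary', 39)
# ('Mickey', 34)
```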
Create stream3 with a numbers ARRAY field:
CREATE STREAM stream3 (
id VARCHAR,
numbers ARRAY<INTEGER>
) WITH (
kafka_topic = 'topic3', partitions = 1,
value_format = 'json'
);
Click Run query.
Create a stream named filtered using the FILTER function. The lambda keeps only the even numbers:
CREATE STREAM filtered
AS SELECT id,
FILTER(numbers, x => (x % 2 = 0)) AS even_numbers
FROM stream3 EMIT CHANGES;
Click Run query.
Insert some data into stream3:
INSERT INTO stream3 VALUES ('Group1', ARRAY[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]);
Click Run query.
Create a push query to run on the filtered stream:
SELECT * FROM filtered EMIT CHANGES;
Click Run query, and then scroll down to see the results.
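The expected even_numbers value can be sketched the same way. The Python below is only a model of the FILTER call, not ksqlDB code, and filter_array is a hypothetical helper name: it keeps the elements for which the lambda returns true and preserves their order.

```python
def filter_array(values, predicate):
    # Models FILTER on an array: keep only the elements for which the
    # predicate lambda returns True, in their original order.
    return [x for x in values if predicate(x)]


# The same array that the exercise inserts into stream3 for 'Group1'.
group1_numbers = list(range(1, 16))

even_numbers = filter_array(group1_numbers, lambda x: x % 2 == 0)  # x => (x % 2 = 0)

print(("Group1", even_numbers))
# ('Group1', [2, 4, 6, 8, 10, 12, 14])
```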
As this is the last exercise in the course, make sure to delete your ksqlDB application. To delete it, click ksqlDB in the left-hand menu, then select Delete under Actions. Enter your application's name, then click Continue.