Course: ksqlDB 101

Hands On: Apply Lambda Functions to Arrays and Maps

2 min
Allison WaltherIntegration Architect (Course Presenter)

Hands On: Apply Lambda Functions to Arrays and Maps

In this exercise, learn how to add lambda functions to queries.

Transform Function

Create a Stream with a MAP

  1. Create a stream1 with exam scores as a MAP field:

    CREATE STREAM stream1 (
        id INT, name VARCHAR,
        exam_scores MAP<STRING, DOUBLE>
    ) WITH (
        kafka_topic = 'topic1', partitions = 1,
        value_format = 'json'
    ); 

    Click Run query.

Create a Stream Using Transform

  1. Create a stream transformed using the TRANSFORM function:

    CREATE STREAM transformed 
        AS SELECT id, name,
        TRANSFORM(exam_scores,(k, v) => UCASE(k), (k, v) => (ROUND(v))) AS rounded_scores
    FROM stream1 EMIT CHANGES;

    Click Run query.

Insert Data and Query

  1. Insert some data into stream1:

    INSERT into stream1 values(1, 'Lisa', MAP('Nov':=93.53, 'Feb':=94.13, 'May':=96.83));
    INSERT into stream1 values(1, 'Larry', MAP('Nov':=83.53, 'Feb':=84.82, 'May':=85.27));
    INSERT into stream1 values(1, 'Melissa', MAP('Nov':=97.20, 'Feb':=96.47, 'May':=98.62));
    INSERT into stream1 values(1, 'Chris', MAP('Nov':=92.78, 'Feb':=91.15, 'May':=93.91));

    Click Run query.

  2. Run a query on the transformed stream:

    SELECT * FROM transformed EMIT CHANGES;

    Click Run query, and then scroll down to see the results.

Reduce Function

Create a Stream with an ARRAY

  1. Create stream2 with a points ARRAY field:

    CREATE STREAM stream2 (
        name VARCHAR,
        points ARRAY<INTEGER>
    ) WITH (
        kafka_topic = 'topic2', partitions = 1,
        value_format = 'json'
    );

    Click Run query.

Create a Stream Using Reduce

  1. Create a reduced stream using the REDUCE function.

    CREATE STREAM reduced 
        AS SELECT name,
        REDUCE(points,0,(s,x)=> (s+x)) AS total
    FROM stream2 EMIT CHANGES;

    Click Run query.

Insert Data and Query

  1. Insert some data into stream2:

    INSERT INTO stream2 VALUES('Misty', [Array[7, 5, 8, 8, 6]
    INSERT INTO stream2 VALUES('Marty', [Array[3, 5, 4, 6, 4]
    INSERT INTO stream2 VALUES('Mary', [Array[9, 7, 8, 7, 8]
    INSERT INTO stream2 VALUES('Mickey', [Array[8, 6, 8, 7, 5]

    Click Run query.

  2. Run a query on the reduced stream:

    SELECT * FROM reduced EMIT CHANGES;

    Click Run query, and then scroll down to see the results.

Filter Function

Create a Stream with an ARRAY

  1. Create a stream3 with a numbers ARRAY field:

    CREATE STREAM stream3 (
        id VARCHAR,
        numbers ARRAY<INTEGER>
    ) WITH (
        kafka_topic = 'topic3', partitions = 1,
        value_format = 'json'
    );

    Click Run query.

Create a Stream with Filter

  1. Create a filtered stream using the FILTER function:

    CREATE STREAM filtered 
        AS SELECT id,
        FILTER(numbers,x => (x%2 = 0)) AS even_numbers
    FROM stream3 EMIT CHANGES;

    Click Run query.

Insert Data and Query

  1. Insert some data into stream3:

    INSERT INTO stream3 VALUES('Group1', ARRAY[1, 2, 3,4,5,6,7,8,9,10,11,12,13,14,15]);

    Click Run query.

  2. Create a push query to run on the filtered stream:

    SELECT * FROM filtered EMIT CHANGES;

    Click Run query and scroll down to see the results.

Use the promo code KSQLDB101 to get $101 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.