course: ksqlDB 101

Hands On: Apply Lambda Functions to Arrays and Maps

2 min
Allison Walther

Integration Architect (Presenter)

In this exercise, you add lambda functions to ksqlDB queries: TRANSFORM on a MAP, then REDUCE and FILTER on ARRAYs.

Transform Function

Create a Stream with a MAP

  1. Create stream1 with exam scores as a MAP field:

    CREATE STREAM stream1 (
        id INT, name VARCHAR,
        exam_scores MAP<STRING, DOUBLE>
    ) WITH (
        kafka_topic = 'topic1', partitions = 1,
        value_format = 'json'
    ); 

    Click Run query.

Create a Stream Using Transform

  1. Create a stream named transformed that uses the TRANSFORM function to uppercase each key and round each value in exam_scores:

    CREATE STREAM transformed AS
        SELECT id, name,
            TRANSFORM(exam_scores, (k, v) => UCASE(k), (k, v) => ROUND(v)) AS rounded_scores
        FROM stream1
        EMIT CHANGES;

    Click Run query.
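
To see what the two lambdas compute before running the query, here is a minimal sketch in plain Python. It is a model of the semantics, not ksqlDB code: `transform_map` and `round_half_up` are hypothetical helper names, and the half-up rounding rule is an assumption (the sample scores contain no exact .5 ties, so the tie rule does not affect the result). The input is Lisa's row from the insert step that follows.

```python
import math


def round_half_up(v: float) -> int:
    # Assumption: stands in for ROUND(v) on positive values.
    return math.floor(v + 0.5)


def transform_map(m, key_fn, value_fn):
    # Build a new map: one lambda produces each key, the other each value.
    return {key_fn(k, v): value_fn(k, v) for k, v in m.items()}


# Lisa's exam_scores, as inserted into stream1 in this exercise.
exam_scores = {"Nov": 93.53, "Feb": 94.13, "May": 96.83}

rounded_scores = transform_map(
    exam_scores,
    lambda k, v: k.upper(),          # models (k, v) => UCASE(k)
    lambda k, v: round_half_up(v),   # models (k, v) => ROUND(v)
)
print(rounded_scores)  # {'NOV': 94, 'FEB': 94, 'MAY': 97}
```

Both lambdas receive the key and the value, even though each one uses only one of them, which mirrors the `(k, v) =>` signature in the query.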

Insert Data and Query

  1. Insert some data into stream1:

    INSERT INTO stream1 VALUES (1, 'Lisa', MAP('Nov':=93.53, 'Feb':=94.13, 'May':=96.83));
    INSERT INTO stream1 VALUES (1, 'Larry', MAP('Nov':=83.53, 'Feb':=84.82, 'May':=85.27));
    INSERT INTO stream1 VALUES (1, 'Melissa', MAP('Nov':=97.20, 'Feb':=96.47, 'May':=98.62));
    INSERT INTO stream1 VALUES (1, 'Chris', MAP('Nov':=92.78, 'Feb':=91.15, 'May':=93.91));

    Click Run query.

  2. Run a query on the transformed stream:

    SELECT * FROM transformed EMIT CHANGES;

    Click Run query, and then scroll down to see the results.

Reduce Function

Create a Stream with an ARRAY

  1. Create stream2 with a points ARRAY field:

    CREATE STREAM stream2 (
        name VARCHAR,
        points ARRAY<INTEGER>
    ) WITH (
        kafka_topic = 'topic2', partitions = 1,
        value_format = 'json'
    );

    Click Run query.

Create a Stream Using Reduce

  1. Create a stream named reduced that uses the REDUCE function to sum each points array, starting from 0:

    CREATE STREAM reduced AS
        SELECT name,
            REDUCE(points, 0, (s, x) => s + x) AS total
        FROM stream2
        EMIT CHANGES;

    Click Run query.
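
The reduction can be checked by hand with a short plain-Python sketch. This is a model of the semantics rather than ksqlDB code, and `reduce_array` is a hypothetical helper name. The arrays are the ones inserted into stream2 in the next step.

```python
from functools import reduce


def reduce_array(values, initial, fn):
    # Fold the array left to right: s is the running state, x the current element.
    return reduce(fn, values, initial)


# The points arrays inserted into stream2 in this exercise.
points = {
    "Misty": [7, 5, 8, 8, 6],
    "Marty": [3, 5, 4, 6, 4],
    "Mary": [9, 7, 8, 7, 8],
    "Mickey": [8, 6, 8, 7, 5],
}

# Models REDUCE(points, 0, (s, x) => s + x) for each row.
totals = {name: reduce_array(p, 0, lambda s, x: s + x) for name, p in points.items()}
print(totals)  # {'Misty': 34, 'Marty': 22, 'Mary': 39, 'Mickey': 34}
```

The second argument is the initial state, so an empty array in this model would reduce to 0.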

Insert Data and Query

  1. Insert some data into stream2:

    INSERT INTO stream2 VALUES ('Misty', ARRAY[7, 5, 8, 8, 6]);
    INSERT INTO stream2 VALUES ('Marty', ARRAY[3, 5, 4, 6, 4]);
    INSERT INTO stream2 VALUES ('Mary', ARRAY[9, 7, 8, 7, 8]);
    INSERT INTO stream2 VALUES ('Mickey', ARRAY[8, 6, 8, 7, 5]);

    Click Run query.

  2. Run a query on the reduced stream:

    SELECT * FROM reduced EMIT CHANGES;

    Click Run query, and then scroll down to see the results.

Filter Function

Create a Stream with an ARRAY

  1. Create stream3 with a numbers ARRAY field:

    CREATE STREAM stream3 (
        id VARCHAR,
        numbers ARRAY<INTEGER>
    ) WITH (
        kafka_topic = 'topic3', partitions = 1,
        value_format = 'json'
    );

    Click Run query.

Create a Stream with Filter

  1. Create a stream named filtered that uses the FILTER function to keep only the even values in numbers:

    CREATE STREAM filtered AS
        SELECT id,
            FILTER(numbers, x => (x % 2 = 0)) AS even_numbers
        FROM stream3
        EMIT CHANGES;

    Click Run query.
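
As a quick check of what the predicate keeps, here is a plain-Python sketch. It models the semantics only and is not ksqlDB code. `filter_array` is a hypothetical helper name, and the input is the 1 through 15 array inserted into stream3 in the next step.

```python
def filter_array(values, predicate):
    # Keep only the elements for which the lambda returns true, preserving order.
    return [x for x in values if predicate(x)]


# The numbers array inserted into stream3 in this exercise: 1 through 15.
numbers = list(range(1, 16))

# Models FILTER(numbers, x => (x % 2 = 0)).
even_numbers = filter_array(numbers, lambda x: x % 2 == 0)
print(even_numbers)  # [2, 4, 6, 8, 10, 12, 14]
```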

Insert Data and Query

  1. Insert some data into stream3:

    INSERT INTO stream3 VALUES ('Group1', ARRAY[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]);

    Click Run query.

  2. Run a push query on the filtered stream:

    SELECT * FROM filtered EMIT CHANGES;

    Click Run query, and then scroll down to see the results.

  3. As this is the last exercise in the course, make sure to delete your ksqlDB application. To delete, click ksqlDB on the left-hand side menu, then select Delete under Actions. Enter your application's name, then click Continue.
