Course: ksqlDB 101

Hands On: Interacting with ksqlDB

5 min
Allison Walther, Integration Architect (Course Presenter)
Robin Moffatt, Staff Developer Advocate (Course Author)

In this exercise, you will try out the different user interfaces to ksqlDB. The first step is to provision a ksqlDB application on Confluent Cloud.

If you don’t already have a Confluent Cloud account, you can create one here. You can use the promo code 101KSQLDB for $101 of free Confluent Cloud usage.

Provisioning ksqlDB on Confluent Cloud

  1. Sign in to Confluent Cloud. From your cluster’s home page, go to ksqlDB and add a new application. If prompted, select Create application myself.

  2. In production, you should use granular access for your ksqlDB applications, but in a development or experimental environment, global access is adequate and easier to set up. Select Global Access, then click Continue.

  3. Configure your application’s name as ksqldb101, leave the application size as 4, and select Launch application (provisioning the application can take 5 minutes or more).

  4. Once the application has finished provisioning, you’re ready to start building some queries!

Using ksqlDB on Confluent Cloud

  1. In the ksqlDB editor on Confluent Cloud, run the following statement to create a stream called MOVEMENTS:

    CREATE STREAM MOVEMENTS (PERSON VARCHAR KEY, LOCATION VARCHAR)
      WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements');

    Shortly after you run this statement, a new stream will show up on the right-hand sidebar.

  2. Add data to your stream by running these INSERT statements:

    INSERT INTO MOVEMENTS VALUES ('Allison', 'Denver');
    INSERT INTO MOVEMENTS VALUES ('Robin', 'Leeds');
    INSERT INTO MOVEMENTS VALUES ('Robin', 'Ilkley');
    INSERT INTO MOVEMENTS VALUES ('Allison', 'Boulder');

  3. To look at the data in the stream that we’ve populated, click the Flow tab, and then click the MOVEMENTS stream. The stream will open to the right, and you’ll see the messages in it.

Using the ksqlDB Command Line Interface (CLI)

As well as the web interface seen above, you can use the ksqlDB CLI for interacting with ksqlDB servers (local or on Confluent Cloud). Here we’ll use Docker to simplify access to the CLI, but you can install the binary directly if you prefer. Before getting started with the CLI, you will need to get some details for the ksqlDB application so that we can connect to it.

Obtaining ksqlDB API and Endpoint Details from Confluent Cloud

To obtain the application information, you need the ccloud command-line tool. Once you’ve installed it, run:

ccloud login

If you have more than one environment, run ccloud environment list to list them and ccloud environment use <<Id>> to switch to the relevant one first. Then run the following command to list the ksqlDB application(s) that you can access:

ccloud ksql app list

You’ll see results similar to the following example. Make a note of both the Endpoint and the Id.

       Id      |     Name      | Topic Prefix |   Kafka   | Storage |                       Endpoint                        | Status
+--------------+---------------+--------------+-----------+---------+-------------------------------------------------------+--------+
  lksqlc-92d50 | ksqldb101     | pksqlc-oqy1o | lkc-rxrgk |     500 | https://pksqlc-oqy1o.us-west4.gcp.confluent.cloud:443 | UP

Using the Id value from above, run:

ccloud api-key create --resource <<Id>>

Make a note of the resulting API Key and Secret.

Connecting to Confluent Cloud ksqlDB from Local CLI

  1. Run the ksqlDB CLI from your terminal via Docker Compose. You’ll need the API details and endpoint that you obtained in the previous section.

    Copy this text and paste it into a docker-compose.yml file:

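    ---
    services:
      ksqldb-cli:
        image: confluentinc/ksqldb-cli:0.17.0
        container_name: ksqldb-cli
        # Start a shell instead of the CLI so the container stays up
        # and you can launch the CLI later with docker exec.
        entrypoint: /bin/sh
        tty: true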

    Now run the following command to load the CLI:

    docker compose up -d

    To connect the CLI to the ksqlDB application on Confluent Cloud issue this command:

    docker exec -it ksqldb-cli ksql -u <<KSQLDB_API_KEY>> -p <<KSQLDB_API_SECRET>> <<KSQLDB_ENDPOINT>>

    You should see output similar to the following, with the endpoint details shown matching those you supplied:

                      ===========================================
                      =       _              _ ____  ____       =
                      =      | | _____  __ _| |  _ \| __ )      =
                      =      | |/ / __|/ _` | | | | |  _ \      =
                      =      |   <\__ \ (_| | | |_| | |_) |     =
                      =      |_|\_\___/\__, |_|____/|____/      =
                      =                   |_|                   =
                      =        The Database purpose-built       =
                      =        for stream processing apps       =
                      ===========================================
    
    Copyright 2017-2021 Confluent Inc.
    
    CLI v0.18.0, Server v0.18.0-rc14 located at https://pksqlc-oqy1o.us-west4.gcp.confluent.cloud:443
    Server Status: RUNNING
    
    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

  2. In the ksqlDB CLI, list the streams present in the ksqlDB application:

    show streams;

    You should see the MOVEMENTS stream, since you’re connected to the same ksqlDB application as you were interacting with via the web UI above.
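
The interactive session above can also be scripted. The following is a minimal sketch, not part of the original exercise. It assumes that the ksql CLI in your image accepts the -e (--execute) option (run ksql --help to confirm for your version) and that the environment variables KSQLDB_API_KEY, KSQLDB_API_SECRET, and KSQLDB_ENDPOINT hold the values you noted earlier. Those variable names and the helper name ksql_run are chosen here for illustration only.

```shell
#!/bin/sh
# Hypothetical helper: run a single ksqlDB statement through the CLI
# container started by docker compose above, then exit.
# Assumes KSQLDB_API_KEY, KSQLDB_API_SECRET and KSQLDB_ENDPOINT are set
# and that this CLI version accepts -e/--execute.
ksql_run() {
  statement="$1"
  # Stop with a clear message if a connection detail is missing.
  : "${KSQLDB_API_KEY:?set KSQLDB_API_KEY first}"
  : "${KSQLDB_API_SECRET:?set KSQLDB_API_SECRET first}"
  : "${KSQLDB_ENDPOINT:?set KSQLDB_ENDPOINT first}"
  docker exec ksqldb-cli ksql \
    -u "$KSQLDB_API_KEY" -p "$KSQLDB_API_SECRET" \
    -e "$statement" \
    "$KSQLDB_ENDPOINT"
}
```

With the variables exported, ksql_run 'show streams;' should print the same stream list as the interactive command.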

Inserting and Querying Data from the CLI

  1. Insert some more data into the stream:

    INSERT INTO MOVEMENTS VALUES ('Allison', 'Denver');
    INSERT INTO MOVEMENTS VALUES ('Robin', 'Leeds');
    INSERT INTO MOVEMENTS VALUES ('Robin', 'Ilkley');

  2. Let’s now query this data, along with the data that we inserted earlier. Because we want to see all of the data that’s already in the stream, and not just new messages as they arrive, we need to run:

    SET 'auto.offset.reset' = 'earliest';

    Now run the following SELECT to show all of the events in MOVEMENTS:

    SELECT * FROM MOVEMENTS EMIT CHANGES;

    The query runs continuously, emitting new rows as they arrive, until you cancel it by pressing Ctrl-C.

Create a Table

  1. Next, create a table:

    CREATE TABLE PERSON_STATS WITH (VALUE_FORMAT='AVRO') AS
      SELECT PERSON,
        LATEST_BY_OFFSET(LOCATION) AS LATEST_LOCATION,
        COUNT(*) AS LOCATION_CHANGES,
        COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS
      FROM MOVEMENTS
      GROUP BY PERSON
      EMIT CHANGES;

    Then run:

    show tables;

    This will verify that the table has been created.
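
To build intuition for what the three aggregates return, here is a small local sketch that mimics the GROUP BY with awk over the seven rows inserted so far (four from the web editor, three from the CLI). It is an illustration only, not ksqlDB itself. It assumes the rows are processed in insertion order, which is plausible here because the topic has a single partition, and that the query reads the stream from the beginning. The function name person_stats is made up for this example.

```shell
#!/bin/sh
# The seven rows inserted so far, in insertion order (PERSON,LOCATION).
movements='Allison,Denver
Robin,Leeds
Robin,Ilkley
Allison,Boulder
Allison,Denver
Robin,Leeds
Robin,Ilkley'

# Hypothetical local stand-in for the PERSON_STATS aggregation.
# Output columns: PERSON,LATEST_LOCATION,LOCATION_CHANGES,UNIQUE_LOCATIONS
person_stats() {
  awk -F, '
    {
      latest[$1] = $2                      # LATEST_BY_OFFSET: last value seen per person
      count[$1]++                          # COUNT(*): every row counts
      if (!seen[$1, $2]++) distinct[$1]++  # COUNT_DISTINCT: first sighting of a location only
    }
    END {
      for (p in latest) print p "," latest[p] "," count[p] "," distinct[p]
    }
  ' | sort
}

printf '%s\n' "$movements" | person_stats
# Prints:
# Allison,Denver,3,2
# Robin,Ilkley,4,2
```

Allison has three rows but only two distinct locations, because Denver appears twice. The latest location is simply the value from the most recent row for each person.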

Use the ksqlDB REST API with Postman

Now switch to Postman to try out the ksqlDB REST API, a great way to interact with your ksqlDB applications, whether they are running in Confluent Cloud or are self-managed.

In Confluent Cloud, copy your application’s endpoint from Settings. In Postman, set the HTTP method to POST, paste the endpoint in as the URL, and add /ksql to the end.

Set Up Auth

  1. Under Auth, select Basic Auth and enter your ksqlDB credentials for Confluent Cloud: the API key as the username and the API secret as the password.

  2. Under the Body (raw) tab, enter a SHOW STREAMS statement:

    {
      "ksql": "show streams;",
      "streamProperties":{}
    }

    Send the request and then inspect the MOVEMENTS stream in the results.

  3. Now list your tables by making a small change to the payload in the Body tab:

    {
      "ksql": "show tables;",
      "streamProperties":{}
    }

Insert Data

  1. Switch tabs to insert some more data into MOVEMENTS:

    {
      "ksql": "INSERT INTO MOVEMENTS VALUES('Allison', 'Loveland');",
      "streamProperties":{}
    }

Display the Inserted Data

  1. Now run a SELECT statement to see the results of your insert:

    {
      "ksql": "SELECT * FROM PERSON_STATS WHERE PERSON = 'Allison';",
      "streamProperties":{}
    }

    Note that the endpoint for a SELECT statement is query, rather than ksql. Since PERSON_STATS is based on MOVEMENTS, you should see the recently-inserted data in the results.
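
The same requests can be sent without Postman. The sketch below uses curl and is an illustration rather than part of the original exercise. It assumes the environment variables KSQLDB_ENDPOINT, KSQLDB_API_KEY, and KSQLDB_API_SECRET hold the values you noted earlier, and it sends the content type application/vnd.ksql.v1+json. The variable and function names are hypothetical. It builds the same body as the Postman examples and picks /query for SELECT statements and /ksql for everything else.

```shell
#!/bin/sh
# Build the JSON body used in the Postman examples.
# Assumption: the statement contains no double quotes or backslashes,
# so it needs no JSON escaping.
ksql_body() {
  printf '{"ksql":"%s","streamProperties":{}}' "$1"
}

# Choose the REST path: SELECT statements go to /query, the rest to /ksql.
# Assumption: the statement has no leading whitespace.
ksql_path() {
  case "$1" in
    [Ss][Ee][Ll][Ee][Cc][Tt]*) printf '/query' ;;
    *) printf '/ksql' ;;
  esac
}

# Send one statement with HTTP basic auth (API key as user, secret as password).
# curl issues a POST automatically because --data is supplied.
ksql_rest() {
  curl --silent --show-error \
    --user "${KSQLDB_API_KEY}:${KSQLDB_API_SECRET}" \
    --header 'Content-Type: application/vnd.ksql.v1+json' \
    --data "$(ksql_body "$1")" \
    "${KSQLDB_ENDPOINT}$(ksql_path "$1")"
}
```

For example, ksql_rest 'show streams;' posts to /ksql, while ksql_rest "SELECT * FROM PERSON_STATS WHERE PERSON = 'Allison';" posts to /query.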

Use the promo code KSQLDB101 to get $101 of free Confluent Cloud usage
