Get Started Free
course: ksqlDB 101

Hands On: Interacting with ksqlDB

5 min
Allison

Allison Walther

Integration Architect (Presenter)

Robin Moffatt

Robin Moffatt

Principal Developer Advocate (Author)

Hands On: Interacting with ksqlDB

In this exercise, you will try out the different user interfaces to ksqlDB. The first step is to provision a ksqlDB application on Confluent Cloud.

If you don’t already have a Confluent Cloud account, you can create one by signing up using the promo code 101KSQLDB for $25 of free Confluent Cloud usage (details) (this will cover the time to actively finish the exercises in the course using a Basic Cluster, but make sure to delete your cluster if you are finished with it or aren't using it for a day or so).

Provisioning ksqlDB on Confluent Cloud

  1. Sign in to Confluent Cloud. We’ll be using Confluent Schema Registry to format our data as Avro—you’ll hear more on this in a later module. For now, we need to enable Schema Registry. From the lower left hand corner, select Schema Registry. On the Schema Registry page, select Set up on my own. Select your preferred cloud region and create the instance.

    schema-registry-setup

  2. From your cluster’s home page, go to ksqlDB and add a new application. If prompted, select Create application myself.

    provisioning-ksqlDB

  3. For production use you would always want to use granular access for your ksqlDB applications, but in a development or experimental environment, global access is adequate and easier to set up. So, select Global Access, then click Continue.

    global-access

  4. Configure your application’s name as ksqldb101, leave the application size as 4, and select Launch application!

    Launch-application

    Wait 5 minutes for provisioning to complete.

  5. Once the application has finished provisioning, you’re ready to start building some queries!

  6. Note that you should make sure to delete your ksqlDB application if you are finished with it or won't be using it for a period of time. To delete, click ksqlDB on the left-hand side menu, then select Delete under Actions. Enter your application's name, then click Continue.

Using ksqlDB on Confluent Cloud

  1. In the ksqlDB editor on Confluent Cloud, run the following statement to create a stream called MOVEMENTS:

    CREATE STREAM MOVEMENTS (PERSON VARCHAR KEY, LOCATION VARCHAR)
      WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements');

    CREATE-STREAM

    Shortly after you run this statement, a new stream will show up on the right-hand sidebar.

    pasted-left-image 0

  2. Add data to your stream by running these INSERT statements:

    INSERT INTO MOVEMENTS VALUES ('Allison', 'Denver');
    INSERT INTO MOVEMENTS VALUES ('Robin', 'Leeds');
    INSERT INTO MOVEMENTS VALUES ('Robin', 'Ilkley');
    INSERT INTO MOVEMENTS VALUES ('Allison', 'Boulder');
  3. To look at the data in the stream that we’ve populated, click the Flow tab, and then click the Movements stream. The stream will open to the right, and you’ll see the messages in it.

    movements-stream

Using the ksqlDB Command Line Interface (CLI)

As well as the web interface seen above, you can use the ksqlDB CLI for interacting with ksqlDB servers (local or on Confluent Cloud). Here we’ll use Docker to simplify access to the CLI, but you can install the binary directly if you prefer. Before getting started with the CLI, you will need to get some details for the ksqlDB application so that we can connect to it.

Obtaining ksqlDB API and Endpoint Details from Confluent Cloud

To obtain the application information, you need the confluent command line tool. Once you’ve installed it, run:

confluent login

If you have more than one environment, use confluent environment list to list the environments and switch to the relevant environment first. Then run the following command to list out the ksqlDB application(s) that you can access:

confluent ksql cluster list

You’ll see results similar to the following example. Make a note of both the Endpoint and the Id.

       Id      |     Name      | Topic Prefix |   Kafka   | Storage |                       Endpoint                        | Status
+--------------+---------------+--------------+-----------+---------+-------------------------------------------------------+--------+
  lksqlc-92d50 | ksqldb101     | pksqlc-oqy1o | lkc-rxrgk |     500 | https://pksqlc-oqy1o.us-west4.gcp.confluent.cloud:443 | UP

Using the Id value from above, run:

confluent api-key create --resource <<Id>>

Make a note of the resulting API Key and Secret.

Connecting to Confluent Cloud ksqlDB from Local CLI

  1. Run the ksqlDB CLI from your terminal via Docker Compose. You’ll need the API details and endpoint that you obtained in the previous section.

    Copy this text and paste it into a docker-compose.yml file:

    ---
    services:
      ksqldb-cli:
        image: confluentinc/ksqldb-cli:0.17.0
        container_name: ksqldb-cli
        entrypoint: /bin/sh
        tty: true    

    Now run the following command to load the CLI:

    docker compose up -d 

    To connect the CLI to the ksqlDB application on Confluent Cloud issue this command:

    docker exec -it ksqldb-cli ksql -u <<KSQLDB_API_KEY>> -p <<KSQLDB_API_SECRET>> <<KSQLDB_ENDPOINT>>

    You should see output similar to the following, with the endpoint details shown matching those you supplied:

                      ===========================================
                      =       _              _ ____  ____       =
                      =      | | _____  __ _| |  _ \| __ )      =
                      =      | |/ / __|/ _` | | | | |  _ \      =
                      =      |   <\__ \ (_| | | |_| | |_) |     =
                      =      |_|\_\___/\__, |_|____/|____/      =
                      =                   |_|                   =
                      =        The Database purpose-built       =
                      =        for stream processing apps       =
                      ===========================================
    
    Copyright 2017-2021 Confluent Inc.
    
    CLI v0.18.0, Server v0.18.0-rc14 located at https://pksqlc-oqy1o.us-west4.gcp.confluent.cloud:443
    Server Status: RUNNING
    
    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
  2. In the ksqlDB CLI, list the streams present in the ksqlDB application:

    show streams;

    You should see the MOVEMENTS stream, since you’re connected to the same ksqlDB application as you were interacting with via the web UI above.

Inserting and Querying Data from the CLI

  1. Insert some more data into the stream:

    INSERT INTO MOVEMENTS VALUES ('Allison', 'Denver');
    INSERT INTO MOVEMENTS VALUES('Robin', 'Leeds');
    INSERT INTO MOVEMENTS VALUES ('Robin', 'Ilkley');
  2. Let’s now query this data, along with the data that we inserted earlier. Because we want to see all of the data that’s already in the stream, and not just new messages as they arrive, we need to run:

    SET 'auto.offset.reset' = 'earliest';

    Now run the following SELECT to show all of the events in MOVEMENTS:

    SELECT * FROM MOVEMENTS EMIT CHANGES;

    This will continuously provide results until you kill the query by pressing Ctrl-C.

Create a Table

  1. Next, create a table:

    CREATE TABLE PERSON_STATS WITH (VALUE_FORMAT='AVRO') AS
      SELECT PERSON,
        LATEST_BY_OFFSET(LOCATION) AS LATEST_LOCATION,
        COUNT(*) AS LOCATION_CHANGES,
        COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS
      FROM MOVEMENTS
    GROUP BY PERSON
    EMIT CHANGES;

    Then run:

    show tables; 

    This will verify that the table has been created.

Use the ksqlDB REST API with Postman

Now switch to Postman to try out the ksqlDB REST API, a great way to interact with your ksqlDB applications, whether they are running in Confluent Cloud or are self managed.

Get your endpoint from Settings in Confluent Cloud, then set the verb to POST, paste in your endpoint and add /ksql to the end of the URL.

Set Up Auth

  1. Under Auth, select Basic Auth and enter your ksqlDB credentials for Confluent Cloud.

    basic-auth

  2. Under the Body (raw) tab, enter a SHOW STREAMS statement:

    {
      "ksql": "show streams;",
      "streamProperties":{}
    }

    SHOW-STREAMS

    Send the request and then inspect the MOVEMENTS stream in the results.

    inspect-the-MOVEMENTS

  3. Now SHOW your tables, with a slight change to the payload in the Body tab:

    {
      "ksql": "show tables;",
      "streamProperties":{}
    }

    your-tables

Insert Data

  1. Switch tabs to insert some more data into MOVEMENTS:

    {
      "ksql": "INSERT INTO MOVEMENTS VALUES('Allison', 'Loveland');",
      "streamProperties":{}
    }

    INSERT-INTO-MOVEMENTS

Display the Inserted Data

  1. Now run a SELECT statement to see the results of your insert:

    {
      "ksql": "SELECT * FROM PERSON_STATS WHERE person ='Allison';",
      "streamProperties":{}    
    }

    statement-to-see

    Note that the endpoint for a SELECT statement is query, rather than ksql. Since PERSON_STATS is based on MOVEMENTS, you should see the recently-inserted data in the results.

    endpoint

Use the promo code KSQLDB101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.