In this exercise, you will try out the different user interfaces to ksqlDB. The first step is to provision a ksqlDB application on Confluent Cloud.
If you don’t already have a Confluent Cloud account, you can create one by signing up with the promo code 101KSQLDB for $25 of free Confluent Cloud usage. This will cover the time needed to actively finish the exercises in the course using a Basic cluster, but make sure to delete your cluster if you are finished with it or aren't using it for a day or so.
Sign in to Confluent Cloud. We’ll be using Confluent Schema Registry to format our data as Avro; you’ll hear more about this in a later module. For now, we just need to enable Schema Registry. From the lower left-hand corner, select Schema Registry. On the Schema Registry page, select Set up on my own, then select your preferred cloud region and create the instance.
From your cluster’s home page, go to ksqlDB and add a new application. If prompted, select Create application myself.
For production use you would always want granular access for your ksqlDB applications, but in a development or experimental environment, global access is adequate and easier to set up. So select Global access, then click Continue.
Name your application ksqldb101, leave the application size at 4, and select Launch application!
Wait 5 minutes for provisioning to complete.
Once the application has finished provisioning, you’re ready to start building some queries!
Remember to delete your ksqlDB application when you are finished with it or won't be using it for a while. To delete it, click ksqlDB in the left-hand menu, then select Delete under Actions. Enter your application's name, then click Continue.
In the ksqlDB editor on Confluent Cloud, run the following statement to create a stream called MOVEMENTS:
CREATE STREAM MOVEMENTS (PERSON VARCHAR KEY, LOCATION VARCHAR)
WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements');
Shortly after you run this statement, a new stream will show up on the right-hand sidebar.
Add data to your stream by running these INSERT statements:
INSERT INTO MOVEMENTS VALUES ('Allison', 'Denver');
INSERT INTO MOVEMENTS VALUES ('Robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('Robin', 'Ilkley');
INSERT INTO MOVEMENTS VALUES ('Allison', 'Boulder');
To look at the data in the stream that we’ve populated, click the Flow tab, and then click the Movements stream. The stream will open to the right, and you’ll see the messages in it.
In addition to the web interface shown above, you can use the ksqlDB CLI to interact with ksqlDB servers (local or on Confluent Cloud). Here we’ll use Docker to simplify access to the CLI, but you can install the binary directly if you prefer. Before getting started with the CLI, you’ll need to gather some details about the ksqlDB application so that you can connect to it.
To obtain the application information, you need the confluent command line tool. Once you’ve installed it, run:
confluent login
If you have more than one environment, use confluent environment list to list them and switch to the relevant environment first. Then run the following command to list the ksqlDB application(s) that you can access:
confluent ksql cluster list
You’ll see results similar to the following example. Make a note of both the Endpoint and the Id.
      Id      |     Name      | Topic Prefix |   Kafka   | Storage |                       Endpoint                        | Status
+--------------+---------------+--------------+-----------+---------+-------------------------------------------------------+--------+
  lksqlc-92d50 | ksqldb101     | pksqlc-oqy1o | lkc-rxrgk |     500 | https://pksqlc-oqy1o.us-west4.gcp.confluent.cloud:443 | UP
Using the Id value from above, run:
confluent api-key create --resource <<Id>>
Make a note of the resulting API Key and Secret.
Run the ksqlDB CLI from your terminal via Docker Compose. You’ll need the API details and endpoint that you obtained in the previous section.
Copy this text and paste it into a docker-compose.yml file:
---
services:
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.17.0
    container_name: ksqldb-cli
    entrypoint: /bin/sh
    tty: true
Now run the following command to start the CLI container:
docker compose up -d
To connect the CLI to the ksqlDB application on Confluent Cloud, run this command:
docker exec -it ksqldb-cli ksql -u <<KSQLDB_API_KEY>> -p <<KSQLDB_API_SECRET>> <<KSQLDB_ENDPOINT>>
You should see output similar to the following, with the endpoint details shown matching those you supplied:
===========================================
=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      | <\__ \ (_| | | |_| | |_) |      =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=        The Database purpose-built       =
=        for stream processing apps       =
===========================================
Copyright 2017-2021 Confluent Inc.
CLI v0.18.0, Server v0.18.0-rc14 located at https://pksqlc-oqy1o.us-west4.gcp.confluent.cloud:443
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
In the ksqlDB CLI, list the streams present in the ksqlDB application:
show streams;
You should see the MOVEMENTS stream, since you’re connected to the same ksqlDB application that you were interacting with via the web UI above.
Insert some more data into the stream:
INSERT INTO MOVEMENTS VALUES ('Allison', 'Denver');
INSERT INTO MOVEMENTS VALUES ('Robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('Robin', 'Ilkley');
Let’s now query this data, along with the data that we inserted earlier. Because we want to see all of the data that’s already in the stream, and not just new messages as they arrive, we need to run:
SET 'auto.offset.reset' = 'earliest';
Now run the following SELECT to show all of the events in MOVEMENTS:
SELECT * FROM MOVEMENTS EMIT CHANGES;
This will continuously provide results until you kill the query by pressing Ctrl-C.
Next, create a table:
CREATE TABLE PERSON_STATS WITH (VALUE_FORMAT='AVRO') AS
  SELECT PERSON,
         LATEST_BY_OFFSET(LOCATION) AS LATEST_LOCATION,
         COUNT(*) AS LOCATION_CHANGES,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS
  FROM MOVEMENTS
  GROUP BY PERSON
  EMIT CHANGES;
Then run:
show tables;
This will verify that the table has been created.
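To build some intuition for what PERSON_STATS holds, here is a rough Python analogue of the three aggregations (illustration only, not part of the exercise), applied to the first batch of sample events inserted via the web editor:

```python
# Rough Python analogue of the PERSON_STATS aggregations, applied to
# the first batch of sample MOVEMENTS events. Illustration only.
movements = [
    ("Allison", "Denver"),
    ("Robin", "Leeds"),
    ("Robin", "Ilkley"),
    ("Allison", "Boulder"),
]

stats = {}
for person, location in movements:
    entry = stats.setdefault(person, {"latest_location": None,
                                      "location_changes": 0,
                                      "unique_locations": set()})
    entry["latest_location"] = location        # LATEST_BY_OFFSET(LOCATION)
    entry["location_changes"] += 1             # COUNT(*)
    entry["unique_locations"].add(location)    # COUNT_DISTINCT(LOCATION)

print(stats["Allison"]["latest_location"])     # Boulder
print(stats["Allison"]["location_changes"])    # 2
```

Unlike this one-shot loop, ksqlDB maintains the table's state incrementally, so every new event inserted into MOVEMENTS updates PERSON_STATS in place.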
Now switch to Postman to try out the ksqlDB REST API, a great way to interact with your ksqlDB applications, whether they are running in Confluent Cloud or are self-managed.
Get your endpoint from Settings in Confluent Cloud, then set the verb to POST, paste in your endpoint, and add /ksql to the end of the URL.
Under Auth, select Basic Auth and enter your ksqlDB credentials for Confluent Cloud.
Under the Body (raw) tab, enter a SHOW STREAMS statement:
{
  "ksql": "show streams;",
  "streamProperties": {}
}
Send the request and then inspect the MOVEMENTS stream in the results.
Now SHOW your tables, with a slight change to the payload in the Body tab:
{
  "ksql": "show tables;",
  "streamProperties": {}
}
Switch tabs to insert some more data into MOVEMENTS:
{
  "ksql": "INSERT INTO MOVEMENTS VALUES ('Allison', 'Loveland');",
  "streamProperties": {}
}
Now run a SELECT statement to see the results of your insert:
{
  "ksql": "SELECT * FROM PERSON_STATS WHERE person = 'Allison';",
  "streamProperties": {}
}
Note that the endpoint for a SELECT statement is /query, rather than /ksql.
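Outside of Postman, the same two endpoints can be called from any HTTP client. The sketch below assembles the URL, Basic Auth header, and JSON body for a ksqlDB REST call, routing SELECT statements to /query and everything else to /ksql; the endpoint, API key, and secret are placeholders for the values you collected earlier, and `build_request` is just an illustrative helper, not part of any ksqlDB library.

```python
import json
from base64 import b64encode

# Placeholder values -- substitute the endpoint, API key, and secret
# you collected earlier in this exercise.
ENDPOINT = "https://pksqlc-oqy1o.us-west4.gcp.confluent.cloud:443"
API_KEY = "MY_API_KEY"
API_SECRET = "MY_API_SECRET"

def build_request(statement):
    """Return (url, headers, body) for a ksqlDB REST call."""
    # SELECT statements go to /query; other statements go to /ksql.
    path = "/query" if statement.lstrip().upper().startswith("SELECT") else "/ksql"
    body = json.dumps({"ksql": statement, "streamProperties": {}})
    token = b64encode(f"{API_KEY}:{API_SECRET}".encode()).decode()
    headers = {
        "Authorization": f"Basic {token}",  # what Postman's Basic Auth produces
        "Content-Type": "application/vnd.ksql.v1+json",
    }
    return ENDPOINT + path, headers, body

url, _, _ = build_request("SELECT * FROM PERSON_STATS WHERE person = 'Allison';")
print(url)  # ends in /query, because this is a SELECT
```

You could pass the resulting URL, headers, and body to any HTTP client (curl, requests, and so on) to reproduce what Postman sends.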
Since PERSON_STATS is based on MOVEMENTS, you should see the recently inserted data in the results.