A stream in ksqlDB is a supercharged Apache Kafka topic. Consider the following ksqlDB SQL:
CREATE STREAM readings (
    sensor VARCHAR KEY,
    reading DOUBLE,
    location VARCHAR
) WITH (
    kafka_topic='readings',
    value_format='json',
    partitions=3
);
This example includes a schema with three columns: sensor, reading, and location. The types are stated explicitly: VARCHAR (string) for sensor and location, and DOUBLE for reading. Notice that sensor is marked with the KEY keyword.
There is also additional metadata: the command specifies the underlying Kafka topic where the data will be stored (readings), along with the serialization format (JSON) and the number of partitions. But what happens internally when you issue this SQL at the ksqlDB command line?
The ksqlDB servers communicate directly with your Kafka brokers. If the underlying topic doesn’t exist, it gets created by Kafka. The additional metadata, like the types, gets stored in ksqlDB’s object catalog. So after executing the command, you now have a Kafka topic (readings) with three empty partitions.
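If you want to verify this yourself, ksqlDB ships with a few inspection statements you can run from the same CLI session (the exact output columns vary by version, so treat this as a sketch):

-- List the streams registered in ksqlDB's object catalog
SHOW STREAMS;

-- List the Kafka topics visible to ksqlDB, including 'readings'
SHOW TOPICS;

-- Show the schema and serialization details recorded for the stream
DESCRIBE readings;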
Next, put some data into your empty stream. You accomplish this using rows, the equivalent of messages in Kafka. You put rows into a stream using simple SQL statements, just like with a relational database. If you’ve worked with relational databases such as PostgreSQL or MySQL, this will look familiar to you:
INSERT INTO readings (sensor, reading, location)
VALUES ('sensor-1', 45, 'wheel');
INSERT INTO readings (sensor, reading, location)
VALUES ('sensor-2', 41, 'motor');
So you insert into the stream readings, naming the specific columns for which you would like to supply data. Each INSERT statement corresponds to the invocation of a Kafka client producer call, and the data is stored in the stream’s underlying topic.
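To read those rows back, you can issue a transient push query. One detail worth noting: by default a push query starts from the latest offset, so in the CLI you may need to set auto.offset.reset to earliest first to see rows that were inserted before the query started:

-- Read existing rows as well as new ones (CLI session property)
SET 'auto.offset.reset' = 'earliest';

-- A transient push query: it streams rows to the client until you cancel it
SELECT sensor, reading, location
FROM readings
EMIT CHANGES;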
Now consider the representation of a ksqlDB row inside a Kafka topic. Specifically, look at the first INSERT statement above:
This becomes the first row in the first partition of the readings topic, with an offset of 0. It has a key and a value, as every Kafka record does. The key is sensor-1 (as denoted by the KEY parameter for the sensor field when the stream was created), and the value is a map with a reading of 45 and a location of wheel. Subsequent rows are written to the topic’s partition based on a hashed value of the key: Rows with the same key will be written to the same partition.
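If you want to see that key/value layout directly, the ksqlDB CLI can print the raw records of the underlying topic. The output shown in the comments below is only approximate; its exact formatting differs between versions:

-- Inspect the raw Kafka records backing the stream
PRINT 'readings' FROM BEGINNING;

-- Approximate output (format varies by version):
-- rowtime: ..., key: sensor-1, value: {"READING":45.0,"LOCATION":"wheel"}, partition: 0
-- rowtime: ..., key: sensor-2, value: {"READING":41.0,"LOCATION":"motor"}, partition: ...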
Now that you have data in your stream, you can perform real-time processing on it. The simplest action is a transformation, which is done here with a query to convert all values in the location field to uppercase:
CREATE STREAM clean AS
    SELECT sensor,
           reading,
           UCASE(location) AS location
    FROM readings
    EMIT CHANGES;
This query consumes from your readings stream and processes the data, leaving sensor and reading unchanged and converting location to uppercase. The data is written into a new stream named clean.
When you run the above statement from your client (whether it is the CLI, the Confluent web interface, or the REST API), it is sent to the ksqlDB server. From there it is compiled by ksqlDB into a Kafka Streams topology to execute the query. The query runs even after the client disconnects and will resume if the ksqlDB server is restarted. We call this a persistent query.
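You can see which persistent queries a server is running, and a summary of the Kafka Streams topology behind each one, with the statements below. The query ID shown here (CSAS_CLEAN_1) is only an example; ksqlDB assigns the real ID, which SHOW QUERIES reports:

-- List all persistent queries, their IDs, and their sink streams
SHOW QUERIES;

-- Show the execution plan and Kafka Streams topology for one query
-- (replace CSAS_CLEAN_1 with the ID reported by SHOW QUERIES)
EXPLAIN CSAS_CLEAN_1;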
As the rows are processed, the offset of the ksqlDB consumer advances through each partition:
These offsets are tracked automatically for you by the Kafka consumer protocol.
ksqlDB can also apply filters to a stream of data. Here we take the output of the previous query and create a new stream that keeps only the rows whose reading values are greater than 41:
CREATE STREAM high_readings AS
    SELECT sensor, reading, location
    FROM clean
    WHERE reading > 41
    EMIT CHANGES;
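The WHERE clause supports the usual SQL predicates, so you can combine conditions. The statement below is a hypothetical variation rather than part of the running example; it filters the clean stream, where location has already been uppercased:

-- Hypothetical variation: filter on both the reading and the location
CREATE STREAM high_wheel_readings AS
    SELECT sensor, reading, location
    FROM clean
    WHERE reading > 41 AND location = 'WHEEL'
    EMIT CHANGES;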
You can combine statements to perform multiple actions at once. For example, you can combine the two persistent queries shown previously into one:
CREATE STREAM high_pri AS
    SELECT sensor,
           reading,
           UCASE(location) AS location
    FROM readings
    WHERE reading > 41
    EMIT CHANGES;
As before, you don’t have to worry about how these queries actually operate over the data in Kafka, which is one of the great advantages of using ksqlDB.
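If you adopt the consolidated query, you can retire the intermediate objects. A minimal sketch, assuming the query IDs reported by SHOW QUERIES are CSAS_CLEAN_1 and CSAS_HIGH_READINGS_3 (yours will differ):

-- Stop the two chained persistent queries (IDs are examples; use SHOW QUERIES)
TERMINATE CSAS_CLEAN_1;
TERMINATE CSAS_HIGH_READINGS_3;

-- Remove the now-unused streams from the catalog
-- (the underlying Kafka topics are kept unless you add DELETE TOPIC)
DROP STREAM high_readings;
DROP STREAM clean;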
Because ksqlDB statements are executed as Kafka Streams topologies, you get all of the usual Kafka consumer guarantees. Below, multiple persistent queries are reading from the same stream:
Because these are effectively two separate Kafka consumers, you can rely on them to process the same data independently. The data they process exists once on the source Kafka topic, and Kafka tracks the offset that each query has consumed on its own. This way, you can scale the consumption of data across many applications without introducing dependencies between them.
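For example, you could attach a second, purely hypothetical persistent query to high_pri without affecting the first one; each persistent query runs as its own Kafka Streams application with its own consumer group and offsets:

-- Hypothetical second consumer of high_pri; it does not interfere with
-- any other query reading the same stream
CREATE STREAM wheel_alerts AS
    SELECT sensor, reading
    FROM high_pri
    WHERE location = 'WHEEL'
    EMIT CHANGES;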
Hi, I'm Michael Drogalis with Confluent. In this module, we're going to look at how stateless operations work in ksqlDB. Let's start with the hello world of ksqlDB: creating a stream. A stream is a supercharged Kafka topic. In this SQL statement, I create a stream called readings. It has a schema of three columns, sensor, reading, and location, and their types are explicitly spelled out: string, double, and string. Notice that the sensor column is denoted with the KEY keyword. We'll look at what that means in just a second. There's also some additional metadata. We specify the underlying Kafka topic where the data will be stored, the serialization format, which in this case is JSON, and the number of partitions. So when you're at the CLI and you issue this command to the ksqlDB servers, what happens? The ksqlDB servers turn around and talk to the Kafka brokers. If the underlying topic doesn't exist, it gets created, and the additional metadata, like the types, gets stored in ksqlDB's object catalog. So in this case, I know I have a topic named readings that has three empty partitions, denoted by the numbers zero, one, and two.

Now that I have an empty stream, the next thing that makes sense is to put some data into it. I do that with rows: in Kafka we have records, in ksqlDB we have rows. The way that you get rows into a stream is with simple SQL statements, just like a relational database. If you've worked with Postgres or MySQL, this will be very familiar to you. I insert into the stream readings, naming the specific columns that I want to supply data for. Each INSERT statement corresponds to the invocation of a Kafka client producer call, and the data is stored in the underlying topic for the stream.

Now let's look at the representation of a row inside a Kafka topic, since each row is just a Kafka record. What does that record look like? When I look at the first row, I see that the offset is zero, meaning that it's at the beginning. I can also see that it has a key and a value, as every Kafka record does. Here the key is sensor-1, and the value is a map with a reading of 45 and a location of wheel. Notice how it got to be this way: remember that the sensor column was denoted with the KEY keyword. That is why it ends up in the key position of the underlying Kafka record. This helps you exercise control, in SQL, over the underlying Kafka data. As you can see, we have more rows, and as I add more rows, they get added to the left.

Now that we have data in our stream, what comes next? We want to do something with that data in real time. The simplest thing that we can do is a transformation. We'll do this with a persistent query, which is what we see at the bottom. This query consumes from the readings stream that we just created and selects a few columns. It selects the sensor and the reading, just carrying them through, and then takes the location and converts it to all uppercase, just a simple little functional example. It reads this data indefinitely and creates a new stream called clean. So we're cleaning up the data, you can imagine. When you launch this program from your CLI, it is sent to the ksqlDB servers, which run it perpetually and indefinitely. It keeps running until you tell it to stop. Under the hood, ksqlDB constructs a Kafka Streams topology to execute the query. You can imagine that we give this persistent query a name, PQ1, just to reference it in this depiction.
Now imagine that this is a single server running a single persistent query. It's sitting there waiting for new rows to show up and processing them as they arrive. This row, for instance, is from partition two, offset zero, and the location is muffler. You can see how, when it passes through the query, it's converted to all uppercase. As the rows are processed, you can see how ksqlDB tracks how far it has advanced through each partition by the processed offset. These offsets are committed automatically for you through the consumer protocol.

Let's do something slightly more advanced: filtering rows out of a stream. This shows how the programming model works. I take a stream and derive a new stream from it, and I can chain these things together. This is the last query we looked at, PQ1. I'm going to create a new query, PQ2, again just for the purposes of this depiction, and this query will keep only the readings that are greater than 41. When I play this out, the rows come in and you'll see a few of them get discarded. This works because the two persistent queries are compiled into Kafka Streams applications that share a common Kafka topic. That is why the programming model is so flexible.

This is particularly nice because you can group statements in whatever way you see fit. Here I take the two statements that we had before and consolidate them into a single statement. You don't have to worry about where these things run; it's compiled down to the same semantic program. And that is the beauty of using high-level SQL.

Because ksqlDB statements are executed as Kafka Streams topologies, you get all the guarantees you're used to with a Kafka consumer. Here's a look at how this would work with multiple persistent queries reading from the same stream. Here I have another persistent query chained onto the high-priority stream. What would you expect if these were two Kafka consumers? You'd expect them to work in a conflict-free way. I'll play this through to show what happens. A row comes through and both persistent queries read it. You can imagine they're on completely different servers. Two copies of the data are created, and they're completely separate: a copy here, and a copy here. These queries don't step on each other's toes. You get the scale-out that you would expect with regular Kafka consumers, and everyone gets their own isolated copy of the data.