A stream in ksqlDB is a supercharged Apache Kafka topic. Consider the following ksqlDB SQL:
CREATE STREAM readings ( sensor VARCHAR KEY, reading DOUBLE, location VARCHAR ) WITH ( kafka_topic='readings', value_format='json', partitions=3 );
This example includes a schema with three columns: sensor, reading, and location. The types are explicitly stated: strings (VARCHAR) for sensor and location, and a DOUBLE for reading. Notice that sensor is marked with the KEY keyword, identifying it as the column that supplies each row's key. There’s also additional metadata: the command specifies the underlying Kafka topic where the data will be stored (readings), along with the serialization format (JSON) and the number of partitions. But what happens internally when you issue this SQL at the ksqlDB command line?
The ksqlDB servers communicate directly with your Kafka brokers. If the underlying topic doesn’t exist, it gets created by Kafka. The additional metadata, like the types, gets stored in ksqlDB’s object catalog. So after executing the command, you now have a Kafka topic (readings) with three empty partitions.
Next, put some data into your empty stream. You accomplish this using rows, the equivalent of messages in Kafka. You put rows into a stream using simple SQL statements, just like with a relational database. If you’ve worked with relational databases such as PostgreSQL or MySQL, this will look familiar to you:
INSERT INTO readings (sensor, reading, location) VALUES ('sensor-1', 45, 'wheel'); INSERT INTO readings (sensor, reading, location) VALUES ('sensor-2', 41, 'motor');
So you insert into the stream readings, naming the specific columns for which you would like to supply data. Each INSERT statement corresponds to the invocation of a Kafka client producer call, and the data is stored in the stream’s underlying topic.
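To make the producer analogy concrete, here is a minimal Python sketch of the record that one such INSERT could become, assuming the string key and JSON value format declared when the stream was created. The helper is illustrative, not a real client API:

```python
import json

def insert_to_record(sensor, reading, location):
    """Illustrative only: build the key/value bytes a Kafka producer
    would send for one INSERT into the readings stream."""
    key = sensor.encode("utf-8")  # sensor is the KEY column
    value = json.dumps({"reading": reading, "location": location}).encode("utf-8")
    return key, value

key, value = insert_to_record("sensor-1", 45, "wheel")
print(key)               # b'sensor-1'
print(json.loads(value))
```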
Now consider the representation of a ksqlDB row inside of a Kafka topic. Specifically, look at the first INSERT SQL statement above.
This becomes the first row in the first partition of the readings topic, with an offset of 0. It has a key and a value, as every Kafka record does. The key is sensor-1 (as denoted by the KEY parameter for the sensor field when the stream was created), and the value is a map with a reading of 45 and a location of wheel. Subsequent rows are written to the topic’s partitions based on a hashed value of the key: rows with the same key will be written to the same partition.
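That routing rule can be sketched in a few lines of Python. Note that the hash here is a stand-in: Kafka’s default producer partitioner actually uses murmur2, but any deterministic hash demonstrates the key-to-partition property:

```python
import hashlib

NUM_PARTITIONS = 3  # matches partitions=3 in the CREATE STREAM above

def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Deterministically map a record key to a partition.
    Stand-in hash: real Kafka producers default to murmur2."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Same key -> same partition, so all of sensor-1's rows land together, in order.
assert partition_for(b"sensor-1") == partition_for(b"sensor-1")
print(partition_for(b"sensor-1"), partition_for(b"sensor-2"))
```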
Now that you have data in your stream, you can perform real-time processing on it. The simplest action is a transformation, which is done here with a query to convert all values in the location field to uppercase:
CREATE STREAM clean AS SELECT sensor, reading, UCASE(location) AS location FROM readings EMIT CHANGES;
This query consumes from your readings stream and processes the data, leaving reading unchanged and converting location to uppercase. The data is written into a new stream named clean.
When you run the above statement from your client (whether it is the CLI, the Confluent web interface, or the REST API), it is sent to the ksqlDB server. From there it is compiled by ksqlDB into a Kafka Streams topology to execute the query. The query runs even after the client disconnects and will resume if the ksqlDB server is restarted. We call this a persistent query.
As the rows are processed, the offset of the ksqlDB consumer advances through each partition. These offsets are tracked automatically for you by the Kafka consumer protocol.
ksqlDB can be used to apply filters to a stream of data. Here we take the output of the previous query and create a new version that keeps only the rows whose reading values are greater than 41:
CREATE STREAM high_readings AS SELECT sensor, reading, location FROM clean WHERE reading > 41 EMIT CHANGES;
You can combine statements to perform multiple actions at once. For example, you can combine the two persistent queries shown previously into one:
CREATE STREAM high_pri AS SELECT sensor, reading, UCASE(location) AS location FROM readings WHERE reading > 41 EMIT CHANGES;
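If it helps to see the two steps fused, here is a rough Python analogue of that combined query over a couple of in-memory rows. The dicts are illustrative stand-ins for rows, not ksqlDB’s wire format:

```python
rows = [
    {"sensor": "sensor-1", "reading": 45, "location": "wheel"},
    {"sensor": "sensor-2", "reading": 41, "location": "motor"},
]

# One pass applies both the WHERE filter and the UCASE transformation,
# mirroring the fused query above.
high_pri = [
    {**row, "location": row["location"].upper()}
    for row in rows
    if row["reading"] > 41
]
print(high_pri)  # [{'sensor': 'sensor-1', 'reading': 45, 'location': 'WHEEL'}]
```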
As before, you don’t have to worry about how these queries actually operate over the data in Kafka, which is one of the great advantages of using ksqlDB.
Because ksqlDB statements are executed as Kafka Streams topologies, you get all of the usual Kafka consumer guarantees. Suppose that multiple persistent queries are reading from the same stream.
Because this is effectively two Kafka consumers, you can rely on them being able to process the same data independently. The data that they process exists once on the source Kafka topic, and Kafka tracks the offset that each query has independently consumed. This way, you can scale the consumption of data across many applications, without introducing dependencies between them.
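That independence can be sketched with a toy model in Python. This is an assumed simplification, not Kafka’s actual protocol: one in-memory list stands in for a topic partition, and each query’s consumer group tracks its own offset into it:

```python
# One shared log (a stand-in for a topic partition); the data exists once.
log = [("sensor-1", 45), ("sensor-2", 41), ("sensor-1", 92)]

# Each persistent query is its own consumer group with its own offset.
offsets = {"clean_query": 0, "high_pri_query": 0}

def poll(group: str, n: int = 1):
    """Return the next n records for a group and advance only its offset."""
    start = offsets[group]
    batch = log[start:start + n]
    offsets[group] += len(batch)
    return batch

poll("clean_query", 3)  # clean_query reads everything...
print(offsets)          # ...while high_pri_query's offset hasn't moved
```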