The best way to interact with ksqlDB when you’re learning how things work is with the ksqlDB CLI. Fire it up as follows:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Before we get too far, let’s set the auto.offset.reset configuration parameter to earliest. This means all new ksqlDB queries will automatically compute their results from the beginning of a stream, rather than the end. This isn’t always what you’ll want to do in production, but it makes query results much easier to see in examples like this.
SET 'auto.offset.reset' = 'earliest';
We will create a stream to model some sample data with this command.
CREATE STREAM TEMPERATURE_READINGS_RAW (eventTime BIGINT, temperature INT)
WITH (kafka_topic='deviceEvents', value_format='avro', partitions=1);
The schema for our example stream specifies eventTime as a BIGINT. Applications commonly send timestamps as Unix time (here, milliseconds since the Unix epoch), which is why eventTime is specified as a BIGINT.
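As a quick sanity check on the choice of BIGINT, the sketch below is plain Python, not part of the ksqlDB session. It takes the first sample eventTime used in this tutorial and compares it against the 32-bit and 64-bit signed integer ranges. It assumes that ksqlDB's INT is a 32-bit signed integer and BIGINT a 64-bit signed integer. The variable names are illustrative only.

```python
# Assumption: ksqlDB INT is 32-bit signed and BIGINT is 64-bit signed.
INT32_MAX = 2**31 - 1  # largest value a 32-bit signed integer can hold
INT64_MAX = 2**63 - 1  # largest value a 64-bit signed integer can hold

# First sample eventTime from this tutorial: milliseconds since 1970-01-01T00:00:00 UTC
event_time_ms = 1615566394751

fits_in_int = event_time_ms <= INT32_MAX
fits_in_bigint = event_time_ms <= INT64_MAX

# Split the value into whole seconds and the leftover milliseconds
seconds, millis = divmod(event_time_ms, 1000)

print(fits_in_int, fits_in_bigint)  # False True
print(seconds, millis)              # 1615566394 751
```

A millisecond-precision Unix timestamp is already far larger than a 32-bit integer can hold, so a 64-bit column type is needed.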
Insert sample data into the stream with the commands below.
INSERT INTO TEMPERATURE_READINGS_RAW (eventTime, temperature) VALUES (1615566394751, 100);
INSERT INTO TEMPERATURE_READINGS_RAW (eventTime, temperature) VALUES (1615566401534, 132);
INSERT INTO TEMPERATURE_READINGS_RAW (eventTime, temperature) VALUES (1615567732840, 144);
INSERT INTO TEMPERATURE_READINGS_RAW (eventTime, temperature) VALUES (1615567735866, 103);
INSERT INTO TEMPERATURE_READINGS_RAW (eventTime, temperature) VALUES (1615567736875, 102);
INSERT INTO TEMPERATURE_READINGS_RAW (eventTime, temperature) VALUES (1615567738890, 101);
Let’s examine the ksqlDB stream with the DESCRIBE statement:
DESCRIBE TEMPERATURE_READINGS_RAW;
After entering the command above, you should see the following:
Name : TEMPERATURE_READINGS_RAW
Field | Type
-----------------------
EVENTTIME | BIGINT
TEMPERATURE | INTEGER
-----------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
To convert the BIGINT UTC time into a TIMESTAMP in America/Denver time, we need to use a combination of two ksqlDB functions: FROM_UNIXTIME and CONVERT_TZ. Our first step is to convert the BIGINT into a TIMESTAMP using the FROM_UNIXTIME ksqlDB function. Run the command below to convert the EVENTTIME column to a TIMESTAMP.
SELECT TEMPERATURE, FROM_UNIXTIME(EVENTTIME) AS EVENTTIME_TS
FROM TEMPERATURE_READINGS_RAW
EMIT CHANGES
LIMIT 6;
The query will result in the following output.
+-------------------------+-------------------------+
|TEMPERATURE |EVENTTIME_TS |
+-------------------------+-------------------------+
|100 |2021-03-12T16:26:34.751 |
|132 |2021-03-12T16:26:41.534 |
|144 |2021-03-12T16:48:52.840 |
|103 |2021-03-12T16:48:55.866 |
|102 |2021-03-12T16:48:56.875 |
|101 |2021-03-12T16:48:58.890 |
Limit Reached
Query terminated
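If you want to cross-check these values outside ksqlDB, the following Python sketch converts the same six epoch-millisecond values to UTC timestamps with millisecond precision. It only illustrates the arithmetic and does not show how ksqlDB implements FROM_UNIXTIME. The helper name format_epoch_millis is hypothetical.

```python
from datetime import datetime, timezone


def format_epoch_millis(epoch_ms: int) -> str:
    """Render epoch milliseconds as a UTC timestamp with millisecond precision."""
    # Work in integers to avoid floating-point rounding of the millisecond part
    seconds, millis = divmod(epoch_ms, 1000)
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    dt = dt.replace(microsecond=millis * 1000)
    # Drop the "+00:00" offset so the text matches the query output format
    return dt.replace(tzinfo=None).isoformat(timespec="milliseconds")


# The six eventTime values inserted earlier in this tutorial
event_times = [
    1615566394751,
    1615566401534,
    1615567732840,
    1615567735866,
    1615567736875,
    1615567738890,
]

timestamps = [format_epoch_millis(t) for t in event_times]
for ts in timestamps:
    print(ts)
```

The printed values should match the EVENTTIME_TS column above, starting with 2021-03-12T16:26:34.751.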
Let’s create a persistent query to continuously convert the EVENTTIME column into a TIMESTAMP. Do so with the command below.
CREATE STREAM TEMPERATURE_READINGS_TIMESTAMP AS
SELECT TEMPERATURE, FROM_UNIXTIME(EVENTTIME) AS EVENTTIME_TS
FROM TEMPERATURE_READINGS_RAW;