"This timestamp is a Unix epoch in seconds and that one is in milliseconds, and this other timestamp is an ISO 8601-formatted string. I need to convert them into a canonical format." Every developer has faced timestamp data type wrangling tasks like this regardless of the programming language they're using. Timestamp wrangling is a fact of the developer's life because timestamps come in different shapes and sizes. In this tutorial, we will demonstrate the various ways to deal with timestamps in Flink SQL, including what to do when defining watermarks for event time processing.
You can run through this tutorial locally with the Flink SQL Client against Flink and Kafka running in Docker, or with Confluent Cloud. Run through these steps to provision Kafka and Flink.
Clone the confluentinc/tutorials GitHub repository (if you haven't already) and navigate to the tutorials directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Start Flink and Kafka:
docker compose -f ./docker/docker-compose-flinksql.yml up -d
Next, open the Flink SQL Client CLI:
docker exec -it flink-sql-client sql-client.sh
Finally, run the following SQL statement to create the temperature_readings table backed by Kafka running in Docker. Observe that we begin our timestamp transformation journey with timestamps in a common format: the Unix epoch, i.e., the number of seconds that have elapsed since January 1, 1970, at midnight UTC. We define it as a BIGINT.
CREATE TABLE temperature_readings (
  sensor_id INT,
  temperature DOUBLE,
  event_ts_epoch_seconds BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'temperature-readings',
  'properties.bootstrap.servers' = 'broker:9092',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'raw',
  'key.fields' = 'sensor_id',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://schema-registry:8081',
  'value.fields-include' = 'EXCEPT_KEY'
);
Note: create all resources in a new Confluent Cloud environment so that cleanup is easier.
In the Confluent Cloud Console, navigate to your environment and then click the Open SQL Workspace button for the compute pool that you have created. Create a new
Select the default catalog (Confluent Cloud environment) and database (Kafka cluster) to use with the dropdowns at the top right.
Finally, run the following SQL statement to create the temperature_readings table. Observe that we begin our timestamp transformation journey with timestamps in a common format: the Unix epoch, i.e., the number of seconds that have elapsed since January 1, 1970, at midnight UTC. We define it as a BIGINT.
CREATE TABLE temperature_readings (
  sensor_id INT,
  temperature DOUBLE,
  event_ts_epoch_seconds BIGINT
);
Given the temperature_readings table defined in the previous section, let's first insert a record that we will use to demonstrate the various timestamp wrangling tasks that follow. The timestamp of the record is February 13, 2009, at 11:31:30 PM UTC:
INSERT INTO temperature_readings VALUES
(0, 100.32, UNIX_TIMESTAMP('2009-02-13 23:31:30.000 +0000', 'yyyy-MM-dd HH:mm:ss.SSS Z'));
Select the timestamp from this record:
SELECT event_ts_epoch_seconds FROM temperature_readings;
You will see that our timestamp is a particularly visually appealing one celebrated by computing trivia enthusiasts worldwide: 1234567890. (We could have inserted this number directly, but that would have spoiled the exciting reveal.)
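As a sanity check outside Flink, the same value can be reproduced with Python's standard library. This is only an illustrative sketch: Python is not part of this tutorial's setup, and the variable names are made up for the example.

```python
from datetime import datetime, timezone

# The instant inserted above: February 13, 2009, at 23:31:30 UTC.
event_time = datetime(2009, 2, 13, 23, 31, 30, tzinfo=timezone.utc)

# Whole seconds elapsed since 1970-01-01 00:00:00 UTC.
epoch_seconds = int(event_time.timestamp())
print(epoch_seconds)
```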
If you have an epoch timestamp that you'd like to use in defining watermarks, you won't be able to use it directly. Try to use it:
Important: Confluent Cloud provides a watermark by default, so use MODIFY WATERMARK rather than ADD WATERMARK if you are running this tutorial in Confluent Cloud.
ALTER TABLE temperature_readings
ADD WATERMARK FOR event_ts_epoch_seconds AS event_ts_epoch_seconds;
You will hit an error:
Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the time field type is BIGINT
To get around this, you can add a computed column of the proper data type using the TO_TIMESTAMP_LTZ function. The following DDL statement will add a new computed timestamp column of type TIMESTAMP_LTZ(0):
ALTER TABLE temperature_readings
ADD event_ts_ltz AS TO_TIMESTAMP_LTZ(event_ts_epoch_seconds, 0);
Important: the precision argument must match the units of the epoch value: 0 for seconds, 3 for milliseconds. If we had instead specified a precision of 3 with TO_TIMESTAMP_LTZ(event_ts_epoch_seconds, 3), the value would be interpreted as milliseconds, and we would get an incorrect timestamp in January 1970.
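To see why a mismatched precision lands in 1970, here is a small sketch of the arithmetic in plain Python (an illustration outside Flink, not the TO_TIMESTAMP_LTZ implementation; the names EPOCH, as_seconds, and as_millis are hypothetical). It interprets the same number once as seconds and once as milliseconds.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
epoch_value = 1234567890

# Analogous to precision 0: the value counts seconds since the epoch.
as_seconds = EPOCH + timedelta(seconds=epoch_value)

# Analogous to precision 3: the same value counts milliseconds,
# so it covers only about two weeks past the epoch.
as_millis = EPOCH + timedelta(milliseconds=epoch_value)

print(as_seconds.isoformat())
print(as_millis.isoformat())
```

The first result is the intended instant in February 2009, while the second falls in mid-January 1970.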
Query the table:
SELECT * FROM temperature_readings;
And you will see the new computed column:
sensor_id  temperature  event_ts_epoch_seconds  event_ts_ltz
0          100.32       1234567890              2009-02-13 23:31:30.000
The new column can now be used to define a watermark strategy for event time processing (remember to MODIFY rather than ADD if running in Confluent Cloud):
ALTER TABLE temperature_readings
ADD WATERMARK FOR event_ts_ltz AS event_ts_ltz;
The previous section showed how to create a TIMESTAMP_LTZ from a Unix epoch. In order to create a TIMESTAMP without local time zone, use the TO_TIMESTAMP function. Note that this function takes a string, so we first call FROM_UNIXTIME to convert the epoch to a string.
SELECT TO_TIMESTAMP(FROM_UNIXTIME(event_ts_epoch_seconds))
FROM temperature_readings;
The previous section introduced FROM_UNIXTIME to convert an epoch timestamp to a string. Note that it takes an optional second argument that specifies the format (a Java SimpleDateFormat string):
SELECT
  FROM_UNIXTIME(event_ts_epoch_seconds, 'yyyy-MM-dd hh:mm'),
  FROM_UNIXTIME(event_ts_epoch_seconds, 'yyyy-MM-dd HH:mm:ss.SSS Z')
FROM temperature_readings;
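Note that the two patterns above differ in more than length: hh is the 12-hour clock and HH is the 24-hour clock. The sketch below illustrates the difference with Python's strftime, whose %I and %H codes are used here as rough stand-ins for the Java pattern letters. It assumes the value is rendered in UTC, whereas Flink renders it in the session's local time zone.

```python
from datetime import datetime, timezone

# Assumption: render in UTC for the sake of the illustration.
event_time = datetime.fromtimestamp(1234567890, tz=timezone.utc)

# Rough analogue of 'yyyy-MM-dd hh:mm' (%I is the 12-hour clock).
twelve_hour = event_time.strftime("%Y-%m-%d %I:%M")

# Rough analogue of 'yyyy-MM-dd HH:mm' (%H is the 24-hour clock).
twenty_four_hour = event_time.strftime("%Y-%m-%d %H:%M")

print(twelve_hour)
print(twenty_four_hour)
```

Without an AM/PM marker, the 12-hour rendering cannot be distinguished from a morning time, so a 24-hour pattern is usually the safer choice for round-tripping.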
To convert a string to the corresponding epoch, use the UNIX_TIMESTAMP function. Just as FROM_UNIXTIME takes an optional format string to override the default of yyyy-MM-dd HH:mm:ss, UNIX_TIMESTAMP takes an optional format string that describes how to parse its input. Specifying the format avoids ambiguity. (Is 2024-01-07 January 7th or July 1st?)
For example:
SELECT
  UNIX_TIMESTAMP('2009-02-13 23:31:30.000 +0000', 'yyyy-MM-dd HH:mm:ss.SSS Z'),
  UNIX_TIMESTAMP('2009-02-13 11:31:30 PM UTC', 'yyyy-MM-dd hh:mm:ss a z'),
  UNIX_TIMESTAMP('Feb 13, 2009 23:31:30 PM UTC', 'MMM d, yyyy HH:mm:ss a z');
These all return the same epoch timestamp 1234567890.
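The role of the explicit format can also be illustrated outside Flink. The following sketch uses Python's strptime, whose format codes differ from the Java patterns above and serve only as approximate analogues. It parses two differently formatted strings to the same epoch, then shows how one string yields two different dates depending on the format supplied.

```python
from datetime import datetime, timezone

# Analogue of 'yyyy-MM-dd HH:mm:ss.SSS Z': the offset is parsed from the string.
with_offset = datetime.strptime(
    "2009-02-13 23:31:30.000 +0000", "%Y-%m-%d %H:%M:%S.%f %z"
)

# 12-hour clock with an AM/PM marker. The zone is attached explicitly
# here rather than parsed from the string.
twelve_hour = datetime.strptime(
    "2009-02-13 11:31:30 PM", "%Y-%m-%d %I:%M:%S %p"
).replace(tzinfo=timezone.utc)

print(int(with_offset.timestamp()), int(twelve_hour.timestamp()))

# The same digits parse to different dates under different formats.
month_first = datetime.strptime("2024-01-07", "%Y-%m-%d")
day_first = datetime.strptime("2024-01-07", "%Y-%d-%m")
print(month_first.date(), day_first.date())
```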
Flink SQL has a few functions that can extract parts from a timestamp or date. Some of the functions like HOUR, MINUTE, and SECOND take a timestamp, while others like DAYOFWEEK (which returns a number 1-7 where 1 is Sunday) take a DATE and thus would need to be cast from a timestamp to a date first. We have seen in this tutorial how TO_TIMESTAMP_LTZ and TO_TIMESTAMP can be used to convert an epoch timestamp to a TIMESTAMP_LTZ or TIMESTAMP. TO_DATE can similarly be used to convert to a date, though it operates on a string so TO_DATE(FROM_UNIXTIME(epoch_timestamp_col)) will do the conversion. In the following example, we do just that in order to extract the day of the week that our timestamp fell on. Note that we have an outer query that converts the numeric day of week to a human-readable form.
SELECT
  CASE
    WHEN dow = 1 THEN 'Sun'
    WHEN dow = 2 THEN 'Mon'
    WHEN dow = 3 THEN 'Tue'
    WHEN dow = 4 THEN 'Wed'
    WHEN dow = 5 THEN 'Thu'
    WHEN dow = 6 THEN 'Fri'
    WHEN dow = 7 THEN 'Sat'
  END AS day_of_week
FROM (
  SELECT DAYOFWEEK(TO_DATE(FROM_UNIXTIME(event_ts_epoch_seconds))) AS dow
  FROM temperature_readings
);
This shows us that the epoch timestamp 1234567890 fell on a Friday. Beware: the time zone matters! In Shanghai, which is 8 hours ahead of UTC, this timestamp fell on a Saturday, which we can see if we set the local time zone and rerun the query:
SET 'table.local-time-zone' = 'Asia/Shanghai';
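The effect of the time zone on the day of the week can be reproduced outside Flink with a short Python sketch. It is an illustration only: the helper flink_style_day_of_week is hypothetical, and Asia/Shanghai is modeled as a fixed UTC+8 offset rather than looked up in a time zone database.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc
# Assumption: Asia/Shanghai modeled as a fixed offset of UTC+8.
SHANGHAI = timezone(timedelta(hours=8))

def flink_style_day_of_week(epoch_seconds, tz):
    """Return 1-7 where 1 is Sunday, mirroring the DAYOFWEEK numbering."""
    local = datetime.fromtimestamp(epoch_seconds, tz=tz)
    # datetime.weekday() uses 0 = Monday .. 6 = Sunday, so shift it.
    return (local.weekday() + 1) % 7 + 1

dow_utc = flink_style_day_of_week(1234567890, UTC)
dow_shanghai = flink_style_day_of_week(1234567890, SHANGHAI)
print(dow_utc, dow_shanghai)
```

The same instant maps to 6 (Friday) in UTC and 7 (Saturday) at UTC+8, because the local date has already rolled over in Shanghai.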
Converting a timestamp from one time zone to another is straightforward with the CONVERT_TZ function. This function takes a datetime string (default format yyyy-MM-dd HH:mm:ss) as well as the source and destination time zones. Building on the time zone caution from the previous section, we can see that Friday, February 13th at 11:31:30 PM UTC is 7:31:30 AM on Saturday, February 14th in Shanghai, which is 8 hours ahead:
SELECT CONVERT_TZ('2009-02-13 23:31:30', 'UTC', 'Asia/Shanghai');
This query outputs 2009-02-14 07:31:30.
When you are finished, clean up the infrastructure used for this tutorial, either by deleting the environment that you created in Confluent Cloud, or, if running in Docker:
docker compose -f ./docker/docker-compose-flinksql.yml down