In this tutorial, we take a look at Flink's most flexible form of user-defined function: Process Table Functions (PTFs). Process table functions support flexible N-to-M semantics, meaning that any number of input rows can correspond to any number of output rows, and they also give developers the ability to schedule actions and access state across multiple events.
The particular function that we will write and deploy in this tutorial is one well known in statistics: it calculates the median value over a user-specified number of events per partition key (in our case, the trailing median temperature per sensor). We will first call the function with Flink SQL, and then with the Table API.
The following steps use Confluent Cloud. To run the tutorial locally with Docker, skip to the Docker instructions section at the bottom.
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
If you already have the Confluent Cloud resources required to run Flink SQL statements and Table API programs, create or copy the properties file described below to flink-process-table-function/table-api-cc/src/main/resources/cloud.properties within the top-level tutorials directory and skip to the next step.
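For reference, the generated client configuration typically contains entries along these lines (key names follow the Confluent Table API client conventions; the values here are placeholders, and the file produced by the quickstart plugin below is authoritative):
# Placeholder values; the quickstart plugin fills these in for you
client.cloud=aws
client.region=us-east-1
client.flink-api-key=<FLINK_API_KEY>
client.flink-api-secret=<FLINK_API_SECRET>
client.organization-id=<ORGANIZATION_ID>
client.environment-id=<ENVIRONMENT_ID>
client.compute-pool-id=<COMPUTE_POOL_ID>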
If you need to create the Confluent Cloud infrastructure needed to run this tutorial, the confluent-quickstart CLI plugin creates the resources that you need to get started with Confluent Cloud for Apache Flink. Install it by running:
confluent plugin install confluent-quickstart
Run the plugin as follows to create the Confluent Cloud resources needed for this tutorial and generate a Table API client configuration file. Note that you may specify a different cloud provider (gcp or azure) or region. You can find the supported regions in a given cloud provider by running confluent flink region list --cloud <CLOUD>.
confluent quickstart \
--region us-east-1 \
--cloud aws \
--environment-name flink_table_api_tutorials_environment \
--kafka-cluster-name flink_table_api_tutorials_cluster \
--compute-pool-name flink_table_api_tutorials_pool \
--max-cfu 10 \
--create-flink-key \
--flink-properties-file ./flink-process-table-function/table-api-cc/src/main/resources/cloud.properties
The plugin should complete in under a minute and will generate the properties file described above.
The Median class (located under flink-process-table-function/median-ptf) demonstrates a custom Process Table Function (PTF), Flink's most flexible user-defined function type that supports stateful transformations over table partitions. Because the PTF implementation relies on Java reflection, PTF developers must guide the Flink runtime by providing hint annotations.
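A minimal sketch of what those hints can look like for this function follows; it is reconstructed from the eval excerpt below, so the TrailingTemps state class name and exact annotation placement are assumptions rather than the repo's exact code:
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// The output row type must be declared explicitly; reflection alone cannot infer it
@FunctionHint(output = @DataTypeHint("ROW<temperature DOUBLE, median DOUBLE>"))
public class Median extends ProcessTableFunction<Row> {

    // POJO holding the trailing temperatures; Flink manages one instance per partition key
    public static class TrailingTemps {
        public List<Double> temps = new ArrayList<>();
    }

    public void eval(
            // @StateHint marks this parameter as managed, fault-tolerant state
            @StateHint TrailingTemps trailingTemps,
            // TABLE_AS_SET declares set semantics: callers must PARTITION BY a key,
            // and state is scoped to each partition
            @ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row row,
            // A scalar argument; naming it lets Table API callers pass it by name
            @ArgumentHint(name = "numTrailing") Integer numTrailing) {
        // see the eval body below
    }
}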
Due to the reflection-based implementation of PTFs in the Flink runtime, the PTF method to implement must be named eval. Developers must follow this naming convention; it's not enforced by an interface or abstract class method. The Median eval method maintains the list of trailing temperatures by adding the current row's temperature onto the end of the list and removing the oldest reading from the beginning if the list size surpasses the input numTrailing argument. Then it outputs the current temperature and trailing median by calling ProcessTableFunction.collect:
// Read the current row's temperature and append it to the trailing list
Double temperature = row.getFieldAs("temperature");
trailingTemps.temps.add(temperature);
// Evict the oldest reading once the list exceeds numTrailing
while (trailingTemps.temps.size() > numTrailing) {
    trailingTemps.temps.remove(0);
}
// Emit the current temperature alongside the trailing median
// (Quantiles here is Guava's com.google.common.math.Quantiles)
collect(Row.of(temperature, Quantiles.median().compute(trailingTemps.temps)));
Now that we've examined the code, let's deploy the PTF to Confluent Cloud. First, build an uberjar containing all dependencies:
./gradlew flink-process-table-function:median-ptf:shadowJar
Upload the JAR as a Flink artifact:
confluent flink artifact create median_ptf \
--artifact-file ./flink-process-table-function/median-ptf/build/libs/median-ptf-all.jar \
--cloud aws \
--region us-east-1
Take note of the artifact ID returned (it will look like cfa-123456). Next, open the Flink SQL shell:
confluent flink shell --cloud aws --region us-east-1
Set the active catalog and database to match your environment and cluster:
USE CATALOG flink_table_api_tutorials_environment;
USE flink_table_api_tutorials_cluster;
Finally, register the PTF as a function, replacing cfa-123456 with your actual artifact ID:
CREATE FUNCTION Median
AS 'io.confluent.developer.Median'
USING JAR 'confluent-artifact://cfa-123456';
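As an optional sanity check, you can list the functions in the current database; the newly registered Median function should appear:
SHOW FUNCTIONS;
With the PTF registered, let's try it out. First, create a table to hold temperature sensor readings: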
CREATE TABLE temperature_readings (
sensor_id INT,
temperature DOUBLE,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts
);
Insert some sample temperature data:
INSERT INTO temperature_readings VALUES
(0, 55, TO_TIMESTAMP('2026-05-01 02:15:30')),
(0, 50, TO_TIMESTAMP('2026-05-01 02:20:30')),
(0, 45, TO_TIMESTAMP('2026-05-01 02:25:30')),
(0, 40, TO_TIMESTAMP('2026-05-01 02:30:30')),
(0, 45, TO_TIMESTAMP('2026-05-01 02:35:30')),
(0, 50, TO_TIMESTAMP('2026-05-01 02:40:30')),
(0, 55, TO_TIMESTAMP('2026-05-01 02:45:30')),
(0, 60, TO_TIMESTAMP('2026-05-01 02:50:30')),
(0, 60, TO_TIMESTAMP('2026-05-01 02:53:30'));
Now call the Median PTF, computing the median over the last 3 temperature readings per sensor:
SELECT *
FROM Median(TABLE temperature_readings PARTITION BY sensor_id, 3);
You should see output showing each temperature along with its trailing 3-event median:
sensor_id  temperature  median
0          55.0         55.0
0          50.0         52.5
0          45.0         50.0
0          40.0         45.0
0          45.0         45.0
0          50.0         45.0
0          55.0         50.0
0          60.0         55.0
0          60.0         60.0
You can also call PTFs programmatically using Flink's Table API. The code in TableApiPtfConfluentCloud.java demonstrates this by creating an in-memory table of temperature readings and calling the Median function via Table.process. Because our PTF requires set semantics, we must first partition by the sensor_id field:
TableResult tableResult = tableEnv.from("temperature_readings")
.partitionBy($("sensor_id"))
.process(Median.class,
lit(3).asArgument("numTrailing"))
.execute();
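The program then consumes the returned TableResult and prints each row. A minimal sketch of that pattern (a reconstruction, not necessarily the repo's exact code; field positions follow the SQL output above):
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

// Iterate lazily over the streaming result; closing the iterator cancels the query
try (CloseableIterator<Row> rows = tableResult.collect()) {
    int printed = 0;
    while (rows.hasNext() && printed < 5) {
        Row row = rows.next();
        // Positional access: 0 = sensor_id, 1 = temperature, 2 = median
        System.out.printf("Current temp: %s, median over last 3: %s%n",
                row.getField(1), row.getField(2));
        printed++;
    }
}
Compile the Table API application: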
./gradlew flink-process-table-function:table-api-cc:build
Run it to see the median calculations in action:
./gradlew flink-process-table-function:table-api-cc:run
The application will print the first five median calculations:
Current temp: 55.0, median over last 3: 55.0
Current temp: 50.0, median over last 3: 52.5
Current temp: 45.0, median over last 3: 50.0
Current temp: 40.0, median over last 3: 45.0
Current temp: 45.0, median over last 3: 45.0
When you are done, be sure to clean up any Confluent Cloud resources created for this tutorial. Since you created all resources in a Confluent Cloud environment, you can simply delete the environment, and most of the resources (e.g., the Kafka cluster and Flink compute pool) will be deleted along with it. Run the following command in your terminal to get the environment ID of the form env-123456 corresponding to the environment named flink_table_api_tutorials_environment:
confluent environment list
Delete the environment:
confluent environment delete <ENVIRONMENT_ID>
Next, delete the Flink API key. This API key isn't associated with the deleted environment, so it needs to be deleted separately. Find the key:
confluent api-key list --resource flink --current-user
Then copy the 16-character alphanumeric key and delete it:
confluent api-key delete <KEY>
Finally, for the sake of housekeeping, delete the Table API client configuration file:
rm flink-process-table-function/table-api-cc/src/main/resources/cloud.properties
Docker instructions
To run this tutorial locally with Docker, start by cloning the tutorials repository:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials
Start Kafka, Schema Registry, and Flink with the following command run from the top-level tutorials repository directory:
docker compose -f ./docker/docker-compose-flinksql.yml up -dThe Median class (located under flink-process-table-function/median-ptf) demonstrates a custom Process Table Function (PTF), Flink's most flexible user-defined function type that supports stateful transformations over table partitions. Because the PTF implementation relies on Java reflection, PTF developers must guide the Flink runtime by providing hint annotations:
Due to the reflection-based implementation of PTFs in the Flink runtime, the PTF method to implement must be named eval. Developers must follow this naming convention; it's not enforced by an interface or abstract class method. The Median eval method maintains the list of trailing temperatures by adding the current row's temperature onto the end of the list and removing the oldest reading from the beginning if the list size surpasses the input numTrailing argument. Then it outputs the current temperature and trailing median by calling ProcessTableFunction.collect:
// Read the current row's temperature and append it to the trailing list
Double temperature = row.getFieldAs("temperature");
trailingTemps.temps.add(temperature);
// Evict the oldest reading once the list exceeds numTrailing
while (trailingTemps.temps.size() > numTrailing) {
    trailingTemps.temps.remove(0);
}
// Emit the current temperature alongside the trailing median
// (Quantiles here is Guava's com.google.common.math.Quantiles)
collect(Row.of(temperature, Quantiles.median().compute(trailingTemps.temps)));
Now that we've examined the code, let's deploy the PTF to your local Flink environment. First, compile the PTF into an uberjar:
./gradlew flink-process-table-function:median-ptf:shadowJar
Copy the JAR into the Flink SQL client container:
docker cp flink-process-table-function/median-ptf/build/libs/median-ptf-all.jar flink-sql-client:/opt/flink/lib
Open a Flink SQL shell:
docker exec -it flink-sql-client sql-client.sh
Once in the SQL shell, load the JAR file:
ADD JAR '/opt/flink/lib/median-ptf-all.jar';
Register the PTF as a function:
CREATE FUNCTION Median
AS 'io.confluent.developer.Median'
USING JAR '/opt/flink/lib/median-ptf-all.jar';
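As an optional check, verify that the function is registered (SHOW USER FUNCTIONS lists user-defined functions in the current catalog and database):
SHOW USER FUNCTIONS;
With the PTF registered, let's try it out. First, create a Kafka-backed table to hold temperature sensor readings: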
CREATE TABLE temperature_readings (
sensor_id INT,
temperature DOUBLE,
ts TIMESTAMP(3),
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'kafka',
'topic' = 'temperature-readings',
'properties.bootstrap.servers' = 'broker:9092',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'raw',
'key.fields' = 'sensor_id',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry:8081',
'value.fields-include' = 'EXCEPT_KEY'
);
Insert some sample temperature readings:
INSERT INTO temperature_readings VALUES
(0, 55, TO_TIMESTAMP('2026-05-01 02:15:30')),
(0, 50, TO_TIMESTAMP('2026-05-01 02:20:30')),
(0, 45, TO_TIMESTAMP('2026-05-01 02:25:30')),
(0, 40, TO_TIMESTAMP('2026-05-01 02:30:30')),
(0, 45, TO_TIMESTAMP('2026-05-01 02:35:30')),
(0, 50, TO_TIMESTAMP('2026-05-01 02:40:30')),
(0, 55, TO_TIMESTAMP('2026-05-01 02:45:30')),
(0, 60, TO_TIMESTAMP('2026-05-01 02:50:30')),
(0, 60, TO_TIMESTAMP('2026-05-01 02:53:30'));
Now call the Median PTF, computing the median over the last 3 temperature readings per sensor:
SELECT *
FROM Median(TABLE temperature_readings PARTITION BY sensor_id, 3);
You should see output showing each temperature along with its trailing 3-event median:
sensor_id  temperature  median
0          55.0         55.0
0          50.0         52.5
0          45.0         50.0
0          40.0         45.0
0          45.0         45.0
0          50.0         45.0
0          55.0         50.0
0          60.0         55.0
0          60.0         60.0
You can also call PTFs programmatically using Flink's Table API. The code in TableApiPtfLocal.java demonstrates this by creating an in-memory table of temperature readings and calling the Median function via Table.process. Because our PTF requires set semantics, we must first partition by the sensor_id field:
TableResult tableResult = tableEnv.from("temperature_readings")
.partitionBy($("sensor_id"))
.process(Median.class,
lit(3).asArgument("numTrailing"))
.execute();
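As in the Confluent Cloud version, the main class then iterates over the rows returned by tableResult.collect() and prints the first five (see the sketch in the Confluent Cloud section above).
Compile the Table API application: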
./gradlew flink-process-table-function:table-api-oss:build
Run it to see the median calculations in action:
./gradlew flink-process-table-function:table-api-oss:run
The application will print the first five median calculations:
Current temp: 55.0, median over last 3: 55.0
Current temp: 50.0, median over last 3: 52.5
Current temp: 45.0, median over last 3: 50.0
Current temp: 40.0, median over last 3: 45.0
Current temp: 45.0, median over last 3: 45.0
From your local machine, stop the Kafka, Schema Registry, and Flink containers:
docker compose -f ./docker/docker-compose-flinksql.yml down