This tutorial demonstrates how to detect anomalies in a Kafka topic using Confluent Cloud for Apache Flink®. You will set up the necessary resources in Confluent Cloud and run built-in ARIMA model-based anomaly detection queries against example order data.
Login to your Confluent Cloud account:
confluent login --prompt --saveInstall a CLI plugin that streamlines resource creation in Confluent Cloud:
confluent plugin install confluent-quickstartRun the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial. Note that you may specify a different cloud provider (gcp or azure) or region. You can find supported regions in a given cloud provider by running confluent kafka region list --cloud <CLOUD>. The plugin should complete in under a minute.
confluent quickstart \
--environment-name anomaly-detection-env \
--kafka-cluster-name anomaly-detection-cluster \
--compute-pool-name anomaly-detection-poolStart a Flink SQL shell:
confluent flink shell --compute-pool \
$(confluent flink compute-pool list -o json | jq -r ".[0].id")Next, you’ll run a series of anomaly detection queries against mock data streams in Confluent Cloud, specifically the orders stream.
Confluent Cloud for Apache Flink® provides a built-in anomaly detection function, ML_DETECT_ANOMALIES, which identifies outliers in a data stream based on an ARIMA model.
Anomaly detection operates over a defined window. For example, to find global outliers, specify an OVER window that includes previous rows:
OVER (ORDER BY $rowtime
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)The following query extracts the Boolean value that indicates whether a record is an anomaly, ignoring rows for which no prediction is made (because there isn’t enough data yet). Here, we set two model hyperparameters:
SELECT
customer_id,
ts,
price,
anomaly_results[6] AS is_anomaly
FROM (
SELECT
customer_id,
$rowtime as ts,
price,
ML_DETECT_ANOMALIES(price, $rowtime, JSON_OBJECT('horizon' VALUE 1, 'confidencePercentage' VALUE 90.0))
OVER (ORDER BY $rowtime
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS anomaly_results
FROM `examples`.`marketplace`.`orders`)
WHERE anomaly_results[6] IS NOT NULL;You’ll notice that some very large or small orders are flagged as anomalies, for example:
customer_id ts price is_anomaly
3012 2025-11-03 10:19:34.558 17.37 FALSE
3180 2025-11-03 10:19:34.598 10.43 TRUE
3218 2025-11-03 10:19:34.558 27.97 FALSE
3161 2025-11-03 10:19:34.559 63.72 FALSE
3171 2025-11-03 10:19:34.578 69.95 FALSE
3163 2025-11-03 10:19:34.598 79.18 FALSE
3063 2025-11-03 10:19:34.618 40.93 FALSE
3058 2025-11-03 10:19:34.638 99.69 TRUEBecause anomaly detection works as an OVER aggregation query, you can define exogenous variables that establish different cohorts with unique definitions of "outlier." For example, a large order from a platinum member who frequently places large orders may be typical, whereas a large order from a new anonymous customer may be anomalous.
The following query is similar to the previous one, but partitions the window by customer_id and lowers the minTrainingSize hyperparameter to 16 to get results sooner. This effectively defines anomalous behavior per customer:
SELECT
customer_id,
ts,
price,
anomaly_results[6] AS is_anomaly
FROM (
SELECT
customer_id,
$rowtime as ts,
price,
ML_DETECT_ANOMALIES(price, $rowtime, JSON_OBJECT('horizon' VALUE 1, 'minTrainingSize' VALUE 16, 'confidencePercentage' VALUE 90.0))
OVER (PARTITION BY customer_id
ORDER BY $rowtime
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS anomaly_results
FROM `examples`.`marketplace`.`orders`)
WHERE anomaly_results[6] IS NOT NULL;If you let the query run long enough, you’ll observe that some customers who usually place small orders will have large outlier orders. Others who typically place large orders will have small outlier orders. For example:
customer_id ts price is_anomaly
...
3020 2025-11-03 11:15:46.524 43.48 FALSE
3020 2025-11-03 11:16:10.421 30.02 FALSE
3020 2025-11-03 11:15:08.424 21.39 FALSE
3020 2025-11-03 11:16:10.918 97.86 TRUE
...
3183 2025-11-03 11:10:08.829 70.91 FALSE
3183 2025-11-03 11:12:11.514 10.53 TRUE
3183 2025-11-03 11:13:21.092 91.22 FALSE
3183 2025-11-03 11:16:09.783 87.10 FALSE
...When you are finished, delete the anomaly-detection-env environment by first getting the environment ID of the form env-123456 corresponding to it:
confluent environment listDelete the environment, including all resources created for this tutorial:
confluent environment delete <ENVIRONMENT ID>