Flag unhealthy IoT devices

Organizations are turning to the Internet of Things (IoT) for immediately actionable insights into the health and performance of their devices. However, each device can emit high volumes of telemetry data, making it difficult to analyze and determine, in real time, whether something needs attention. This tutorial shows you how to process and coalesce that telemetry data using ksqlDB and flag the devices that warrant more investigation.

To see this tutorial in action, launch it in Confluent Cloud. It pre-populates the ksqlDB code in the Confluent Cloud Console and provides mock data or stubbed-out code for connecting to a real data source. For more detailed instructions, follow the steps below.

Run it

1. Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports SQL syntax for extracting, transforming, and loading events within your Kafka cluster.
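
Before running the recipe, you can optionally sanity-check that the editor is connected by running a couple of standard ksqlDB metadata statements (not part of the recipe itself); they list the streams and topics visible to your ksqlDB application:

SHOW STREAMS;
SHOW TOPICS;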

2. Execute ksqlDB code

ksqlDB processes data in real time, and you can also import data from and export data to popular data sources and end systems in the cloud, straight from ksqlDB. This tutorial shows you how to run the recipe in one of two ways: using a connector to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

In this example, the telemetry events for device threshold values and reporting alarms are stored in Postgres database tables. The connector reads from the tables and writes the data into Kafka topics in Confluent Cloud.
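
The exact layout of those tables depends on your database. As a hypothetical sketch (the column names match the fields used in the streams below, plus the created_at column that the connector's timestamp.column.name setting expects), the source tables might look like this:

-- Hypothetical Postgres source tables; adjust names and types to your schema.
CREATE TABLE alarms (
  device_id   VARCHAR,
  alarm_name  VARCHAR,
  code        INT,
  created_at  TIMESTAMP DEFAULT NOW()
);

CREATE TABLE throughputs (
  device_id   VARCHAR,
  throughput  DOUBLE PRECISION,
  created_at  TIMESTAMP DEFAULT NOW()
);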

The following stream processing app identifies which devices need to be investigated: those whose throughput falls below the threshold and whose alarm code is not zero.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
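
For example, if a throughputs topic already exists in your cluster, a hypothetical variant of the stream declaration used in the recipe below could drop that property:

-- Hypothetical variant: the backing topic already exists, so PARTITIONS is omitted.
CREATE STREAM throughputs (
  device_id VARCHAR KEY,
  throughput DOUBLE
) WITH (
  VALUE_FORMAT = 'JSON',
  KAFKA_TOPIC = 'throughputs'
);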

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_telemetry WITH (
  'connector.class'          = 'PostgresSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '5432',
  'connection.user'          = 'postgres',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'alarms, throughputs',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);
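
-- Optional sanity check (not part of the recipe): confirm that the connector was
-- created and is running before continuing. Both statements are standard ksqlDB syntax.
SHOW CONNECTORS;
DESCRIBE CONNECTOR recipe_postgres_telemetry;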

SET 'auto.offset.reset' = 'earliest';

-- Create table with latest state of alarms
CREATE TABLE alarms (
  device_id VARCHAR PRIMARY KEY,
  alarm_name VARCHAR,
  code INT
) WITH (
  VALUE_FORMAT = 'JSON',
  KAFKA_TOPIC = 'alarms',
  PARTITIONS = 6
);

-- Create stream of throughputs
CREATE STREAM throughputs (
  device_id VARCHAR KEY,
  throughput DOUBLE
) WITH (
  VALUE_FORMAT = 'JSON',
  KAFKA_TOPIC = 'throughputs',
  PARTITIONS = 6
);

-- Create new stream of critical issues to investigate
-- where throughputs are below threshold 1000.0 and alarm code is not 0
CREATE STREAM critical_issues WITH (KAFKA_TOPIC = 'critical_issues') AS
  SELECT
    t.device_id,
    t.throughput,
    a.alarm_name,
    a.code
  FROM throughputs t
  LEFT JOIN alarms a ON t.device_id = a.device_id
  WHERE throughput < 1000.0 AND a.code != 0;
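
At this point you can optionally confirm that the derived stream exists and that a persistent query is writing to it. DESCRIBE ... EXTENDED is a standard ksqlDB statement; it shows the stream's schema along with the queries that read from and write to it:

DESCRIBE critical_issues EXTENDED;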

3. Test with mock data

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO alarms (device_id, alarm_name, code) VALUES ('d1', 'CHANNEL_CREATE', 0);
INSERT INTO alarms (device_id, alarm_name, code) VALUES ('d2', 'CHANNEL_CREATE', 42);
INSERT INTO alarms (device_id, alarm_name, code) VALUES ('d3', 'CHANNEL_CREATE', 0);

INSERT INTO throughputs (device_id, throughput) VALUES ('d1', 2000.0);
INSERT INTO throughputs (device_id, throughput) VALUES ('d2', 900.0);
INSERT INTO throughputs (device_id, throughput) VALUES ('d3', 500.0);

To validate that this recipe is working, run the following query:

SELECT * FROM critical_issues EMIT CHANGES LIMIT 1;

Your output should resemble:

+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+
|T_DEVICE_ID                              |THROUGHPUT                               |ALARM_NAME                               |CODE                                     |
+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+
|d2                                       |900.0                                    |CHANNEL_CREATE                           |42                                       |
Limit Reached
Query terminated

4. Cleanup

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). By including the DELETE TOPIC clause, the Kafka topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
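
For the resources created in this tutorial, that means dropping the derived critical_issues stream, the throughputs stream, and the alarms table. If a persistent query is still writing to critical_issues, terminate it first with TERMINATE <query_id>; (the query ID is shown by DESCRIBE critical_issues EXTENDED;).

DROP STREAM IF EXISTS critical_issues DELETE TOPIC;
DROP STREAM IF EXISTS throughputs DELETE TOPIC;
DROP TABLE IF EXISTS alarms DELETE TOPIC;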

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
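
For this tutorial, that is the connector created in step 2:

DROP CONNECTOR IF EXISTS recipe_postgres_telemetry;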