Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.
ksqlDB processes data in real time, and you can also import and export data straight from ksqlDB to and from popular data sources and end systems in the cloud.
This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.
If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.
Our datacenter power analysis applications require data from two different sources: customer tenant information and smart control panel readings.
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
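For example, a minimal sketch (assuming the tenant-occupancy topic already exists in your cluster) omits PARTITIONS and lets ksqlDB infer the partition count from the topic itself:
-- Hypothetical variant: the backing topic already exists, so PARTITIONS is omitted
CREATE TABLE tenant_occupancy (
  tenant_id VARCHAR PRIMARY KEY,
  customer_id BIGINT
) WITH (
  KAFKA_TOPIC = 'tenant-occupancy',
  KEY_FORMAT = 'JSON',
  VALUE_FORMAT = 'JSON'
);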
-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_mysqlcdc_customer_tenant WITH (
'connector.class' = 'MySqlCdcSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'database.hostname' = '<db-hostname>',
'database.port' = '3306',
'database.user' = '<db-user>',
'database.password' = '<db-password>',
'database.server.name' = 'mysql',
'database.whitelist' = 'customer',
'table.includelist' = 'customer.tenant',
'snapshot.mode' = 'initial',
'output.data.format' = 'JSON',
'tasks.max' = '1'
);
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_mqtt_smart_panel WITH (
'connector.class' = 'MqttSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'kafka.topic' = 'panel-readings',
'mqtt.server.uri' = 'tcp://<mqtt-server-hostname>:1881',
'mqtt.topics' = '<mqtt-topic>',
'tasks.max' = '1'
);
SET 'auto.offset.reset' = 'earliest';
-- Create a Table for the captured tenant occupancy events
CREATE TABLE tenant_occupancy (
tenant_id VARCHAR PRIMARY KEY,
customer_id BIGINT
) WITH (
KAFKA_TOPIC = 'tenant-occupancy',
PARTITIONS = 6,
KEY_FORMAT = 'JSON',
VALUE_FORMAT = 'JSON'
);
-- Create a Stream for the power control panel telemetry data.
-- tenant_kwh_usage is reset by the device every month
CREATE STREAM panel_power_readings (
panel_id BIGINT,
tenant_id VARCHAR,
panel_current_utilization DOUBLE,
tenant_kwh_usage BIGINT
) WITH (
KAFKA_TOPIC = 'panel-readings',
PARTITIONS = 6,
KEY_FORMAT = 'JSON',
VALUE_FORMAT = 'JSON'
);
-- Create a filtered Stream of panel readings registering power usage >= 85%
-- good for determining panels which are drawing a high electrical load
CREATE STREAM overloaded_panels AS
SELECT panel_id, tenant_id, panel_current_utilization
FROM panel_power_readings
WHERE panel_current_utilization >= 0.85
EMIT CHANGES;
-- Create a stream of billable power events
-- the tenant_kwh_usage field is the aggregate amount of power used in the
-- current month
CREATE STREAM billable_power AS
SELECT
FORMAT_TIMESTAMP(FROM_UNIXTIME(panel_power_readings.ROWTIME), 'yyyy-MM')
AS billable_month,
tenant_occupancy.customer_id as customer_id,
tenant_occupancy.tenant_id as tenant_id,
panel_power_readings.tenant_kwh_usage as tenant_kwh_usage
FROM panel_power_readings
INNER JOIN tenant_occupancy ON
panel_power_readings.tenant_id = tenant_occupancy.tenant_id
EMIT CHANGES;
-- Create a table that can be queried for billing reports
CREATE TABLE billable_power_report WITH (KEY_FORMAT = 'JSON') AS
SELECT customer_id, tenant_id, billable_month, MAX(tenant_kwh_usage) as kwh
FROM billable_power
GROUP BY tenant_id, customer_id, billable_month;
If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:
-- tenant_id is in the form of a resource name used to indicate the
-- data center provider, country, regional locale, and tenant id
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:eqix:us:chi1:12', 924);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:eqix:us:chi1:10', 243);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:kddi:eu:ber1:15', 924);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:kddi:eu:ber1:20', 123);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:kddi:cn:hnk2:11', 243);
-- power readings contain two distinct readings. The current total utilization of the
-- panel, and the monthly total wattage usage for the referenced tenant
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:eqix:us:chi1:12', 1.05, 1034);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (2, 'dc:eqix:us:chi1:10', 0.85, 867);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:kddi:eu:ber1:15', 0.54, 345);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (2, 'dc:kddi:eu:ber1:20', 0.67, 288);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:kddi:cn:hnk2:11', 1.11, 1119);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:eqix:us:chi1:12', 1.01, 1134);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (2, 'dc:eqix:us:chi1:10', 0.75, 898);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:kddi:cn:hnk2:11', 1.10, 1201);
To validate that this recipe is working, run the following query:
SELECT * FROM billable_power_report WHERE tenant_id = 'dc:kddi:eu:ber1:15';
Your output should resemble:
+------------------+-----------+--------------+----+
|TENANT_ID         |CUSTOMER_ID|BILLABLE_MONTH|KWH |
+------------------+-----------+--------------+----+
|dc:kddi:eu:ber1:15|924        |2022-04       |345 |
Query terminated
Typically, customer information would be sourced from an existing database. As customer occupancy changes, tables in the database are updated and we can stream them into Kafka using Kafka Connect with change data capture. The earlier MySqlCdcSource configuration could be used to capture changes from a customer database’s tenant table into the Kafka cluster. This connector is available fully managed on Confluent Cloud.
Telemetry data may be sourced into Kafka in a variety of ways. MQTT is a popular source for Internet of Things (IoT) devices, and smart electrical panels may provide this functionality out of the box. The MQTT connector is available fully managed on Confluent Cloud.
The current state of customer tenant occupancy can be represented with a ksqlDB TABLE. Events streamed into the tenant-occupancy topic represent a customer (customer_id) beginning an occupancy of a particular tenant (tenant_id). As events are observed on the tenant-occupancy topic, the table will model the current set of tenant occupants.
CREATE TABLE tenant_occupancy (
tenant_id VARCHAR PRIMARY KEY,
customer_id BIGINT
) WITH (
KAFKA_TOPIC = 'tenant-occupancy',
PARTITIONS = 6,
KEY_FORMAT = 'JSON',
VALUE_FORMAT = 'JSON'
);
You can query this table to determine which customer occupies which tenant.
SELECT * FROM tenant_occupancy EMIT CHANGES;
When customers leave a tenant, the source system will need to send a tombstone record (an event with a valid tenant_id key and a null value). ksqlDB will process the tombstone by removing the row with the given key from the table.
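One way to watch tombstones arrive is to inspect the raw records on the backing topic with ksqlDB’s PRINT statement; a tombstone shows up as a record whose value is null:
-- Inspect raw records (including tombstones) on the backing topic
PRINT 'tenant-occupancy' FROM BEGINNING;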
Panel sensor readings can be streamed directly into a topic or sourced from an upstream system. A STREAM captures the power readings as they flow from the smart panel into Kafka. Each event contains a panel identifier and the associated tenant, in addition to two power readings.
CREATE STREAM panel_power_readings (
panel_id BIGINT,
tenant_id VARCHAR,
panel_current_utilization DOUBLE,
tenant_kwh_usage BIGINT
) WITH (
KAFKA_TOPIC = 'panel-readings',
PARTITIONS = 6,
KEY_FORMAT = 'JSON',
VALUE_FORMAT = 'JSON'
);
- panel_current_utilization represents the percentage of the panel's total capacity in use and is useful for business continuity monitoring
- tenant_kwh_usage provides the total amount of energy consumed by the tenant in the current month
A simple example of determining when a panel is overloaded:
CREATE STREAM overloaded_panels AS
SELECT panel_id, tenant_id, panel_current_utilization
FROM panel_power_readings
WHERE panel_current_utilization >= 0.85
EMIT CHANGES;
This command filters the panel power readings for instances where utilization is 85% or higher. This stream could be used in a monitoring or alerting context to notify on-call personnel of a potential issue with the power supplies to the datacenter.
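As a sketch of that alerting idea (the table name and five-minute window below are assumptions, not part of the recipe), you could count overload events per panel in tumbling windows and alert when a panel trips repeatedly:
-- Hypothetical alerting aid: count overload readings per panel in
-- five-minute tumbling windows (the window size is an assumption)
CREATE TABLE panel_overload_counts AS
  SELECT panel_id, COUNT(*) AS overload_events
  FROM overloaded_panels
  WINDOW TUMBLING (SIZE 5 MINUTES)
  GROUP BY panel_id;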
To provide billing reports, a STREAM is created that joins the panel sensor readings with the customer tenant information. Functions are used to create a billable month indicator along with the necessary fields from the joined stream and table.
CREATE STREAM billable_power AS
SELECT
FORMAT_TIMESTAMP(FROM_UNIXTIME(panel_power_readings.ROWTIME), 'yyyy-MM')
AS billable_month,
tenant_occupancy.customer_id as customer_id,
tenant_occupancy.tenant_id as tenant_id,
panel_power_readings.tenant_kwh_usage as tenant_kwh_usage
FROM panel_power_readings
INNER JOIN tenant_occupancy ON
panel_power_readings.tenant_id = tenant_occupancy.tenant_id
EMIT CHANGES;
Finally, the billable_power_report aggregates the billable_power stream into a TABLE that can be queried to create reports by month, customer, and tenant.
CREATE TABLE billable_power_report WITH (KEY_FORMAT = 'JSON') AS
SELECT customer_id, tenant_id, billable_month, MAX(tenant_kwh_usage) as kwh
FROM billable_power
GROUP BY tenant_id, customer_id, billable_month;
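For example, a running report for a single customer can be produced with a push query; the 0.12 USD/kWh rate below is a made-up figure for illustration, not something defined by the recipe:
-- Hypothetical billing query: the 0.12 USD/kWh rate is an assumption
SELECT customer_id, tenant_id, billable_month, kwh, kwh * 0.12 AS billed_usd
FROM billable_power_report
WHERE customer_id = 924
EMIT CHANGES;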
To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
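For this recipe, dropping the objects in reverse order of creation avoids errors from dependent queries (these names match the statements above):
-- Drop dependent objects first, then their upstream sources.
-- If a DROP fails because a persistent query still writes to the object,
-- TERMINATE that query first.
DROP TABLE IF EXISTS billable_power_report DELETE TOPIC;
DROP STREAM IF EXISTS billable_power DELETE TOPIC;
DROP STREAM IF EXISTS overloaded_panels DELETE TOPIC;
DROP STREAM IF EXISTS panel_power_readings DELETE TOPIC;
DROP TABLE IF EXISTS tenant_occupancy DELETE TOPIC;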
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;
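With the connector names used earlier in this recipe, that would be:
DROP CONNECTOR IF EXISTS recipe_mysqlcdc_customer_tenant;
DROP CONNECTOR IF EXISTS recipe_mqtt_smart_panel;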