Analyze datacenter power usage

If your business provides cloud infrastructure across multiple data centers with isolated tenants, you may have an accounting unit that needs to accurately monitor and invoice your customers. These data centers often consume large amounts of electricity and are built with smart electrical panels that control the power supply to multiple customer tenants. This tutorial demonstrates how to bill each customer accurately by capturing and analyzing telemetry data from these smart panels.

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

2

ksqlDB processes data in real time, and you can also use it to import data from popular data sources and export data to end systems in the cloud. This tutorial shows you how to run the recipe in one of two ways: using connectors to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

Our datacenter power analysis applications require data from two different sources: customer tenant information and smart control panel readings.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

CREATE SOURCE CONNECTOR IF NOT EXISTS customer WITH (
  'connector.class'       = 'MySqlCdcSource',
  'name'                  = 'Customer_Tenant_Source',
  'kafka.api.key'         = '<my-kafka-api-key>',
  'kafka.api.secret'      = '<my-kafka-api-secret>',
  'database.hostname'     = '<db-hostname>',
  'database.port'         = '3306',
  'database.user'         = '<db-user>',
  'database.password'     = '<db-password>',
  'database.server.name'  = 'mysql',
  'database.whitelist'    = 'customer',
  'table.includelist'     = 'customer.tenant',
  'snapshot.mode'         = 'initial',
  'output.data.format'    = 'JSON',
  'tasks.max'             = '1'
);

CREATE SOURCE CONNECTOR IF NOT EXISTS readings WITH (
  'connector.class'       = 'MqttSource',
  'name'                  = 'Smart_Panel_Source',
  'kafka.api.key'         = '<my-kafka-api-key>',
  'kafka.api.secret'      = '<my-kafka-api-secret>',
  'kafka.topic'           = 'panel-readings',
  'mqtt.server.uri'       = 'tcp://<mqtt-server-hostname>:1881',
  'mqtt.topics'           = '<mqtt-topic>',
  'tasks.max'             = '1'
);

SET 'auto.offset.reset' = 'earliest';

-- Create a Table for the captured tenant occupancy events
CREATE TABLE tenant_occupancy (
  tenant_id VARCHAR PRIMARY KEY,
  customer_id BIGINT
) WITH (
  KAFKA_TOPIC = 'tenant-occupancy',
  PARTITIONS = 6,
  KEY_FORMAT = 'JSON',
  VALUE_FORMAT = 'JSON'
);

-- Create a Stream for the power control panel telemetry data.
--   tenant_kwh_usage is reset by the device every month
CREATE STREAM panel_power_readings (
  panel_id BIGINT,
  tenant_id VARCHAR,
  panel_current_utilization DOUBLE,
  tenant_kwh_usage BIGINT
) WITH (
  KAFKA_TOPIC = 'panel-readings',
  PARTITIONS = 6,
  KEY_FORMAT = 'JSON',
  VALUE_FORMAT = 'JSON'
);

-- Create a filtered Stream of panel readings registering power usage >= 85%
--  good for determining panels which are drawing a high electrical load
CREATE STREAM overloaded_panels AS
  SELECT panel_id, tenant_id, panel_current_utilization
    FROM panel_power_readings
    WHERE panel_current_utilization >= 0.85
  EMIT CHANGES;

-- Create a stream of billable power events
--  the tenant_kwh_usage field is the aggregate amount of power used in the
--  current month
CREATE STREAM billable_power AS
  SELECT
      FORMAT_TIMESTAMP(FROM_UNIXTIME(panel_power_readings.ROWTIME), 'yyyy-MM')
        AS billable_month,
      tenant_occupancy.customer_id as customer_id,
      tenant_occupancy.tenant_id as tenant_id,
      panel_power_readings.tenant_kwh_usage as tenant_kwh_usage
    FROM panel_power_readings
    INNER JOIN tenant_occupancy ON
      panel_power_readings.tenant_id = tenant_occupancy.tenant_id
  EMIT CHANGES;

-- Create a table that can be queried for billing reports
CREATE TABLE billable_power_report WITH (KEY_FORMAT = 'JSON') AS
  SELECT customer_id, tenant_id, billable_month, MAX(tenant_kwh_usage) as kwh
    FROM billable_power
    GROUP BY tenant_id, customer_id, billable_month;

Test with mock data

3

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

-- tenant_id is in the form of a resource name used to indicate the
--  data center provider, country, regional locale, and tenant id
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:eqix:us:chi1:12', 924);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:eqix:us:chi1:10', 243);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:kddi:eu:ber1:15', 924);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:kddi:eu:ber1:20', 123);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:kddi:cn:hnk2:11', 243);

-- Each power reading contains two distinct values: the current total utilization of the
--  panel, and the month-to-date energy usage (in kWh) for the referenced tenant
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:eqix:us:chi1:12', 1.05, 1034);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (2, 'dc:eqix:us:chi1:10', 0.85, 867);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:kddi:eu:ber1:15', 0.54, 345);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (2, 'dc:kddi:eu:ber1:20', 0.67, 288);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:kddi:cn:hnk2:11', 1.11, 1119);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:eqix:us:chi1:12', 1.01, 1134);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (2, 'dc:eqix:us:chi1:10', 0.75, 898);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:kddi:cn:hnk2:11', 1.10, 1201);

To validate that this recipe is working, run the following query:

SELECT * FROM billable_power_report WHERE tenant_id = 'dc:kddi:eu:ber1:15';

Your output should resemble:

+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+
|TENANT_ID                                        |CUSTOMER_ID                                      |BILLABLE_MONTH                                   |KWH                                              |
+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+
|dc:kddi:eu:ber1:15                               |924                                              |2022-04                                          |345                                              |
Query terminated

Explanation

4

Typically, customer information would be sourced from an existing database. As customer occupancy changes, tables in the database are updated and we can stream them into Kafka using Kafka Connect with change data capture. The earlier example of the MySqlCdcSource configuration could be used to capture changes from a customer database’s tenant table into the Kafka cluster. This connector is provided as fully managed on Confluent Cloud.

Telemetry data may be sourced into Kafka in a variety of ways. MQTT is a popular protocol for Internet of Things (IoT) devices, and smart electrical panels may support it out of the box. The MQTT Connector is available as fully managed on Confluent Cloud.

The current state of customer tenant occupancy can be represented with a ksqlDB TABLE. Events streamed into the tenant-occupancy topic represent a customer (customer_id) beginning an occupancy of a particular tenant (tenant_id). As events are observed on the tenant-occupancy topic, the table will model the current set of tenant occupants.

CREATE TABLE tenant_occupancy (
  tenant_id VARCHAR PRIMARY KEY,
  customer_id BIGINT
) WITH (
  KAFKA_TOPIC = 'tenant-occupancy',
  PARTITIONS = 6,
  KEY_FORMAT = 'JSON',
  VALUE_FORMAT = 'JSON'
);

You can query this table to determine which customer occupies which tenant.

SELECT * FROM tenant_occupancy EMIT CHANGES;
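
To narrow the result to a single customer, a filter can be added to the same push query. This is a minimal sketch that assumes the mock customer ID 924 inserted earlier in this tutorial; substitute a value from your own data.

```sql
-- Push query: streams the occupancy rows that belong to one customer.
-- 924 is a mock customer ID from this tutorial's INSERT statements (an assumption
-- if you are running against a real data source).
SELECT tenant_id, customer_id
  FROM tenant_occupancy
  WHERE customer_id = 924
  EMIT CHANGES;
```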

When customers leave a tenant, the source system will need to send a tombstone record (an event with a valid tenant_id key and a null value). ksqlDB will process the tombstone by removing the row with the given key from the table.

Panel sensor readings can be streamed directly into a topic or sourced from an upstream system. A STREAM captures the power readings when they flow from the smart panel into Kafka. Each event contains a panel identifier and the associated tenant, in addition to two power readings.

CREATE STREAM panel_power_readings (
  panel_id BIGINT,
  tenant_id VARCHAR,
  panel_current_utilization DOUBLE,
  tenant_kwh_usage BIGINT
) WITH (
  KAFKA_TOPIC = 'panel-readings',
  PARTITIONS = 6,
  KEY_FORMAT = 'JSON',
  VALUE_FORMAT = 'JSON'
);
  • panel_current_utilization represents the fraction of the panel's total capacity currently in use (for example, 0.85 means 85%) and is useful for business continuity monitoring

  • tenant_kwh_usage provides the total amount of energy consumed by the tenant in the current month

The following stream provides a simple way to determine when a panel is overloaded:

CREATE STREAM overloaded_panels AS
  SELECT panel_id, tenant_id, panel_current_utilization
    FROM panel_power_readings
    WHERE panel_current_utilization >= 0.85
  EMIT CHANGES;

This command filters the panel power readings for instances where utilization is 85% or higher. This stream could be used in a monitoring or alerting context to notify on-call personnel of a potential issue with the power supplies to the datacenter.
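
As a rough illustration of the monitoring use case, the stream can be watched with a push query. With the mock data from this tutorial, readings such as the 1.05 and 1.11 utilizations should appear, while the 0.54 and 0.67 readings should be filtered out.

```sql
-- Push query: streams each overloaded reading as it arrives.
-- An alerting system would more likely consume the stream's backing topic
-- directly; this query is only a quick way to inspect the results.
SELECT panel_id, tenant_id, panel_current_utilization
  FROM overloaded_panels
  EMIT CHANGES;
```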

To provide billing reports, a STREAM is created that joins the panel sensor readings with the customer tenant information. The FORMAT_TIMESTAMP and FROM_UNIXTIME functions derive a billable month indicator from each reading's timestamp, and the query selects the necessary fields from the joined stream and table.

CREATE STREAM billable_power AS
  SELECT
      FORMAT_TIMESTAMP(FROM_UNIXTIME(panel_power_readings.ROWTIME), 'yyyy-MM')
        AS billable_month,
      tenant_occupancy.customer_id as customer_id,
      tenant_occupancy.tenant_id as tenant_id,
      panel_power_readings.tenant_kwh_usage as tenant_kwh_usage
    FROM panel_power_readings
    INNER JOIN tenant_occupancy ON
      panel_power_readings.tenant_id = tenant_occupancy.tenant_id
  EMIT CHANGES;

Finally, the billable_power_report aggregates the billable_power stream into a TABLE that can be queried to create reports by month, customer, and tenant.

CREATE TABLE billable_power_report WITH (KEY_FORMAT = 'JSON') AS
  SELECT customer_id, tenant_id, billable_month, MAX(tenant_kwh_usage) as kwh
    FROM billable_power
    GROUP BY tenant_id, customer_id, billable_month;
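
Because tenant_kwh_usage is a running total that the device resets every month, taking MAX per tenant, customer, and month yields that month's total consumption so far. In the mock data, tenant 'dc:eqix:us:chi1:12' reports 1034 and then 1134, so its kwh value should settle at 1134, assuming both readings fall in the same calendar month. The sketch below shows one way to follow the report for a single customer; the customer ID 924 comes from the mock data and is otherwise an assumption.

```sql
-- Push query: streams the latest monthly totals for every tenant that
-- customer 924 occupies. Each update to a group's MAX emits a new row.
SELECT customer_id, tenant_id, billable_month, kwh
  FROM billable_power_report
  WHERE customer_id = 924
  EMIT CHANGES;
```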

Cleanup

5

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
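
Applied to the objects created in this tutorial, the cleanup might look like the sketch below. It drops derived streams and tables before the sources they read from. Depending on your ksqlDB version, you may first need to TERMINATE the persistent queries that write to the derived objects. The connector names assume the identifiers used in the CREATE SOURCE CONNECTOR statements above.

```sql
-- Drop derived objects first, then the source stream and table.
DROP TABLE IF EXISTS billable_power_report DELETE TOPIC;
DROP STREAM IF EXISTS billable_power DELETE TOPIC;
DROP STREAM IF EXISTS overloaded_panels DELETE TOPIC;

-- Omit DELETE TOPIC here if other applications still use these topics.
DROP STREAM IF EXISTS panel_power_readings DELETE TOPIC;
DROP TABLE IF EXISTS tenant_occupancy DELETE TOPIC;

-- Only needed if you created the source connectors.
DROP CONNECTOR IF EXISTS customer;
DROP CONNECTOR IF EXISTS readings;
```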