Optimize omni-channel inventory

Edit this page
Having an up-to-date, real-time view of inventory on every item is essential in today's online marketplaces. This helps businesses maintain the optimum level of inventory—not too much and not too little—so that they can meet customer demand while minimizing inventory holding costs. This tutorial demonstrates how to track and update inventory in real time, so you always have an up-to-date snapshot of your stock for both your customers and merchandising teams

To see this tutorial in action, click here to launch it now. It will pre-populate the ksqlDB code in the Confluent Cloud Console and provide mock data or stubbed out code to connect to a real data source. For more detailed instructions, follow the steps below.

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports SQL language for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

2

ksqlDB processes data in realtime, and you can also import and export data straight from ksqlDB from popular data sources and end systems in the cloud. This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

For this tutorial, we are interested in knowing each event for an item that affects its quantity. This creates a stream of events, where each event results in the addition or removal of inventory.

Create a ksqlDB TABLE, which is a mutable, partitioned collection that models change over time and represents what is true as of now.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_inventory WITH (
  'connector.class'          = 'PostgresSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '5432',
  'connection.user'          = 'postgres',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'inventory',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);

SET 'auto.offset.reset' = 'earliest';

-- Create stream of inventory
CREATE STREAM inventory_stream (
  id VARCHAR KEY,
  item VARCHAR,
  quantity INTEGER
) WITH (
  VALUE_FORMAT = 'JSON',
  KAFKA_TOPIC = 'inventory',
  PARTITIONS = 6
);

-- Create stateful table with up-to-date information of inventory availability
CREATE TABLE inventory_stream_table
    WITH (KAFKA_TOPIC = 'inventory_table') AS
  SELECT
    item,
    SUM(quantity) AS item_quantity
  FROM inventory_stream
  GROUP BY item
  EMIT CHANGES;

Test with mock data

3

If you are you not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO inventory_stream (id, item, quantity) VALUES ('1', 'Apple Magic Mouse 2', 10);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('2', 'iPhoneX', 25);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('3', 'MacBookPro13', 100);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('4', 'iPad4', 20);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('5', 'Apple Pencil', 10);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('5', 'PhoneX', 10);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('4', 'iPad4', -20);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('3', 'MacBookPro13', 10);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('4', 'iPad4', 20);

To validate that this recipe is working, run the following query:

SELECT * FROM inventory_stream_table EMIT CHANGES LIMIT 5;

Your output should resemble:

+-----------------------------------------------------------------+-----------------------------------------------------------------+
|ITEM                                                             |ITEM_QUANTITY                                                    |
+-----------------------------------------------------------------+-----------------------------------------------------------------+
|MacBookPro13                                                     |100                                                              |
|Apple Pencil                                                     |10                                                               |
|MacBookPro13                                                     |110                                                              |
|PhoneX                                                           |10                                                               |
|iPad4                                                            |20                                                               |
Limit Reached
Query terminated

Cleanup

4

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;