Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.
ksqlDB processes data in real time, and you can also import and export data straight from ksqlDB to and from popular data sources and end systems in the cloud.
This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source or using ksqlDB’s INSERT INTO functionality to mock the data.
If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.
This application processes real-time vehicle information. Each vehicle update carries the current state of the order on that vehicle (WAREHOUSE, EN ROUTE, or DELIVERED), and the application combines these updates with the order data to show each vehicle's current position, its distance from the final destination, and an estimated delivery time.
For this tutorial, we’ll be using order data as well as a stream of status updates from a fleet of delivery vehicles that are in the process of transporting orders from a warehouse to customers. Kafka Connect can easily stream in data from a database containing that information; use the following template as a guide for setting up your connector.
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_logistics_orders WITH (
'connector.class' = 'PostgresSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'connection.host' = '<database-host>',
'connection.port' = '5432',
'connection.user' = 'postgres',
'connection.password' = '<database-password>',
'db.name' = '<db-name>',
'table.whitelist' = 'orders',
'timestamp.column.name' = 'timestamp',
'output.data.format' = 'JSON',
'db.timezone' = 'UTC',
'tasks.max' = '1'
);
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_logistics_fleet WITH (
'connector.class' = 'PostgresSource',
'kafka.api.key' = '<my-kafka-api-key>',
'kafka.api.secret' = '<my-kafka-api-secret>',
'connection.host' = '<database-host>',
'connection.port' = '5432',
'connection.user' = 'postgres',
'connection.password' = '<database-password>',
'db.name' = '<db-name>',
'table.whitelist' = 'fleet_updates',
'timestamp.column.name' = 'timestamp',
'output.data.format' = 'JSON',
'db.timezone' = 'UTC',
'tasks.max' = '1'
);
SET 'auto.offset.reset' = 'earliest';
-- Create Table of Orders
CREATE TABLE ORDERS (
ID VARCHAR PRIMARY KEY,
NAME VARCHAR,
EMAIL VARCHAR,
ADDRESS VARCHAR,
DEST_LAT DOUBLE,
DEST_LONG DOUBLE,
ORDER_TOTAL DOUBLE
) WITH (
KAFKA_TOPIC = 'orders',
VALUE_FORMAT = 'JSON',
KEY_FORMAT = 'KAFKA',
PARTITIONS = 6
);
-- Create Vehicle Stream
CREATE STREAM VEHICLES (
ID VARCHAR KEY,
ORDER_ID VARCHAR,
STATE VARCHAR,
LAT DOUBLE,
LONG DOUBLE,
WAREHOUSE_LAT DOUBLE,
WAREHOUSE_LONG DOUBLE,
TEMPERATURE DOUBLE,
TIRE_PRESSURE DOUBLE
) WITH (
KAFKA_TOPIC = 'fleet_updates',
VALUE_FORMAT = 'JSON',
KEY_FORMAT = 'KAFKA',
PARTITIONS = 6
);
-- Create an order tracking table that shows each order's distance and ETA based on the latest vehicle location.
CREATE TABLE ORDER_TRACKER WITH (
KAFKA_TOPIC = 'order_tracker',
PARTITIONS = 6
) AS
SELECT
O.ID ORDER_ID,
LATEST_BY_OFFSET(V.ID) VEHICLE_ID,
LATEST_BY_OFFSET(V.LAT) LAT,
LATEST_BY_OFFSET(V.LONG) LONG,
LATEST_BY_OFFSET(O.DEST_LAT) DEST_LAT,
LATEST_BY_OFFSET(O.DEST_LONG) DEST_LONG,
-- Great-circle distance (km) between the vehicle's latest position and the order's destination.
LATEST_BY_OFFSET(ROUND(GEO_DISTANCE(V.LAT, V.LONG, O.DEST_LAT, O.DEST_LONG, 'KM'), 2)) DISTANCE_FROM_DESTINATION,
-- Rough ETA heuristic: the larger coordinate difference (degrees) divided by 0.005 (= 0.5 / 10 / 10), times 2.
LATEST_BY_OFFSET(ROUND(GREATEST(ABS(V.LAT - O.DEST_LAT), ABS(V.LONG - O.DEST_LONG)) / (0.5 / 10 / 10) * 2, 2)) ETA_SECONDS
FROM VEHICLES AS V
JOIN ORDERS AS O
ON ((V.ORDER_ID = O.ID))
GROUP BY O.ID
EMIT CHANGES;
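The ETA_SECONDS column in the ORDER_TRACKER query is a coarse heuristic, not a routing estimate: it takes the larger of the latitude and longitude differences in degrees, divides by 0.005 (0.5 / 10 / 10), and multiplies the result by 2, which is the same as multiplying by 400. The following Python sketch mirrors that expression so you can check it by hand against the ETA_SECONDS column of the expected output. The function name eta_seconds is our own, and the sketch assumes decimal-degree inputs as in the mock data.

```python
def eta_seconds(lat: float, lon: float, dest_lat: float, dest_lon: float) -> float:
    """Mirror of the recipe's ETA_SECONDS expression (a heuristic, not a routing model).

    SQL: ROUND(GREATEST(ABS(V.LAT - O.DEST_LAT), ABS(V.LONG - O.DEST_LONG))
               / (0.5 / 10 / 10) * 2, 2)
    """
    # Larger of the two coordinate differences, in degrees.
    max_delta_deg = max(abs(lat - dest_lat), abs(lon - dest_lon))
    # 0.5 / 10 / 10 == 0.005; division and multiplication evaluate left to right,
    # so the expression is (max_delta_deg / 0.005) * 2.
    step_deg = 0.5 / 10 / 10
    # Note: Python's round() rounds exact ties to even, so on a tie it may
    # differ from ksqlDB's ROUND.
    return round(max_delta_deg / step_deg * 2, 2)


if __name__ == "__main__":
    # Final positions of two vehicles from the mock data, with their order destinations.
    print(eta_seconds(36.900196, -122.979183, 37.531003, -122.400401))
    print(eta_seconds(37.596484, -122.488098, 37.596484, -122.488516))
```

Because the heuristic works in raw degrees, one degree of longitude counts the same as one degree of latitude, even though a degree of longitude is shorter at these latitudes.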
If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:
-- Orders
INSERT INTO orders (ID, NAME, EMAIL, ADDRESS, DEST_LAT, DEST_LONG, ORDER_TOTAL) VALUES ('67', 'Penelope Coin', 'pennycoin@email.com', '183 Maple Drive', 37.596484, -122.488516, 147.52);
INSERT INTO orders (ID, NAME, EMAIL, ADDRESS, DEST_LAT, DEST_LONG, ORDER_TOTAL) VALUES ('93', 'Theodore Bear', 'teddyb@email.com', '68 El Camino Real', 37.531003, -122.400401, 74.97);
INSERT INTO orders (ID, NAME, EMAIL, ADDRESS, DEST_LAT, DEST_LONG, ORDER_TOTAL) VALUES ('184', 'Jack Pepper', 'pepper.jack@email.com', '8299 Skyline Drive', 37.523939, -122.285065, 385.01);
-- Vehicles
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('0', '184', 'WAREHOUSE', 37.509757, -122.263389, 37.509757, -122.263389, 197.3, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('0', '184', 'EN ROUTE', 37.541382, -122.284036, 37.509757, -122.263389, 197.3, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('0', '184', 'EN ROUTE', 37.551639, -122.290937, 37.509757, -122.263389, 197.3, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('0', '184', 'EN ROUTE', 37.543293, -122.301903, 37.509757, -122.263389, 197.2, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('0', '184', 'EN ROUTE', 37.538930, -122.301108, 37.509757, -122.263389, 198.3, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('0', '184', 'EN ROUTE', 37.534920, -122.297403, 37.509757, -122.263389, 198.2, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('0', '184', 'EN ROUTE', 37.529016, -122.290019, 37.509757, -122.263389, 198.3, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('0', '184', 'DELIVERED', 37.523939, -122.285065, 37.509757, -122.263389, 197.8, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('1', '67', 'EN ROUTE', 37.455590, -122.275563, 37.509757, -122.263389, 196.3, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('1', '67', 'EN ROUTE', 37.471054, -122.322917, 37.509757, -122.263389, 196.7, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('1', '67', 'EN ROUTE', 37.500196, -122.379183, 37.509757, -122.263389, 196.5, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('1', '67', 'EN ROUTE', 37.531003, -122.400401, 37.509757, -122.263389, 196.6, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('1', '67', 'EN ROUTE', 37.560702, -122.458749, 37.509757, -122.263389, 196.4, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('1', '67', 'DELIVERED', 37.596484, -122.488098, 37.509757, -122.263389, 196.9, 34.8);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('2', '93', 'EN ROUTE', 36.855590, -121.575563, 37.509757, -122.263389, 198.7, 34.9);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('2', '93', 'EN ROUTE', 36.871054, -122.722917, 37.509757, -122.263389, 198.9, 34.9);
INSERT INTO vehicles (ID, ORDER_ID, STATE, LAT, LONG, WAREHOUSE_LAT, WAREHOUSE_LONG, TEMPERATURE, TIRE_PRESSURE) VALUES ('2', '93', 'EN ROUTE', 36.900196, -122.979183, 37.509757, -122.263389, 198.7, 34.9);
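To see why each order ends up with a single row, here is a simplified, single-process Python model of what ORDER_TRACKER does with the mock data. Each vehicle event is joined to its order, and because the query groups by O.ID and uses LATEST_BY_OFFSET, the most recent event for an order replaces the earlier ones. This is a sketch under stated assumptions (no partitioning, and each order's events processed in insert order), not how ksqlDB actually executes the query. build_tracker is a hypothetical helper name.

```python
from typing import Dict, List, Tuple

# Mock ORDERS rows from this tutorial: ID -> (DEST_LAT, DEST_LONG).
ORDERS: Dict[str, Tuple[float, float]] = {
    "67": (37.596484, -122.488516),
    "93": (37.531003, -122.400401),
    "184": (37.523939, -122.285065),
}

# Mock VEHICLES events in insert order: (ID, ORDER_ID, STATE, LAT, LONG).
# The warehouse, temperature, and tire-pressure columns are left out because
# ORDER_TRACKER does not use them.
VEHICLE_EVENTS: List[Tuple[str, str, str, float, float]] = [
    ("0", "184", "WAREHOUSE", 37.509757, -122.263389),
    ("0", "184", "EN ROUTE", 37.541382, -122.284036),
    ("0", "184", "EN ROUTE", 37.551639, -122.290937),
    ("0", "184", "EN ROUTE", 37.543293, -122.301903),
    ("0", "184", "EN ROUTE", 37.538930, -122.301108),
    ("0", "184", "EN ROUTE", 37.534920, -122.297403),
    ("0", "184", "EN ROUTE", 37.529016, -122.290019),
    ("0", "184", "DELIVERED", 37.523939, -122.285065),
    ("1", "67", "EN ROUTE", 37.455590, -122.275563),
    ("1", "67", "EN ROUTE", 37.471054, -122.322917),
    ("1", "67", "EN ROUTE", 37.500196, -122.379183),
    ("1", "67", "EN ROUTE", 37.531003, -122.400401),
    ("1", "67", "EN ROUTE", 37.560702, -122.458749),
    ("1", "67", "DELIVERED", 37.596484, -122.488098),
    ("2", "93", "EN ROUTE", 36.855590, -121.575563),
    ("2", "93", "EN ROUTE", 36.871054, -122.722917),
    ("2", "93", "EN ROUTE", 36.900196, -122.979183),
]


def build_tracker(
    orders: Dict[str, Tuple[float, float]],
    events: List[Tuple[str, str, str, float, float]],
) -> Dict[str, Dict[str, object]]:
    """Simplified model of the stream-table join followed by GROUP BY O.ID
    with LATEST_BY_OFFSET (hypothetical helper, not ksqlDB code)."""
    tracker: Dict[str, Dict[str, object]] = {}
    for vehicle_id, order_id, _state, lat, lon in events:
        if order_id not in orders:
            # Inner join: a vehicle event with no matching order produces no row.
            continue
        dest_lat, dest_lon = orders[order_id]
        # The newest joined event for an order replaces the previous aggregate,
        # so only the latest value of each column survives.
        tracker[order_id] = {
            "VEHICLE_ID": vehicle_id,
            "LAT": lat,
            "LONG": lon,
            "DEST_LAT": dest_lat,
            "DEST_LONG": dest_lon,
        }
    return tracker


if __name__ == "__main__":
    for order_id, row in build_tracker(ORDERS, VEHICLE_EVENTS).items():
        print(order_id, row)
```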
To validate that this recipe is working, run the following query:
SELECT * FROM order_tracker;
Your output should resemble:
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|ORDER_ID      |VEHICLE_ID    |LAT           |LONG          |DEST_LAT      |DEST_LONG     |DISTANCE_FROM_|ETA_SECONDS   |
|              |              |              |              |              |              |DESTINATION   |              |
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|93            |2             |36.900196     |-122.979183   |37.531003     |-122.400401   |86.87         |252.32        |
|67            |1             |37.596484     |-122.488098   |37.596484     |-122.488516   |0.04          |0.17          |
|184           |0             |37.523939     |-122.285065   |37.523939     |-122.285065   |0.0           |0.0           |
Query terminated
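GEO_DISTANCE(..., 'KM') returns a great-circle distance between two latitude/longitude points. As a sketch, assuming the haversine formula with a mean Earth radius of 6,371 km (our assumption, chosen because it reproduces the DISTANCE_FROM_DESTINATION values in the output above), the following Python recomputes those values from the LAT, LONG, DEST_LAT, and DEST_LONG columns. The function name haversine_km is our own; this is not ksqlDB's implementation.

```python
import math

# Assumption: a mean Earth radius of 6,371 km. It reproduces the expected
# DISTANCE_FROM_DESTINATION values but is not taken from ksqlDB's source.
EARTH_RADIUS_KM = 6371.0


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in km between two points given in decimal degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


if __name__ == "__main__":
    # (LAT, LONG, DEST_LAT, DEST_LONG) for each row of the expected output.
    rows = {
        "93": (36.900196, -122.979183, 37.531003, -122.400401),
        "67": (37.596484, -122.488098, 37.596484, -122.488516),
        "184": (37.523939, -122.285065, 37.523939, -122.285065),
    }
    for order_id, (lat, lon, dest_lat, dest_lon) in rows.items():
        print(order_id, round(haversine_km(lat, lon, dest_lat, dest_lon), 2))
```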
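To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). Drop the derived ORDER_TRACKER table before ORDERS and VEHICLES, because ksqlDB will not drop a stream or table that another stream or table still reads from.
By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.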
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;