Optimize fleet management

Fleet management increasingly relies on real-time information about vehicle availability and location, integrated with data from vehicle telematics. This lets businesses improve operational efficiency by optimizing travel routes, lowering fuel consumption, and automating service schedules. This tutorial combines fleet locations with individual vehicle information, so that organizations can have a consolidated, real-time view of their entire fleet.

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB provides a SQL language for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

2

ksqlDB processes data in real time, and it can also import data from popular data sources and export data to end systems in the cloud. This tutorial shows you how to run the recipe in one of two ways: using connectors to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

This application will enrich the fleet telemetry events with details about the associated vehicle.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
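As a minimal sketch of that note, assuming a topic named locations already exists in your cluster, the stream can be declared without PARTITIONS and ksqlDB uses the existing topic as it is. The SHOW TOPICS statement is included only as one way to check which topics exist.

```sql
-- List the topics ksqlDB can see, to check whether 'locations' already exists.
SHOW TOPICS;

-- Assumption: the 'locations' topic already exists, so PARTITIONS is omitted
-- and the stream is registered over the existing topic.
CREATE STREAM locations (
  vehicle_id INT,
  latitude DOUBLE,
  longitude DOUBLE,
  timestamp VARCHAR
) WITH (
  KAFKA_TOPIC = 'locations',
  VALUE_FORMAT = 'JSON'
);
```

If the topic does not exist yet, keep the PARTITIONS property as in the statements below so that ksqlDB can create the topic.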

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

-- Stream of fleet descriptions
CREATE SOURCE CONNECTOR IF NOT EXISTS fleet_description WITH (
  'connector.class'          = 'MongoDbAtlasSource',
  'name'                     = 'recipe-mongodb-fleet_description',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host-address>',
  'connection.user'          = '<database-username>',
  'connection.password'      = '<database-password>',
  'database'                 = '<database-name>',
  'collection'               = '<database-collection-name>',
  'poll.await.time.ms'       = '5000',
  'poll.max.batch.size'      = '1000',
  'copy.existing'            = 'true',
  'output.data.format'       = 'JSON',
  'tasks.max'                = '1'
);

-- Stream of current location of each vehicle in the fleet
CREATE SOURCE CONNECTOR IF NOT EXISTS fleet_location WITH (
  'connector.class'          = 'PostgresSource',
  'name'                     = 'recipe-postgres-fleet_location',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '5432',
  'connection.user'          = 'postgres',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'fleet_location',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);

SET 'auto.offset.reset' = 'earliest';

-- create stream of locations
CREATE STREAM locations (
  vehicle_id INT,
  latitude DOUBLE,
  longitude DOUBLE,
  timestamp VARCHAR
) WITH (
  KAFKA_TOPIC = 'locations',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- fleet lookup table
CREATE TABLE fleet (
  vehicle_id INT PRIMARY KEY,
  driver_id INT,
  license BIGINT
) WITH (
  KAFKA_TOPIC = 'descriptions',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- enrich fleet location stream with more fleet information
CREATE STREAM fleet_location_enhanced AS
  SELECT
    l.vehicle_id,
    latitude,
    longitude,
    timestamp,
    f.driver_id,
    f.license
  FROM locations l
  LEFT JOIN fleet f ON l.vehicle_id = f.vehicle_id;
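
One possible way to turn the enriched stream into a per-vehicle view is to keep only the most recent event for each vehicle. The following is a sketch rather than part of the recipe: the table name fleet_latest_location and the alias last_seen are hypothetical, and it assumes the enriched stream exposes the key column as L_VEHICLE_ID, as in the sample output later in this tutorial.

```sql
-- Hypothetical table: one row per vehicle, holding the values from the
-- latest enriched event seen for that vehicle.
CREATE TABLE fleet_latest_location AS
  SELECT
    l_vehicle_id,
    LATEST_BY_OFFSET(latitude)  AS latitude,
    LATEST_BY_OFFSET(longitude) AS longitude,
    LATEST_BY_OFFSET(timestamp) AS last_seen,
    LATEST_BY_OFFSET(driver_id) AS driver_id,
    LATEST_BY_OFFSET(license)   AS license
  FROM fleet_location_enhanced
  GROUP BY l_vehicle_id;
```

LATEST_BY_OFFSET picks values by arrival order in the topic, not by the timestamp column, so this view is only as current as the order in which events are produced.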

Test with mock data

3

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO locations (vehicle_id, latitude, longitude, timestamp) VALUES (5401, 16.7587537, 96.2482149, '2021-09-23T10:50:00.000Z');
INSERT INTO locations (vehicle_id, latitude, longitude, timestamp) VALUES (5401, 16.004175, 120.7806412, '2021-09-27T06:39:00.000Z');
INSERT INTO locations (vehicle_id, latitude, longitude, timestamp) VALUES (5402, 32.755613, 22.6377432, '2021-09-25T20:22:00.000Z');

INSERT INTO fleet (vehicle_id, driver_id, license) VALUES (5401, 847383, 8852693196);
INSERT INTO fleet (vehicle_id, driver_id, license) VALUES (5402, 922947, 1255144201);
INSERT INTO fleet (vehicle_id, driver_id, license) VALUES (5403, 435309, 2132311746);

To validate that this recipe is working, run the following query:

SELECT * FROM fleet_location_enhanced EMIT CHANGES LIMIT 3;

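Your output should resemble the following. DRIVER_ID and LICENSE are null here most likely because the mock location events were inserted before the fleet rows: a stream-table join looks up the table as it stands when each stream event is processed, and later table updates do not re-emit earlier results.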

+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|L_VEHICLE_ID                    |LATITUDE                        |LONGITUDE                       |TIMESTAMP                       |DRIVER_ID                       |LICENSE                         |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|5402                            |32.755613                       |22.6377432                      |2021-09-25T20:22:00.000Z        |null                            |null                            |
|5401                            |16.7587537                      |96.2482149                      |2021-09-23T10:50:00.000Z        |null                            |null                            |
|5401                            |16.004175                       |120.7806412                     |2021-09-27T06:39:00.000Z        |null                            |null                            |
Limit Reached
Query terminated
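
To see non-null vehicle details, one option is to send another location event after the fleet rows have been inserted. This is a sketch under assumptions: the coordinates and timestamp below are made-up mock values for vehicle 5403, which is the one vehicle in the fleet table that has no location event yet.

```sql
-- Hypothetical mock event, inserted after the fleet rows already exist.
INSERT INTO locations (vehicle_id, latitude, longitude, timestamp) VALUES (5403, 16.8, 96.15, '2021-09-28T08:00:00.000Z');

-- Show only enriched events, that is, rows where the join found a fleet entry.
SELECT * FROM fleet_location_enhanced WHERE driver_id IS NOT NULL EMIT CHANGES LIMIT 1;
```

If the join behaves as described above, the returned row should carry the driver and license values that were inserted for vehicle 5403.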

Cleanup

4

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
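
Filled in with the names used in this tutorial, the cleanup might look like the sketch below. The order is an assumption based on dependencies: the derived stream is dropped first because it reads from the other two. The connector names are taken from the CREATE SOURCE CONNECTOR statements above and may be registered differently in your environment, so it is worth confirming them with SHOW CONNECTORS first.

```sql
-- Drop the derived stream first, because it reads from locations and fleet.
DROP STREAM IF EXISTS fleet_location_enhanced DELETE TOPIC;
DROP STREAM IF EXISTS locations DELETE TOPIC;
DROP TABLE IF EXISTS fleet DELETE TOPIC;

-- Only needed if you created the source connectors.
-- Check the registered names before dropping them.
SHOW CONNECTORS;
DROP CONNECTOR IF EXISTS fleet_description;
DROP CONNECTOR IF EXISTS fleet_location;
```

On some ksqlDB versions, a stream that is still written to by a running query cannot be dropped until that query has been terminated. In that case, SHOW QUERIES lists the query IDs to pass to TERMINATE.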