Notify passengers of flight updates

Worse than having a flight delayed is not being notified about the important changes that come with it, such as new boarding times, cancellations, gate changes, and estimated arrivals. This tutorial shows how ksqlDB can help airlines combine passenger, flight booking, and current flight plan data to alert a passenger about flight updates in real time. You can also meet the requirements of different use cases at the same time: for example, you can export the impacted flight information to the analytics team for further processing and send it to an AWS Lambda function to notify the customer. Additionally, if you're interested in extending this recipe with an open source flight dataset, you can use real air traffic surveillance data from OpenSky Network, a non-profit, community-based receiver network.

To see this tutorial in action, click here to launch it now. It will pre-populate the ksqlDB code in the Confluent Cloud Console and provide mock data or stubbed-out code to connect to a real data source. For more detailed instructions, follow the steps below.

Run it

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

ksqlDB processes data in real time, and you can also use it to import and export data to and from popular data sources and end systems in the cloud. This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source, or using ksqlDB's INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

This ksqlDB application pulls in data from different tables for customers, flights, flight updates, and bookings, joins customer flight booking data and any flight updates, and provides a stream of notifications to passengers.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
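
For example, if the customers topic already exists in your cluster, a minimal sketch of the same table definition can omit the PARTITIONS property and inherit the topic's existing partition count:

CREATE TABLE customers (
  id INT PRIMARY KEY,
  name VARCHAR,
  address VARCHAR,
  email VARCHAR,
  phone VARCHAR,
  loyalty_status VARCHAR,
  loyalty_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'customers',
  FORMAT = 'JSON'
);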

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_aviation WITH (
  'connector.class'          = 'PostgresSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '5432',
  'connection.user'          = 'postgres',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'customers,flights,flight_updates,bookings',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE customers (
  id INT PRIMARY KEY,
  name VARCHAR,
  address VARCHAR,
  email VARCHAR,
  phone VARCHAR,
  loyalty_status VARCHAR,
  loyalty_id VARCHAR
) WITH (
  KAFKA_TOPIC = 'customers',
  FORMAT = 'JSON',
  PARTITIONS = 6
);

CREATE TABLE flights (
  id INT PRIMARY KEY,
  origin VARCHAR,
  destination VARCHAR,
  code VARCHAR,
  scheduled_dep TIMESTAMP,
  scheduled_arr TIMESTAMP
) WITH (
  KAFKA_TOPIC = 'flights',
  FORMAT = 'JSON',
  PARTITIONS = 6
);

CREATE TABLE bookings (
  id INT PRIMARY KEY,
  customer_id INT,
  flight_id INT
) WITH (
  KAFKA_TOPIC = 'bookings',
  FORMAT = 'JSON',
  PARTITIONS = 6
);

CREATE TABLE customer_bookings WITH (KAFKA_TOPIC = 'customer_bookings', KEY_FORMAT = 'KAFKA', VALUE_FORMAT = 'JSON') AS
  SELECT C.*,
         B.id,
         B.flight_id
  FROM bookings B
  INNER JOIN customers C
  ON B.customer_id = C.id;

CREATE TABLE customer_flights WITH (KAFKA_TOPIC = 'customer_flights', KEY_FORMAT = 'KAFKA', VALUE_FORMAT = 'JSON') AS
  SELECT CB.*,
         F.*
  FROM customer_bookings CB
  INNER JOIN flights F
  ON CB.flight_id = F.id;

-- In preparation for joining customer flights with flight updates, we first
-- need to rekey the customer_flights table by flight ID, which is currently
-- a multi-step process
CREATE STREAM cf_stream (
  cb_c_id INTEGER,
  cb_c_name VARCHAR,
  cb_c_address VARCHAR,
  cb_c_email VARCHAR,
  cb_c_phone VARCHAR,
  cb_c_loyalty_status VARCHAR,
  cb_c_loyalty_id VARCHAR,
  cb_flight_id INTEGER,
  f_id INTEGER,
  f_origin VARCHAR,
  f_destination VARCHAR,
  f_code VARCHAR,
  f_scheduled_dep TIMESTAMP,
  f_scheduled_arr TIMESTAMP
) WITH (
  KAFKA_TOPIC = 'customer_flights',
  KEY_FORMAT = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);

CREATE STREAM cf_rekey_masked WITH (KAFKA_TOPIC = 'cf_rekey_masked') AS
  SELECT f_id           AS flight_id,
    cb_c_id             AS customer_id,
    cb_c_name           AS customer_name,
    cb_c_address        AS customer_address,
    cb_c_email          AS customer_email,
    cb_c_phone          AS customer_phone,
    cb_c_loyalty_status AS customer_loyalty_status,
    MASK_KEEP_RIGHT(cb_c_loyalty_id,3) AS customer_loyalty_id,
    f_origin            AS flight_origin,
    f_destination       AS flight_destination,
    f_code              AS flight_code,
    f_scheduled_dep     AS flight_scheduled_dep,
    f_scheduled_arr     AS flight_scheduled_arr
  FROM cf_stream
  PARTITION BY f_id;

CREATE TABLE customer_flights_rekeyed (
  flight_id INT PRIMARY KEY,
  customer_id VARCHAR,
  customer_name VARCHAR,
  customer_address VARCHAR,
  customer_email VARCHAR,
  customer_phone VARCHAR,
  customer_loyalty_status VARCHAR,
  customer_loyalty_id VARCHAR,
  flight_origin VARCHAR,
  flight_destination VARCHAR,
  flight_code VARCHAR,
  flight_scheduled_dep TIMESTAMP,
  flight_scheduled_arr TIMESTAMP
) WITH (
  KAFKA_TOPIC = 'cf_rekey_masked',
  KEY_FORMAT = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);

CREATE STREAM flight_updates (
  id INT KEY,
  flight_id INT,
  updated_dep TIMESTAMP,
  reason VARCHAR
) WITH (
  KAFKA_TOPIC = 'flight_updates',
  KEY_FORMAT = 'KAFKA',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

CREATE STREAM customer_flight_updates WITH (KAFKA_TOPIC = 'customer_flight_updates') AS
SELECT CB.flight_id,
  customer_name,
  FU.reason AS flight_change_reason,
  FU.updated_dep AS flight_updated_dep,
  flight_scheduled_dep,
  customer_email,
  customer_phone,
  customer_loyalty_id,
  flight_destination,
  flight_code
FROM flight_updates FU
  INNER JOIN customer_flights_rekeyed CB
  ON FU.flight_id = CB.flight_id
EMIT CHANGES;

Test with mock data

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

-- When testing this recipe by inserting records manually, a short pause
--  between these insert groups is required. This allows the flight data
--  to be processed by the customer_flights_rekeyed table before the JOIN
--  with the flight updates data

INSERT INTO customers (id, name, address, email, phone, loyalty_status, loyalty_id) VALUES (1, 'Gleda Lealle', '93 Express Point', 'glealle0@senate.gov', '+351 831 301 6746', 'Silver', '2PLNX338063');
INSERT INTO customers (id, name, address, email, phone, loyalty_status, loyalty_id) VALUES (2, 'Gilly Crocombe', '332 Blaine Avenue', 'gcrocombe1@homestead.com', '+33 203 565 3736', 'Silver', '7AWLM918339');
INSERT INTO customers (id, name, address, email, phone, loyalty_status, loyalty_id) VALUES (3, 'Astrix Aspall', '56 Randy Place', 'aaspall2@ebay.co.uk', '+33 679 296 6645', 'Gold', '3RNZH870911');
INSERT INTO customers (id, name, address, email, phone, loyalty_status, loyalty_id) VALUES (4, 'Ker Omond', '23255 Tennessee Court', 'komond3@usnews.com', '+33 515 323 0170', 'Silver', '5BWEP418137');
INSERT INTO customers (id, name, address, email, phone, loyalty_status, loyalty_id) VALUES (5, 'Arline Synnott', '144 Ramsey Avenue', 'asynnott4@theatlantic.com', '+62 953 759 8885', 'Bronze', '4MNJB877136');

INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (1, 'LBA', 'AMS', '642',  '2021-11-18T06:04:00', '2021-11-18T06:48:00');
INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (2, 'LBA', 'LHR', '9607', '2021-11-18T07:36:00', '2021-11-18T08:05:00');
INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (3, 'AMS', 'TXL', '7968', '2021-11-18T08:11:00', '2021-11-18T10:41:00');
INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (4, 'AMS', 'OSL', '496',  '2021-11-18T11:20:00', '2021-11-18T13:25:00');
INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (5, 'LHR', 'JFK', '9230', '2021-11-18T10:36:00', '2021-11-18T19:07:00');

INSERT INTO bookings (id, customer_id, flight_id) VALUES (1,2,1);
INSERT INTO bookings (id, customer_id, flight_id) VALUES (2,1,1);
INSERT INTO bookings (id, customer_id, flight_id) VALUES (3,5,3);
INSERT INTO bookings (id, customer_id, flight_id) VALUES (4,4,2);

-- Wait 10 seconds before inserting the records below

INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (1, 2, '2021-11-18T09:00:00.000', 'Cabin staff unavailable');
INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (2, 3, '2021-11-19T14:00:00.000', 'Mechanical checks');
INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (3, 1, '2021-11-19T08:10:09.000', 'Icy conditions');

To validate that this recipe is working, run the following query:

SELECT * FROM customer_flight_updates EMIT CHANGES LIMIT 3;

Your output should resemble:

+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|CB_FLIGHT_ID       |CUSTOMER_NAME      |FLIGHT_CHANGE_REASO|FLIGHT_UPDATED_DEP |FLIGHT_SCHEDULED_DE|CUSTOMER_EMAIL     |CUSTOMER_PHONE     |CUSTOMER_LOYALTY_ID|FLIGHT_DESTINATION |FLIGHT_CODE        |
|                   |                   |N                  |                   |P                  |                   |                   |                   |                   |                   |
+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|3                  |Arline Synnott     |Mechanical checks  |2021-11-19T14:00:00|2021-11-18T08:11:00|asynnott4@theatlant|+62 953 759 8885   |nXXXXnnn136        |TXL                |7968               |
|                   |                   |                   |.000               |.000               |ic.com             |                   |                   |                   |                   |
|2                  |Ker Omond          |Cabin staff unavail|2021-11-18T09:00:00|2021-11-18T07:36:00|komond3@usnews.com |+33 515 323 0170   |nXXXXnnn137        |LHR                |9607               |
|                   |                   |able               |.000               |.000               |                   |                   |                   |                   |                   |
|1                  |Gilly Crocombe     |Icy conditions     |2021-11-19T08:10:09|2021-11-18T06:04:00|gcrocombe1@homestea|+33 203 565 3736   |nXXXXnnn339        |AMS                |642                |
|                   |                   |                   |.000               |.000               |d.com              |                   |                   |                   |                   |
Limit Reached
Query terminated

Write the data out

Any downstream application or database can receive the post-processed data. This example demonstrates exporting data using the Snowflake Sink Connector for Confluent Cloud.

-- Send data to Snowflake
CREATE SINK CONNECTOR IF NOT EXISTS recipe_snowflake_aviation WITH (
  'connector.class'          = 'SnowflakeSink',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'topics'                   = 'customer_flight_updates',
  'input.data.format'        = 'JSON',
  'snowflake.url.name'       = 'https://wm83168.us-central1.gcp.snowflakecomputing.com:443',
  'snowflake.user.name'      = '<login-username>',
  'snowflake.private.key'    = '<private-key>',
  'snowflake.database.name'  = '<database-name>',
  'snowflake.schema.name'    = '<schema-name>',
  'tasks.max'                = '1'
);

Explanation

Create and populate the underlying tables

ksqlDB supports tables and streams as objects. Both are backed by Kafka topics. Here we’re going to create three tables in a normalized data model to hold information about our customers, their bookings, and the flights.

First off, let’s create a table that will hold data about our customers:

CREATE TABLE customers (
  id INT PRIMARY KEY,
  name           VARCHAR,
  address        VARCHAR,
  email          VARCHAR,
  phone          VARCHAR,
  loyalty_status VARCHAR,
  loyalty_id     VARCHAR
) WITH (
  KAFKA_TOPIC = 'customers',
  FORMAT = 'JSON',
  PARTITIONS = 6
);

This will store the data in a Kafka topic. In practice, you would probably populate this directly from your application or a feed from your database using Kafka Connect. For simplicity, here we’ll just load some data directly:

INSERT INTO customers (id, name, address, email, phone, loyalty_status, loyalty_id) VALUES (1, 'Gleda Lealle', '93 Express Point', 'glealle0@senate.gov', '+351 831 301 6746', 'Silver', '2PLNX338063');
INSERT INTO customers (id, name, address, email, phone, loyalty_status, loyalty_id) VALUES (2, 'Gilly Crocombe', '332 Blaine Avenue', 'gcrocombe1@homestead.com', '+33 203 565 3736', 'Silver', '7AWLM918339');
INSERT INTO customers (id, name, address, email, phone, loyalty_status, loyalty_id) VALUES (3, 'Astrix Aspall', '56 Randy Place', 'aaspall2@ebay.co.uk', '+33 679 296 6645', 'Gold', '3RNZH870911');
INSERT INTO customers (id, name, address, email, phone, loyalty_status, loyalty_id) VALUES (4, 'Ker Omond', '23255 Tennessee Court', 'komond3@usnews.com', '+33 515 323 0170', 'Silver', '5BWEP418137');
INSERT INTO customers (id, name, address, email, phone, loyalty_status, loyalty_id) VALUES (5, 'Arline Synnott', '144 Ramsey Avenue', 'asynnott4@theatlantic.com', '+62 953 759 8885', 'Bronze', '4MNJB877136');

Next, we’ll create a table of flights and associated bookings for our customers.

CREATE TABLE flights (
  id INT PRIMARY KEY,
  origin VARCHAR,
  destination VARCHAR,
  code VARCHAR,
  scheduled_dep TIMESTAMP,
  scheduled_arr TIMESTAMP
) WITH (
  KAFKA_TOPIC = 'flights',
  FORMAT = 'JSON',
  PARTITIONS = 6
);

CREATE TABLE bookings (
  id INT PRIMARY KEY,
  customer_id INT,
  flight_id INT
) WITH (
  KAFKA_TOPIC = 'bookings',
  FORMAT = 'JSON',
  PARTITIONS = 6
);

For these two tables, let’s add some data. As before, this would usually come directly from your application or a stream of data from another system integrated through Kafka Connect.

INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (1, 'LBA', 'AMS', '642',  '2021-11-18T06:04:00', '2021-11-18T06:48:00');
INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (2, 'LBA', 'LHR', '9607', '2021-11-18T07:36:00', '2021-11-18T08:05:00');
INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (3, 'AMS', 'TXL', '7968', '2021-11-18T08:11:00', '2021-11-18T10:41:00');
INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (4, 'AMS', 'OSL', '496',  '2021-11-18T11:20:00', '2021-11-18T13:25:00');
INSERT INTO flights (id, origin, destination, code, scheduled_dep, scheduled_arr) VALUES (5, 'LHR', 'JFK', '9230', '2021-11-18T10:36:00', '2021-11-18T19:07:00');

INSERT INTO bookings (id, customer_id, flight_id) VALUES (1,2,1);
INSERT INTO bookings (id, customer_id, flight_id) VALUES (2,1,1);
INSERT INTO bookings (id, customer_id, flight_id) VALUES (3,5,3);
INSERT INTO bookings (id, customer_id, flight_id) VALUES (4,4,2);

Denormalize the data

To give us a single view of the passenger/flight data, we'll denormalize the three tables into one. First, we join the customers to the bookings that they've made and build a new table as a result:


SET 'auto.offset.reset' = 'earliest';

CREATE TABLE customer_bookings WITH (KAFKA_TOPIC = 'customer_bookings', KEY_FORMAT = 'KAFKA', VALUE_FORMAT = 'JSON') AS
  SELECT C.*,
         B.id,
         B.flight_id
  FROM bookings B
  INNER JOIN customers C
  ON B.customer_id = C.id;

From here, we join to details of the flights themselves:


SET 'auto.offset.reset' = 'earliest';

CREATE TABLE customer_flights WITH (KAFKA_TOPIC = 'customer_flights', KEY_FORMAT = 'KAFKA', VALUE_FORMAT = 'JSON') AS
  SELECT CB.*,
         F.*
  FROM customer_bookings CB
  INNER JOIN flights F
  ON CB.flight_id = F.id;

At this stage, we can query the data held in the tables to show which customers are booked on which flights:

SET 'auto.offset.reset' = 'earliest';
SELECT cb_c_name           AS name,
       cb_c_email          AS email,
       cb_c_loyalty_status AS loyalty_status,
       cb_c_loyalty_id     AS loyalty_id,
       f_origin            AS origin,
       f_destination       AS destination,
       f_code              AS code,
       f_scheduled_dep     AS scheduled_dep
FROM customer_flights
EMIT CHANGES;
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|NAME                     |EMAIL                    |LOYALTY_STATUS           |LOYALTY_ID               |ORIGIN                   |DESTINATION              |CODE                     |SCHEDULED_DEP            |
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|Arline Synnott           |asynnott4@theatlantic.com|Bronze                   |4MNJB877136              |AMS                      |TXL                      |7968                     |2021-11-18T08:11:00.000  |
|Gleda Lealle             |glealle0@senate.gov      |Silver                   |2PLNX338063              |LBA                      |AMS                      |642                      |2021-11-18T06:04:00.000  |
|Ker Omond                |komond3@usnews.com       |Silver                   |5BWEP418137              |LBA                      |LHR                      |9607                     |2021-11-18T07:36:00.000  |
|Gilly Crocombe           |gcrocombe1@homestead.com |Silver                   |7AWLM918339              |LBA                      |AMS                      |642                      |2021-11-18T06:04:00.000  |

The last step in denormalizing the data is to rekey the customer flights table by flight ID so that it can be joined to the flight updates (which we'll get to below). This is currently a multi-step process. One additional transformation during the repartitioning is the partial masking of the customer loyalty ID to protect sensitive data.


SET 'auto.offset.reset' = 'earliest';

CREATE STREAM cf_stream (
  cb_c_id INTEGER,
  cb_c_name VARCHAR,
  cb_c_address VARCHAR,
  cb_c_email VARCHAR,
  cb_c_phone VARCHAR,
  cb_c_loyalty_status VARCHAR,
  cb_c_loyalty_id VARCHAR,
  cb_flight_id INTEGER,
  f_id INTEGER,
  f_origin VARCHAR,
  f_destination VARCHAR,
  f_code VARCHAR,
  f_scheduled_dep TIMESTAMP,
  f_scheduled_arr TIMESTAMP
) WITH (
  KAFKA_TOPIC = 'customer_flights',
  KEY_FORMAT = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);

CREATE STREAM cf_rekey_masked WITH (KAFKA_TOPIC = 'cf_rekey_masked') AS
  SELECT f_id           AS flight_id,
    cb_c_id             AS customer_id,
    cb_c_name           AS customer_name,
    cb_c_address        AS customer_address,
    cb_c_email          AS customer_email,
    cb_c_phone          AS customer_phone,
    cb_c_loyalty_status AS customer_loyalty_status,
    MASK_KEEP_RIGHT(cb_c_loyalty_id,3) AS customer_loyalty_id,
    f_origin            AS flight_origin,
    f_destination       AS flight_destination,
    f_code              AS flight_code,
    f_scheduled_dep     AS flight_scheduled_dep,
    f_scheduled_arr     AS flight_scheduled_arr
  FROM cf_stream
  PARTITION BY f_id;
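
MASK_KEEP_RIGHT keeps the rightmost n characters of a string and masks the rest; by default, uppercase letters become 'X', lowercase letters become 'x', and digits become 'n'. As a quick sketch, you can preview the masking against cf_stream before relying on it downstream:

SELECT cb_c_loyalty_id,
       MASK_KEEP_RIGHT(cb_c_loyalty_id, 3) AS masked_loyalty_id
FROM cf_stream
EMIT CHANGES
LIMIT 1;

For a loyalty ID such as '2PLNX338063', this returns 'nXXXXnnn063'.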

CREATE TABLE customer_flights_rekeyed (
  flight_id INT PRIMARY KEY,
  customer_id VARCHAR,
  customer_name VARCHAR,
  customer_address VARCHAR,
  customer_email VARCHAR,
  customer_phone VARCHAR,
  customer_loyalty_status VARCHAR,
  customer_loyalty_id VARCHAR,
  flight_origin VARCHAR,
  flight_destination VARCHAR,
  flight_code VARCHAR,
  flight_scheduled_dep TIMESTAMP,
  flight_scheduled_arr TIMESTAMP
) WITH (
  KAFKA_TOPIC = 'cf_rekey_masked',
  KEY_FORMAT = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);

We now have the customer flights table as before, but keyed on flight_id.
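
To spot-check the rekeyed table, you can run a quick push query against it (a sketch; the rows returned will reflect whatever data you have inserted):

SELECT flight_id,
       customer_name,
       flight_code,
       flight_scheduled_dep
FROM customer_flights_rekeyed
EMIT CHANGES
LIMIT 4;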

Add a stream of flight updates

In the flights table above, we have the scheduled departure time of a flight (SCHEDULED_DEP). Now, let’s introduce a stream of events that any flight changes will be written to. Again, we’re populating it directly, but in the real world it’ll be coming from somewhere else—perhaps Kafka Connect pulling the data from a JMS queue (or any of the other hundreds of supported sources).

CREATE STREAM flight_updates (
  id INT KEY,
  flight_id INT,
  updated_dep TIMESTAMP,
  reason VARCHAR
) WITH (
  KAFKA_TOPIC = 'flight_updates',
  KEY_FORMAT = 'KAFKA',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

Join data

By joining our customer flight booking data with any flight updates, we can provide a stream of notifications to passengers. Many platforms exist for delivering the push notification, whether a bespoke in-app service or a third-party messaging tool. ksqlDB can integrate with these through its REST interface, its native Java client, or one of several community-supported clients.

In one ksqlDB window, run the following ksqlDB query to return customer details with flight updates. This is the same query that you would run from your application, and it runs continuously.

SELECT  customer_name,
      FU.reason AS flight_change_reason,
      FU.updated_dep AS flight_updated_dep,
      flight_scheduled_dep,
      customer_loyalty_id,
      customer_email,
      customer_phone,
      flight_destination,
      flight_code
  FROM flight_updates FU
        INNER JOIN customer_flights_rekeyed CB
        ON FU.flight_id = CB.flight_id
EMIT CHANGES;

In another ksqlDB window, add some data to the flight update stream:

INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (1, 2, '2021-11-18T09:00:00.000', 'Cabin staff unavailable');
INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (2, 3, '2021-11-19T14:00:00.000', 'Mechanical checks');
INSERT INTO flight_updates (id, flight_id, updated_dep, reason) VALUES (3, 1, '2021-11-19T08:10:09.000', 'Icy conditions');

In the original window, you will see the details of which passengers are impacted by which flight changes, as shown below. Notice that the customer loyalty IDs are partially masked.

+---------------+------------------------+--------------------+----------------------+---------------------+---------------------------+------------------+-------------------+------------+
|customer_name  |flight_change_reason    |flight_updated_dep  |flight_scheduled_dep  |customer_loyalty_id  |customer_email             |customer_phone    |flight_destination |flight_code |
+---------------+------------------------+--------------------+----------------------+---------------------+---------------------------+------------------+-------------------+------------+
|Gleda Lealle   |Icy conditions          |2021-11-19T08:10:09 |2021-11-18T06:04:00   |nXXXXnnn063          |glealle0@senate.gov        |+351 831 301 6746 |AMS                |642         |
|Ker Omond      |Cabin staff unavailable |2021-11-18T09:00:00 |2021-11-18T07:36:00   |nXXXXnnn137          |komond3@usnews.com         |+33 515 323 0170  |LHR                |9607        |
|Arline Synnott |Mechanical checks       |2021-11-19T14:00:00 |2021-11-18T08:11:00   |nXXXXnnn136          |asynnott4@theatlantic.com  |+62 953 759 8885  |TXL                |7968        |

Cleanup

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substituting the stream or table name, as appropriate). Including the DELETE TOPIC clause asynchronously deletes the topic backing the stream or table as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
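
For this tutorial, the drops might look like the following (a sketch: terminate any running persistent queries first, and note that cf_stream and customer_flights_rekeyed are registered over topics owned by customer_flights and cf_rekey_masked, so they are dropped without DELETE TOPIC):

DROP STREAM IF EXISTS customer_flight_updates DELETE TOPIC;
DROP TABLE IF EXISTS customer_flights_rekeyed;
DROP STREAM IF EXISTS cf_rekey_masked DELETE TOPIC;
DROP STREAM IF EXISTS cf_stream;
DROP STREAM IF EXISTS flight_updates DELETE TOPIC;
DROP TABLE IF EXISTS customer_flights DELETE TOPIC;
DROP TABLE IF EXISTS customer_bookings DELETE TOPIC;
DROP TABLE IF EXISTS bookings DELETE TOPIC;
DROP TABLE IF EXISTS flights DELETE TOPIC;
DROP TABLE IF EXISTS customers DELETE TOPIC;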

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;