Enrich orders with change data capture (CDC)

Change data capture (CDC) plays a vital role in ensuring that recently changed data is quickly ingested, transformed, and made available to downstream analytics platforms and applications. If you have transactional events being written to a database, such as sales orders from a marketplace, you can use CDC to capture and denormalize these change events into a single table of enriched data for better query performance and easier consumption. This tutorial demonstrates the principle by streaming data from a SQL Server database, denormalizing it, and writing it to Snowflake.

To see this tutorial in action, launch it in Confluent Cloud. The launcher pre-populates the ksqlDB code in the Confluent Cloud Console and provides mock data or stubbed-out code for connecting to a real data source. For more detailed instructions, follow the steps below.

Run it

1. Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB provides a SQL interface for extracting, transforming, and loading events within your Kafka cluster.
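
Before running the recipe, you can confirm that the ksqlDB editor is connected to your cluster with a couple of metadata statements (an optional sanity check; a brand-new application will show only default topics and no streams):

SHOW TOPICS;
SHOW STREAMS;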

2. Execute ksqlDB code

ksqlDB processes data in real time, and you can also import and export data directly from ksqlDB to and from popular data sources and end systems in the cloud. This tutorial shows how to run the recipe in one of two ways: using a connector to a supported data source, or using ksqlDB's INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

The ksqlDB code below streams the sales orders and denormalizes the data by joining the fact stream (sales_orders) with the dimension table (customers).

Change data capture (CDC) events for orders are read from a SQL Server database, and the customer data is read from an Oracle database. If you connect real source connectors, note that the topic names they produce may differ from the KAFKA_TOPIC values used in the CREATE STREAM and CREATE TABLE statements below; adjust them to match.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
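
For example, if the CDC connector had already created the sales_orders topic, the stream registration could drop the PARTITIONS property. This variant is shown for illustration only; the full recipe below keeps PARTITIONS so it also works when the topic does not yet exist.

CREATE STREAM sales_orders (
  order_id BIGINT,
  customer_id BIGINT,
  item VARCHAR,
  order_total_usd DECIMAL(10,2)
) WITH (
  KAFKA_TOPIC = 'sales_orders',
  VALUE_FORMAT = 'JSON'
);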

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

-- Stream of sales_orders
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_sqlservercdc_orders WITH (
  'connector.class'          = 'SqlServerCdcSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'database.hostname'        = '<database-hostname>',
  'database.port'            = '1433',
  'database.user'            = '<database-username>',
  'database.password'        = '<database-password>',
  'database.dbname'          = '<database-name>',
  'database.server.name'     = 'sql',
  'table.include.list'       = '<table_name>',
  'snapshot.mode'            = 'initial',
  'output.data.format'       = 'JSON',
  'tasks.max'                = '1'
);

-- Stream of customers
CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_oracle_customers WITH (
  'connector.class'          = 'OracleDatabaseSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'topic.prefix'             = 'oracle_',
  'connection.host'          = '<database-host>',
  'connection.port'          = '1521',
  'connection.user'          = '<database-username>',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'CUSTOMERS',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);

SET 'auto.offset.reset' = 'earliest';

-- Create stream of sales_orders
CREATE STREAM sales_orders (
  order_id BIGINT,
  customer_id BIGINT,
  item VARCHAR,
  order_total_usd DECIMAL(10,2)
) WITH (
  KAFKA_TOPIC = 'sales_orders',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Register the customer data topic as a table
CREATE TABLE customers (
  id BIGINT PRIMARY KEY,
  first_name VARCHAR,
  last_name VARCHAR,
  email VARCHAR
) WITH (
  KAFKA_TOPIC = 'customers',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Denormalize data, joining facts (sales_orders) with the dimension (customer)
CREATE STREAM sales_orders_enriched WITH (
  KAFKA_TOPIC = 'sales_orders_enriched'
) AS
  SELECT
    c.id AS customer_id,
    o.order_id AS order_id,
    o.item AS item,
    o.order_total_usd AS order_total_usd,
    CONCAT(c.first_name, ' ', c.last_name) AS full_name,
    c.email AS email
  FROM sales_orders o
    LEFT JOIN customers c
    ON o.customer_id = c.id;
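
After these statements run, you can optionally confirm that the enriched stream was registered and inspect its schema:

DESCRIBE sales_orders_enriched;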

3. Test with mock data

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO customers (id, first_name, last_name, email) VALUES (375, 'Janice', 'Smith', 'jsmith@mycompany.com');
INSERT INTO customers (id, first_name, last_name, email) VALUES (983, 'George', 'Mall', 'gmall@mycompany.com');

-- Wait 10 seconds before inserting the records below

INSERT INTO sales_orders (order_id, customer_id, item, order_total_usd) VALUES (44697328, 375, 'book', 29.99);
INSERT INTO sales_orders (order_id, customer_id, item, order_total_usd) VALUES (44697329, 375, 'guitar', 215.99);
INSERT INTO sales_orders (order_id, customer_id, item, order_total_usd) VALUES (44697330, 983, 'thermometer', 12.99);
INSERT INTO sales_orders (order_id, customer_id, item, order_total_usd) VALUES (44697331, 983, 'scarf', 32.99);
INSERT INTO sales_orders (order_id, customer_id, item, order_total_usd) VALUES (44697332, 375, 'doormat', 15.99);
INSERT INTO sales_orders (order_id, customer_id, item, order_total_usd) VALUES (44697333, 983, 'clippers', 65.99);

To validate that this recipe is working, run the following query:

SELECT * FROM sales_orders_enriched WHERE item = 'book';

Your output should resemble:

+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|CUSTOMER_ID              |ORDER_ID                 |ITEM                     |ORDER_TOTAL_USD          |FULL_NAME                |EMAIL                    |
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|375                      |44697328                 |book                     |29.99                    |Janice Smith             |jsmith@mycompany.com     |
Query terminated
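
The query above scans the existing records and then terminates. To watch enriched orders continuously as new events arrive, you can run a push query instead and stop it from the editor when you are done:

SELECT * FROM sales_orders_enriched EMIT CHANGES;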

4. Write the data out

Any downstream application or database can receive the denormalized data; this recipe sends it to Snowflake.

-- Send data to Snowflake
CREATE SINK CONNECTOR IF NOT EXISTS recipe_snowflake_sales_orders_enriched WITH (
  'connector.class'          = 'SnowflakeSink',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'topics'                   = 'sales_orders_enriched',
  'input.data.format'        = 'JSON',
  'snowflake.url.name'       = 'https://wm83168.us-central1.gcp.snowflakecomputing.com:443',
  'snowflake.user.name'      = '<login-username>',
  'snowflake.private.key'    = '<private-key>',
  'snowflake.database.name'  = '<database-name>',
  'snowflake.schema.name'    = '<schema-name>',
  'tasks.max'                = '1'
);
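
Once the sink connector is created, you can check that it is running directly from the ksqlDB editor (an optional check, using the connector name defined above):

SHOW CONNECTORS;
DESCRIBE CONNECTOR recipe_snowflake_sales_orders_enriched;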

5. Clean up

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute your stream or table name, as appropriate). Including the DELETE TOPIC clause asynchronously deletes the Kafka topic backing the stream or table as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;