SELECT FIRST_NAME,
       LAST_NAME,
       CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF
FROM PURCHASE_STREAM
EMIT CHANGES
LIMIT 4;
How can you calculate the difference between two columns?
Take the two fields whose difference you want to calculate and use the - operator between them: field_1 - field_2
SELECT FIRST_NAME,
       LAST_NAME,
       CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF
Note that the - operator expects numerical values, so if you have columns where the numbers are stored as VARCHAR, you’ll have to use a CAST operation to convert them to a numerical type; otherwise you’ll get an error in your query.
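For example, if the purchase columns had been declared as VARCHAR rather than DOUBLE (a hypothetical variation on the stream defined later in this tutorial), the query might look something like this:
SELECT FIRST_NAME,
       LAST_NAME,
       CAST(CURRENT_PURCHASE AS DOUBLE) - CAST(PREVIOUS_PURCHASE AS DOUBLE) as PURCHASE_DIFF
FROM PURCHASE_STREAM
EMIT CHANGES
LIMIT 4;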
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl
• Verify that Docker is set up properly by ensuring that no errors are output when you run docker info and docker compose version on the command line
To get started, make a new directory anywhere you’d like for this project:
mkdir column-difference && cd column-difference
Then make the following directories to set up its structure:
mkdir src test
Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksqldb/log4j.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: "earliest"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.28.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
    volumes:
      - ./src:/opt/app/src
      - ./test:/opt/app/test
And launch it by running:
docker compose up -d
To begin developing interactively, open up the ksqlDB CLI:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
The first thing we do is create a stream named PURCHASE_STREAM. This statement also creates the customer_purchases topic, since it doesn’t already exist. For more details, check out the ksqlDB documentation on the CREATE STREAM statement. The data contained in the topic is just plain, schemaless JSON.
CREATE STREAM PURCHASE_STREAM (
       ID VARCHAR,
       PREVIOUS_PURCHASE DOUBLE,
       CURRENT_PURCHASE DOUBLE,
       TXN_TS VARCHAR,
       FIRST_NAME VARCHAR,
       LAST_NAME VARCHAR)
WITH (KAFKA_TOPIC='customer_purchases',
      VALUE_FORMAT='JSON',
      PARTITIONS=1);
Go ahead and create the stream now by pasting this statement into the ksqlDB window you opened at the beginning of this step. After you’ve created the stream, quit the ksqlDB CLI for now by typing exit.
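If you’d like to confirm the stream was registered before exiting, you can run a quick DESCRIBE (an optional check, not a required step in this tutorial):
DESCRIBE PURCHASE_STREAM;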
Now let’s produce some records for the PURCHASE_STREAM stream:
docker exec -i broker /usr/bin/kafka-console-producer --bootstrap-server broker:9092 --topic customer_purchases
After starting the console producer, it will wait for your input. To send all of the purchase records, paste the following into the terminal and press enter:
{"id": "1", "previous_purchase": 8000.54, "current_purchase": 5004.89,"txn_ts": "2020-12-04 02:35:43", "first_name": "Art","last_name": "Vandeley"}
{"id": "2", "previous_purchase": 500.33, "current_purchase": 1000.89,"txn_ts": "2020-12-04 02:35:44", "first_name": "Nick","last_name": "Fury"}
{"id": "3", "previous_purchase": 333.18, "current_purchase": 804.89,"txn_ts": "2020-12-04 02:35:45", "first_name": "Natasha","last_name": "Romanov"}
{"id": "4", "previous_purchase": 72848.11, "current_purchase": 60040.89,"txn_ts": "2020-12-04 02:35:46", "first_name": "Wanda","last_name": "Maximoff"}
After you’ve sent the records above, you can close the console producer by entering CTRL+C.
To begin developing interactively, open up the ksqlDB CLI:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Set ksqlDB to process data from the beginning of each Kafka topic.
SET 'auto.offset.reset' = 'earliest';
Then let’s adjust the column width so we can easily see the results of the query:
SET CLI COLUMN-WIDTH 20
Now we write a query that calculates the difference between two columns. To achieve this, we will use the - operator between the two purchase fields.
SELECT FIRST_NAME,
       LAST_NAME,
       CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF (1)
FROM PURCHASE_STREAM
EMIT CHANGES
LIMIT 4;
(1) Using the - operator to calculate the difference between two columns. Remember that the - operator expects numerical values, so if you have columns where the numbers are stored as VARCHAR, you’ll have to use a CAST operation to convert them to a numerical type; otherwise you’ll get an error in your query.
This query should produce the following output:
+--------------------+--------------------+--------------------+
|FIRST_NAME |LAST_NAME |PURCHASE_DIFF |
+--------------------+--------------------+--------------------+
|Art |Vandeley |-2995.6499999999996 |
|Nick |Fury |500.56 |
|Natasha |Romanov |471.71 |
|Wanda |Maximoff |-12807.220000000001 |
Limit Reached
Query terminated
Now that the reporting query works, let’s update it to create a continuous query for your reporting scenario:
CREATE STREAM PURCHASE_HISTORY_STREAM AS
    SELECT FIRST_NAME,
           LAST_NAME,
           CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF
    FROM PURCHASE_STREAM;
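If you want to see the new stream in action, you can query it the same way as before (an optional check, not part of the original steps):
SELECT * FROM PURCHASE_HISTORY_STREAM EMIT CHANGES LIMIT 4;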
We’re done with the ksqlDB CLI for now, so go ahead and type exit to quit.
Now that you have a series of statements that’s doing the right thing, the last step is to put them into a file so that they can be used outside the CLI session. Create a file at src/statements.sql with the following content:
CREATE STREAM PURCHASE_STREAM (
       ID VARCHAR,
       PREVIOUS_PURCHASE DOUBLE,
       CURRENT_PURCHASE DOUBLE,
       TXN_TS VARCHAR,
       FIRST_NAME VARCHAR,
       LAST_NAME VARCHAR)
WITH (KAFKA_TOPIC='customer_purchases',
      VALUE_FORMAT='JSON',
      PARTITIONS=1);

CREATE STREAM PURCHASE_HISTORY_STREAM AS
    SELECT FIRST_NAME,
           LAST_NAME,
           CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF
    FROM PURCHASE_STREAM;
Create a file at test/input.json with the inputs for testing:
{
  "inputs": [
    {
      "topic": "customer_purchases",
      "value": {
        "id": "1",
        "previous_purchase": 8000.54,
        "current_purchase": 5004.89,
        "txn_ts": "2020-12-04 02:35:43",
        "first_name": "Art",
        "last_name": "Vandeley"
      }
    },
    {
      "topic": "customer_purchases",
      "value": {
        "id": "2",
        "previous_purchase": 500.33,
        "current_purchase": 1000.89,
        "txn_ts": "2020-12-04 02:35:44",
        "first_name": "Nick",
        "last_name": "Fury"
      }
    },
    {
      "topic": "customer_purchases",
      "value": {
        "id": "3",
        "previous_purchase": 333.18,
        "current_purchase": 804.89,
        "txn_ts": "2020-12-04 02:35:45",
        "first_name": "Natasha",
        "last_name": "Romanov"
      }
    },
    {
      "topic": "customer_purchases",
      "value": {
        "id": "4",
        "previous_purchase": 72848.11,
        "current_purchase": 60040.89,
        "txn_ts": "2020-12-04 02:35:46",
        "first_name": "Wanda",
        "last_name": "Maximoff"
      }
    }
  ]
}
Create a file at test/output.json with the expected outputs:
{
  "outputs": [
    {
      "topic": "PURCHASE_HISTORY_STREAM",
      "value": {
        "FIRST_NAME": "Art",
        "LAST_NAME": "Vandeley",
        "PURCHASE_DIFF": -2995.6499999999996
      }
    },
    {
      "topic": "PURCHASE_HISTORY_STREAM",
      "value": {
        "FIRST_NAME": "Nick",
        "LAST_NAME": "Fury",
        "PURCHASE_DIFF": 500.56
      }
    },
    {
      "topic": "PURCHASE_HISTORY_STREAM",
      "value": {
        "FIRST_NAME": "Natasha",
        "LAST_NAME": "Romanov",
        "PURCHASE_DIFF": 471.71
      }
    },
    {
      "topic": "PURCHASE_HISTORY_STREAM",
      "value": {
        "FIRST_NAME": "Wanda",
        "LAST_NAME": "Maximoff",
        "PURCHASE_DIFF": -12807.220000000001
      }
    }
  ]
}
Invoke the tests using the ksqlDB test runner and the statements file that you created earlier:
docker exec ksqldb-cli ksql-test-runner -i /opt/app/test/input.json -s /opt/app/src/statements.sql -o /opt/app/test/output.json
Which should pass:
>>> Test passed!
Create a file at src/statements.sql with the following content, which represents the statements we tested above without the test data:
CREATE STREAM PURCHASE_STREAM (
       ID VARCHAR,
       PREVIOUS_PURCHASE DOUBLE,
       CURRENT_PURCHASE DOUBLE,
       TXN_TS VARCHAR,
       FIRST_NAME VARCHAR,
       LAST_NAME VARCHAR)
WITH (KAFKA_TOPIC='customer_purchases',
      VALUE_FORMAT='JSON',
      PARTITIONS=1);

CREATE STREAM PURCHASE_HISTORY_STREAM AS
    SELECT FIRST_NAME,
           LAST_NAME,
           CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF
    FROM PURCHASE_STREAM;
Launch your statements into production by sending them to the REST API with the following command:
tr '\n' ' ' < src/statements.sql | \
sed 's/;/;\'$'\n''/g' | \
while read stmt; do
    echo '{"ksql":"'$stmt'", "streamsProperties": {}}' | \
        curl -s -X "POST" "http://localhost:8088/ksql" \
             -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
             -d @- | \
        jq
done
Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully-managed Apache Kafka service.
Sign up for Confluent Cloud if you don’t already have an account.
After you log in to Confluent Cloud Console, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.
From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details).
Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.
Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.
Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.