How can you concatenate values from multiple columns into a single column?
Select the fields you want to combine and use the + operator to concatenate them into one field:
SELECT FIRST_NAME + ' ' + LAST_NAME +
' purchased ' +
CAST(NUM_SHARES AS VARCHAR) +
' shares of ' +
SYMBOL AS SUMMARY
Note that concatenation only works with STRING values, so you’ll have to use a CAST operation on non-string fields, as demonstrated above; otherwise your query will result in an error.
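For instance, if the stream also has a numeric column of type DOUBLE, such as the AMOUNT column used later in this tutorial, it needs the same treatment. A minimal sketch (the 'spent' wording and SPEND_SUMMARY alias are purely illustrative):
-- Sketch: AMOUNT is a DOUBLE, so it must be cast to VARCHAR
-- before it can be concatenated with STRING values.
SELECT FIRST_NAME + ' spent ' + CAST(AMOUNT AS VARCHAR) AS SPEND_SUMMARY
FROM ACTIVITY_STREAM
EMIT CHANGES;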
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it.
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl.
• Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line.
To get started, make a new directory anywhere you’d like for this project:
mkdir concatenation && cd concatenation
Then make the following directories to set up its structure:
mkdir src test
Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.28.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksqldb/log4j.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: "earliest"
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.28.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
    volumes:
      - ./src:/opt/app/src
      - ./test:/opt/app/test
And launch it by running:
docker compose up -d
To begin developing interactively, open up the ksqlDB CLI:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
The first thing we do is create a stream named ACTIVITY_STREAM. This statement also creates the stock_purchases topic, since it doesn’t already exist. For more details, check out the ksqlDB documentation on the CREATE STREAM statement. The data contained in the topic is just plain, schemaless JSON.
CREATE STREAM ACTIVITY_STREAM (
ID VARCHAR,
NUM_SHARES INT,
AMOUNT DOUBLE,
TXN_TS VARCHAR,
FIRST_NAME VARCHAR,
LAST_NAME VARCHAR,
SYMBOL VARCHAR )
WITH (KAFKA_TOPIC='stock_purchases',
VALUE_FORMAT='JSON',
PARTITIONS=1);
Go ahead and create the stream now by pasting this statement into the ksqlDB window you opened at the beginning of this step. After you’ve created the stream, you can optionally verify it with the statements shown below, then quit the ksqlDB CLI for now by typing exit.
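A minimal check, using standard ksqlDB statements (optional; not needed for the rest of the tutorial):
-- List all registered streams, then inspect the new one
SHOW STREAMS;
DESCRIBE ACTIVITY_STREAM;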
Now let’s produce some records for the ACTIVITY_STREAM stream:
docker exec -i broker /usr/bin/kafka-console-producer --bootstrap-server broker:9092 --topic stock_purchases
After starting the console producer, it will wait for your input. To send all of the stock transactions, paste the following into the terminal and press enter:
{"id": "1", "num_shares": 30000, "amount": 5004.89,"txn_ts": "2020-12-04 02:35:43", "first_name": "Art","last_name": "Vandeley", "symbol": "IMEP"}
{"id": "2", "num_shares": 500, "amount": 1000.89,"txn_ts": "2020-12-04 02:35:44", "first_name": "Nick","last_name": "Fury", "symbol": "IMEP"}
{"id": "3", "num_shares": 45729, "amount": 804.89,"txn_ts": "2020-12-04 02:35:45", "first_name": "Natasha","last_name": "Romanov", "symbol": "STRK"}
{"id": "4", "num_shares": 72848, "amount": 60040.89,"txn_ts": "2020-12-04 02:35:46", "first_name": "Wanda","last_name": "Maximoff", "symbol": "STRK"}
After you’ve sent the records above, you can close the console producer by entering CTRL+C.
To begin developing interactively, open up the ksqlDB CLI:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Set ksqlDB to process data from the beginning of each Kafka topic.
SET 'auto.offset.reset' = 'earliest';
Then let’s adjust the column width so we can easily see the results of the query:
SET CLI COLUMN-WIDTH 50
Now we write a query to concatenate multiple columns. To achieve this, we will use the + operator between the fields in our SELECT statement rather than a comma.
SELECT FIRST_NAME + ' ' + LAST_NAME +
' purchased ' +
CAST(NUM_SHARES AS VARCHAR) +
' shares of ' +
SYMBOL AS SUMMARY
FROM ACTIVITY_STREAM
EMIT CHANGES
LIMIT 4;
Note: the NUM_SHARES field is an INT, so we need to cast it to a VARCHAR because concatenation only works with STRING types.
You can also SELECT fields you don’t want to concatenate. In that case, use a comma to separate them from the fields you concatenate, for example: SELECT field_1, field_2, field_3 + field_4. A concrete sketch against this stream follows the query output below.
This query should produce the following output:
+--------------------------------------------------+
|SUMMARY |
+--------------------------------------------------+
|Art Vandeley purchased 30000 shares of IMEP |
|Nick Fury purchased 500 shares of IMEP |
|Natasha Romanov purchased 45729 shares of STRK |
|Wanda Maximoff purchased 72848 shares of STRK |
Limit Reached
Query terminated
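As promised above, here is a sketch of mixing plain and concatenated columns against this stream. The column choices are purely illustrative and this query isn’t part of the main tutorial flow:
-- Plain columns are separated by commas; concatenated columns are joined with +
SELECT SYMBOL,
       NUM_SHARES,
       FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME
FROM ACTIVITY_STREAM
EMIT CHANGES
LIMIT 4;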
Now that the reporting query works, let’s update it to create a continuous query for your reporting scenario:
CREATE STREAM SUMMARY_RESULTS AS
SELECT FIRST_NAME + ' ' + LAST_NAME +
' purchased ' +
CAST(NUM_SHARES AS VARCHAR) +
' shares of ' +
SYMBOL AS SUMMARY
FROM ACTIVITY_STREAM;
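If you’d like to sanity-check the new stream before leaving the CLI, you can optionally query it with the same push-query syntax used earlier:
-- Optional check: read from the continuously populated SUMMARY_RESULTS stream
SELECT SUMMARY
FROM SUMMARY_RESULTS
EMIT CHANGES
LIMIT 4;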
We’re done with the ksqlDB CLI for now, so go ahead and type exit to quit.
Now that you have a series of statements that’s doing the right thing, the last step is to put them into a file so that they can be used outside the CLI session. Create a file at src/statements.sql with the following content:
CREATE STREAM ACTIVITY_STREAM (
ID VARCHAR,
NUM_SHARES INT,
AMOUNT DOUBLE,
TXN_TS VARCHAR,
FIRST_NAME VARCHAR,
LAST_NAME VARCHAR,
SYMBOL VARCHAR )
WITH (KAFKA_TOPIC='stock_purchases',
VALUE_FORMAT='JSON',
PARTITIONS=1);
CREATE STREAM SUMMARY_RESULTS AS
SELECT FIRST_NAME + ' ' + LAST_NAME +
' purchased ' +
CAST(NUM_SHARES AS VARCHAR) +
' shares of ' +
SYMBOL AS SUMMARY
FROM ACTIVITY_STREAM;
Create a file at test/input.json with the inputs for testing:
{
"inputs": [
{
"topic": "stock_purchases",
"value": {
"id": "1",
"num_shares": 30000,
"amount": 5004.89,
"txn_ts": "2020-12-04 02:35:43",
"first_name": "Art",
"last_name": "Vandeley",
"symbol": "IMEP"
}
},
{
"topic": "stock_purchases",
"value": {
"id": "2",
"num_shares": 500,
"amount": 1000.89,
"txn_ts": "2020-12-04 02:35:44",
"first_name": "Nick",
"last_name": "Fury",
"symbol": "IMEP"
}
},
{
"topic": "stock_purchases",
"value": {
"id": "3",
"num_shares": 45729,
"amount": 804.89,
"txn_ts": "2020-12-04 02:35:45",
"first_name": "Natasha",
"last_name": "Romanov",
"symbol": "STRK"
}
},
{
"topic": "stock_purchases",
"value": {
"id": "4",
"num_shares": 72848,
"amount": 60040.89,
"txn_ts": "2020-12-04 02:35:46",
"first_name": "Wanda",
"last_name": "Maximoff",
"symbol": "STRK"
}
}
]
}
Create a file at test/output.json with the expected outputs:
{
"outputs": [
{
"topic": "SUMMARY_RESULTS",
"value": {
"SUMMARY" : "Art Vandeley purchased 30000 shares of IMEP"
}
},
{
"topic": "SUMMARY_RESULTS",
"value": {
"SUMMARY" : "Nick Fury purchased 500 shares of IMEP"
}
},
{
"topic": "SUMMARY_RESULTS",
"value": {
"SUMMARY" : "Natasha Romanov purchased 45729 shares of STRK"
}
},
{
"topic": "SUMMARY_RESULTS",
"value": {
"SUMMARY" : "Wanda Maximoff purchased 72848 shares of STRK"
}
}
]
}
Invoke the tests using the ksqlDB test runner and the statements file that you created earlier:
docker exec ksqldb-cli ksql-test-runner -i /opt/app/test/input.json -s /opt/app/src/statements.sql -o /opt/app/test/output.json
The test should pass:
>>> Test passed!
Create a file at src/statements.sql with the following content, which represents the statements we tested above without the test data:
CREATE STREAM ACTIVITY_STREAM (
ID VARCHAR,
NUM_SHARES INT,
AMOUNT DOUBLE,
TXN_TS VARCHAR,
FIRST_NAME VARCHAR,
LAST_NAME VARCHAR,
SYMBOL VARCHAR )
WITH (KAFKA_TOPIC='stock_purchases',
VALUE_FORMAT='JSON',
PARTITIONS=1);
CREATE STREAM SUMMARY_RESULTS AS
SELECT FIRST_NAME + ' ' + LAST_NAME +
' purchased ' +
CAST(NUM_SHARES AS VARCHAR) +
' shares of ' +
SYMBOL AS SUMMARY
FROM ACTIVITY_STREAM;
Launch your statements into production by sending them to the REST API with the following command:
tr '\n' ' ' < src/statements.sql | \
sed 's/;/;\'$'\n''/g' | \
while read stmt; do
echo '{"ksql":"'$stmt'", "streamsProperties": {}}' | \
curl -s -X "POST" "http://localhost:8088/ksql" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d @- | \
jq
done
Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service. Sign up for Confluent Cloud to get started.
After you log in to the Confluent Cloud Console, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.
From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details).
Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.
Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations (e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials), and set the appropriate parameters in your client application.
Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.