How can you count the number of messages in a Kafka topic?
Create a ksqlDB stream over the Kafka topic, and then use the COUNT function to count the number of messages.
SELECT 'X' AS X,
COUNT(*) AS MSG_CT
FROM PAGEVIEWS
GROUP BY 'X'
EMIT CHANGES LIMIT 1;
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl
• Verify that Docker is set up properly by ensuring that no errors are output when you run docker info and docker compose version on the command line (both commands are shown below)
To get started, make a new directory anywhere you’d like for this project:
mkdir count-messages && cd count-messages
Then make the following directories to set up its structure:
mkdir src test
Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):
version: '2'
services:
  # Kafka broker running in KRaft mode (no ZooKeeper needed)
  broker:
    image: confluentinc/cp-kafka:7.4.1
    container_name: broker
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

  # ksqlDB server with an embedded Kafka Connect worker (used to run the datagen connectors)
  ksqldb:
    image: confluentinc/ksqldb-server:0.28.2
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      - 8088:8088
    user: root
    environment:
      KSQL_CONFIG_DIR: /etc/ksqldb
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: ^_.*
      KSQL_KSQL_CONNECT_WORKER_CONFIG: /etc/ksqldb/connect.properties
      KSQL_CONNECT_BOOTSTRAP_SERVERS: broker:29092
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: ksqldb
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      KSQL_CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: '[%d] %p %X{connector.context}%m (%c:%L)%n'
      KSQL_CONNECT_PLUGIN_PATH: /usr/share/java
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        # I miss the confluent-hub client :-/
        # mkdir -p /usr/share/confluent-hub-components/
        # confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-datagen:0.3.3
        # ------ hack to workaround absence of confluent-hub client
        curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-datagen/versions/0.4.0/confluentinc-kafka-connect-datagen-0.4.0.zip -o /tmp/kafka-connect-datagen.zip
        yum install -y unzip
        unzip /tmp/kafka-connect-datagen.zip -d /usr/share/java/kafka-connect-datagen
        # ----------------------------------------------------------
        #
        echo "Launching ksqlDB"
        /usr/bin/docker/run &
        echo "Waiting for Kafka Connect to start listening on localhost ⏳"
        while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
          echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
            break
          fi
          sleep 5
        done
        echo -e "\n--\n+> Creating Data Generators"
        curl -i -X PUT http://localhost:8083/connectors/datagen_01/config \
             -H "Content-Type: application/json" \
             -d '{
                   "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                   "value.converter.schemas.enable":false,
                   "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                   "kafka.topic": "pageviews",
                   "quickstart": "pageviews",
                   "iterations": 42,
                   "tasks.max": "1"
                 }'
        curl -i -X PUT http://localhost:8083/connectors/datagen_02/config \
             -H "Content-Type: application/json" \
             -d '{
                   "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                   "value.converter.schemas.enable":false,
                   "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                   "kafka.topic": "trades",
                   "quickstart": "Stock_Trades",
                   "max.interval": 1000,
                   "iterations": 4242424242,
                   "tasks.max": "1"
                 }'
        sleep infinity
    volumes:
      - ./src:/opt/app/src
      - ./test:/opt/app/test

  # kcat container, handy for inspecting Kafka topics from the command line
  kcat:
    image: edenhill/kcat:1.7.1
    container_name: kcat
    links:
      - broker
    entrypoint:
      - /bin/sh
      - -c
      - "apk add jq; \nwhile [ 1 -eq 1 ];do sleep 60;done\n"
And bring up the stack of components by running:
docker compose up -d
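Optionally, check that the broker, ksqldb, and kcat containers have all started before continuing:
docker compose ps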
Run this snippet of code, which will block until ksqlDB is available:
docker exec -it ksqldb bash -c 'echo -e "\n\n Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done'
To begin developing interactively, open up the ksqlDB CLI:
docker exec -it ksqldb ksql http://ksqldb:8088
First, we’ll declare a ksqlDB stream on the Kafka topic that we want to work with. A ksqlDB stream is a Kafka topic (which in this case already exists) with a schema.
CREATE STREAM pageviews (msg VARCHAR)
WITH (KAFKA_TOPIC ='pageviews',
VALUE_FORMAT='JSON');
Note that at this stage we’re just interested in counting the messages in their entirety, so we define the loosest schema possible (msg VARCHAR) for speed.
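If you’d like to see what these messages actually look like before counting them, you can optionally peek at one using the kcat container included in the Docker Compose stack (this assumes the datagen connector created by the ksqldb container has already started producing to the pageviews topic):
docker exec kcat kcat -b broker:29092 -t pageviews -C -c 1 -e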
Since we want to count all of the messages in the topic (and not just those that arrive after the query has started) we need to tell ksqlDB to query from the beginning of the topic:
SET 'auto.offset.reset' = 'earliest';
You should get a message confirming the change:
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
Now let’s count all of the messages in the topic:
SELECT 'X' AS X,
COUNT(*) AS MSG_CT
FROM PAGEVIEWS
GROUP BY 'X'
EMIT CHANGES LIMIT 1;
We’re specifying a LIMIT 1 clause here so that the query exits after the first row has returned. Without this, ksqlDB will continue to write any changes in the count to the screen as new messages arrive.
You should now see a count of all the messages in the topic:
+------------------------+------------------------+
|X                       |MSG_CT                  |
+------------------------+------------------------+
|X                       |42                      |
Limit Reached
Query terminated
You may be wondering about the purpose of X in the query. It’s a dummy field to persuade ksqlDB to do an aggregation across all messages. For more information see ksqlDB GitHub issue #430.
As we saw, the query above scans through all of the messages in the topic and then outputs the total, emitting changes as new messages arrive. What if we would like to maintain a count of messages that we can query with low latency whenever we want?
This is where ksqlDB tables come in. They are stateful aggregations held by ksqlDB, backed by both a Kafka topic and an internal state store.
Run this SQL to create a table holding the results of the same COUNT(*) that we ran above:
CREATE TABLE MSG_COUNT AS
SELECT 'X' AS X,
COUNT(*) AS MSG_CT
FROM PAGEVIEWS
GROUP BY 'X'
EMIT CHANGES;
The output from this is a confirmation that the table has been created.
Created query with ID CTAS_MSG_COUNT_0
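The table is kept up to date by a persistent query running on the ksqlDB server. If you want to confirm that it is running, you can execute SHOW QUERIES; from the CLI, or (from a second terminal, leaving the CLI session open) ask the server over its /ksql REST endpoint, along these lines:
docker exec ksqldb \
  curl --silent --show-error \
  -X POST 'http://localhost:8088/ksql' \
  -H 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
  -d '{"ksql":"SHOW QUERIES;","streamsProperties":{}}'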
Often we will want to just query the current number of messages in a topic from the materialised view that we built in the ksqlDB table and exit. Compare this to the query above with EMIT CHANGES, in which the query continues to run until we cancel it (or add a LIMIT clause).
SELECT * FROM MSG_COUNT WHERE X='X';
As before we get the total number of messages, but instead of querying the stream directly—and performing the aggregation whilst we wait—we are querying the stateful aggregation that ksqlDB has built.
+------------------------+------------------------+
|X                       |MSG_CT                  |
+------------------------+------------------------+
|X                       |42                      |
Query terminated
This type of query is known as a pull query, and can be used against ksqlDB tables that have materialised state, subject to certain conditions on the predicate used.
The neat thing is that you can use ksqlDB’s REST API (or Java client) to run pull queries from your own application and look up a value directly. Instead of needing an external datastore in which to hold state for your application to query, you can do this directly from Kafka and ksqlDB itself.
Exit the ksqlDB command prompt and run this from your shell directly:
docker exec ksqldb \
curl --silent --show-error \
--http2 'http://localhost:8088/query-stream' \
--data-raw '{"sql":"SELECT MSG_CT FROM MSG_COUNT WHERE X='\''X'\'';"}'
Now you get the count of messages in the Kafka topic, queried directly from the materialized view that’s built and populated in ksqlDB.
{"queryId":null,"columnNames":["MSG_CT"],"columnTypes":["BIGINT"]}
[42]
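If you just want the number itself (for example, to use in a shell script), you can drop the header line and parse the row that follows it; a minimal sketch, assuming jq is installed on your host:
docker exec ksqldb \
  curl --silent --show-error \
  --http2 'http://localhost:8088/query-stream' \
  --data-raw '{"sql":"SELECT MSG_CT FROM MSG_COUNT WHERE X='\''X'\'';"}' | tail -n 1 | jq '.[0]'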
Now that you have a series of statements that’s doing the right thing, the last step is to put them into a file so that they can be used outside the CLI session. Create a file at src/statements.sql with the following content:
CREATE STREAM pageviews (msg VARCHAR)
WITH (KAFKA_TOPIC ='pageviews',
VALUE_FORMAT='JSON');
CREATE TABLE MSG_COUNT AS
SELECT 'X' AS X,
COUNT(*) AS MSG_CT
FROM PAGEVIEWS
GROUP BY 'X'
EMIT CHANGES;
Create a file at test/input.json with the inputs for testing:
{
"inputs": [
{
"topic": "pageviews",
"timestamp": 1600341475094,
"key": "1",
"value": {
"viewtime": 1,
"userid": "User_7",
"pageid": "Page_47"
}
},
{
"topic": "pageviews",
"timestamp": 1600341475206,
"key": "11",
"value": {
"viewtime": 11,
"userid": "User_8",
"pageid": "Page_47"
}
},
{
"topic": "pageviews",
"timestamp": 1600341475478,
"key": "21",
"value": {
"viewtime": 21,
"userid": "User_7",
"pageid": "Page_64"
}
},
{
"topic": "pageviews",
"timestamp": 1600341475955,
"key": "31",
"value": {
"viewtime": 31,
"userid": "User_7",
"pageid": "Page_29"
}
},
{
"topic": "pageviews",
"timestamp": 1600341476073,
"key": "41",
"value": {
"viewtime": 41,
"userid": "User_4",
"pageid": "Page_41"
}
},
{
"topic": "pageviews",
"timestamp": 1600341476547,
"key": "51",
"value": {
"viewtime": 51,
"userid": "User_6",
"pageid": "Page_68"
}
},
{
"topic": "pageviews",
"timestamp": 1600341476659,
"key": "61",
"value": {
"viewtime": 61,
"userid": "User_5",
"pageid": "Page_17"
}
},
{
"topic": "pageviews",
"timestamp": 1600341476677,
"key": "71",
"value": {
"viewtime": 71,
"userid": "User_1",
"pageid": "Page_60"
}
},
{
"topic": "pageviews",
"timestamp": 1600341476727,
"key": "81",
"value": {
"viewtime": 81,
"userid": "User_4",
"pageid": "Page_80"
}
},
{
"topic": "pageviews",
"timestamp": 1600341476886,
"key": "91",
"value": {
"viewtime": 91,
"userid": "User_8",
"pageid": "Page_90"
}
},
{
"topic": "pageviews",
"timestamp": 1600341477316,
"key": "101",
"value": {
"viewtime": 101,
"userid": "User_9",
"pageid": "Page_33"
}
},
{
"topic": "pageviews",
"timestamp": 1600341477503,
"key": "111",
"value": {
"viewtime": 111,
"userid": "User_7",
"pageid": "Page_50"
}
},
{
"topic": "pageviews",
"timestamp": 1600341477758,
"key": "121",
"value": {
"viewtime": 121,
"userid": "User_6",
"pageid": "Page_10"
}
},
{
"topic": "pageviews",
"timestamp": 1600341478251,
"key": "131",
"value": {
"viewtime": 131,
"userid": "User_4",
"pageid": "Page_75"
}
},
{
"topic": "pageviews",
"timestamp": 1600341478450,
"key": "141",
"value": {
"viewtime": 141,
"userid": "User_3",
"pageid": "Page_13"
}
},
{
"topic": "pageviews",
"timestamp": 1600341478512,
"key": "151",
"value": {
"viewtime": 151,
"userid": "User_6",
"pageid": "Page_28"
}
},
{
"topic": "pageviews",
"timestamp": 1600341478868,
"key": "161",
"value": {
"viewtime": 161,
"userid": "User_2",
"pageid": "Page_47"
}
},
{
"topic": "pageviews",
"timestamp": 1600341479027,
"key": "171",
"value": {
"viewtime": 171,
"userid": "User_3",
"pageid": "Page_44"
}
},
{
"topic": "pageviews",
"timestamp": 1600341479064,
"key": "181",
"value": {
"viewtime": 181,
"userid": "User_1",
"pageid": "Page_42"
}
},
{
"topic": "pageviews",
"timestamp": 1600341479462,
"key": "191",
"value": {
"viewtime": 191,
"userid": "User_1",
"pageid": "Page_59"
}
},
{
"topic": "pageviews",
"timestamp": 1600341479532,
"key": "201",
"value": {
"viewtime": 201,
"userid": "User_1",
"pageid": "Page_30"
}
},
{
"topic": "pageviews",
"timestamp": 1600341480014,
"key": "211",
"value": {
"viewtime": 211,
"userid": "User_1",
"pageid": "Page_47"
}
},
{
"topic": "pageviews",
"timestamp": 1600341480252,
"key": "221",
"value": {
"viewtime": 221,
"userid": "User_2",
"pageid": "Page_23"
}
},
{
"topic": "pageviews",
"timestamp": 1600341480652,
"key": "231",
"value": {
"viewtime": 231,
"userid": "User_9",
"pageid": "Page_97"
}
},
{
"topic": "pageviews",
"timestamp": 1600341481044,
"key": "241",
"value": {
"viewtime": 241,
"userid": "User_7",
"pageid": "Page_18"
}
},
{
"topic": "pageviews",
"timestamp": 1600341481304,
"key": "251",
"value": {
"viewtime": 251,
"userid": "User_1",
"pageid": "Page_22"
}
},
{
"topic": "pageviews",
"timestamp": 1600341481749,
"key": "261",
"value": {
"viewtime": 261,
"userid": "User_5",
"pageid": "Page_35"
}
},
{
"topic": "pageviews",
"timestamp": 1600341482080,
"key": "271",
"value": {
"viewtime": 271,
"userid": "User_5",
"pageid": "Page_13"
}
},
{
"topic": "pageviews",
"timestamp": 1600341482470,
"key": "281",
"value": {
"viewtime": 281,
"userid": "User_3",
"pageid": "Page_33"
}
},
{
"topic": "pageviews",
"timestamp": 1600341482957,
"key": "291",
"value": {
"viewtime": 291,
"userid": "User_2",
"pageid": "Page_41"
}
},
{
"topic": "pageviews",
"timestamp": 1600341483102,
"key": "301",
"value": {
"viewtime": 301,
"userid": "User_3",
"pageid": "Page_40"
}
},
{
"topic": "pageviews",
"timestamp": 1600341483115,
"key": "311",
"value": {
"viewtime": 311,
"userid": "User_3",
"pageid": "Page_24"
}
},
{
"topic": "pageviews",
"timestamp": 1600341483594,
"key": "321",
"value": {
"viewtime": 321,
"userid": "User_5",
"pageid": "Page_88"
}
},
{
"topic": "pageviews",
"timestamp": 1600341483978,
"key": "331",
"value": {
"viewtime": 331,
"userid": "User_1",
"pageid": "Page_18"
}
},
{
"topic": "pageviews",
"timestamp": 1600341484149,
"key": "341",
"value": {
"viewtime": 341,
"userid": "User_5",
"pageid": "Page_35"
}
},
{
"topic": "pageviews",
"timestamp": 1600341484582,
"key": "351",
"value": {
"viewtime": 351,
"userid": "User_1",
"pageid": "Page_85"
}
},
{
"topic": "pageviews",
"timestamp": 1600341484880,
"key": "361",
"value": {
"viewtime": 361,
"userid": "User_1",
"pageid": "Page_29"
}
},
{
"topic": "pageviews",
"timestamp": 1600341485041,
"key": "371",
"value": {
"viewtime": 371,
"userid": "User_1",
"pageid": "Page_40"
}
},
{
"topic": "pageviews",
"timestamp": 1600341485271,
"key": "381",
"value": {
"viewtime": 381,
"userid": "User_3",
"pageid": "Page_29"
}
},
{
"topic": "pageviews",
"timestamp": 1600341485570,
"key": "391",
"value": {
"viewtime": 391,
"userid": "User_4",
"pageid": "Page_15"
}
},
{
"topic": "pageviews",
"timestamp": 1600341485759,
"key": "401",
"value": {
"viewtime": 401,
"userid": "User_2",
"pageid": "Page_27"
}
},
{
"topic": "pageviews",
"timestamp": 1600341485951,
"key": "411",
"value": {
"viewtime": 411,
"userid": "User_9",
"pageid": "Page_22"
}
}
]
}
Similarly, create a file at test/output.json with the expected outputs:
{ "outputs": [
{ "topic": "MSG_COUNT", "timestamp":1600341475094, "key": "X", "value": {"MSG_CT": 1}},
{ "topic": "MSG_COUNT", "timestamp":1600341475206, "key": "X", "value": {"MSG_CT": 2}},
{ "topic": "MSG_COUNT", "timestamp":1600341475478, "key": "X", "value": {"MSG_CT": 3}},
{ "topic": "MSG_COUNT", "timestamp":1600341475955, "key": "X", "value": {"MSG_CT": 4}},
{ "topic": "MSG_COUNT", "timestamp":1600341476073, "key": "X", "value": {"MSG_CT": 5}},
{ "topic": "MSG_COUNT", "timestamp":1600341476547, "key": "X", "value": {"MSG_CT": 6}},
{ "topic": "MSG_COUNT", "timestamp":1600341476659, "key": "X", "value": {"MSG_CT": 7}},
{ "topic": "MSG_COUNT", "timestamp":1600341476677, "key": "X", "value": {"MSG_CT": 8}},
{ "topic": "MSG_COUNT", "timestamp":1600341476727, "key": "X", "value": {"MSG_CT": 9}},
{ "topic": "MSG_COUNT", "timestamp":1600341476886, "key": "X", "value": {"MSG_CT": 10}},
{ "topic": "MSG_COUNT", "timestamp":1600341477316, "key": "X", "value": {"MSG_CT": 11}},
{ "topic": "MSG_COUNT", "timestamp":1600341477503, "key": "X", "value": {"MSG_CT": 12}},
{ "topic": "MSG_COUNT", "timestamp":1600341477758, "key": "X", "value": {"MSG_CT": 13}},
{ "topic": "MSG_COUNT", "timestamp":1600341478251, "key": "X", "value": {"MSG_CT": 14}},
{ "topic": "MSG_COUNT", "timestamp":1600341478450, "key": "X", "value": {"MSG_CT": 15}},
{ "topic": "MSG_COUNT", "timestamp":1600341478512, "key": "X", "value": {"MSG_CT": 16}},
{ "topic": "MSG_COUNT", "timestamp":1600341478868, "key": "X", "value": {"MSG_CT": 17}},
{ "topic": "MSG_COUNT", "timestamp":1600341479027, "key": "X", "value": {"MSG_CT": 18}},
{ "topic": "MSG_COUNT", "timestamp":1600341479064, "key": "X", "value": {"MSG_CT": 19}},
{ "topic": "MSG_COUNT", "timestamp":1600341479462, "key": "X", "value": {"MSG_CT": 20}},
{ "topic": "MSG_COUNT", "timestamp":1600341479532, "key": "X", "value": {"MSG_CT": 21}},
{ "topic": "MSG_COUNT", "timestamp":1600341480014, "key": "X", "value": {"MSG_CT": 22}},
{ "topic": "MSG_COUNT", "timestamp":1600341480252, "key": "X", "value": {"MSG_CT": 23}},
{ "topic": "MSG_COUNT", "timestamp":1600341480652, "key": "X", "value": {"MSG_CT": 24}},
{ "topic": "MSG_COUNT", "timestamp":1600341481044, "key": "X", "value": {"MSG_CT": 25}},
{ "topic": "MSG_COUNT", "timestamp":1600341481304, "key": "X", "value": {"MSG_CT": 26}},
{ "topic": "MSG_COUNT", "timestamp":1600341481749, "key": "X", "value": {"MSG_CT": 27}},
{ "topic": "MSG_COUNT", "timestamp":1600341482080, "key": "X", "value": {"MSG_CT": 28}},
{ "topic": "MSG_COUNT", "timestamp":1600341482470, "key": "X", "value": {"MSG_CT": 29}},
{ "topic": "MSG_COUNT", "timestamp":1600341482957, "key": "X", "value": {"MSG_CT": 30}},
{ "topic": "MSG_COUNT", "timestamp":1600341483102, "key": "X", "value": {"MSG_CT": 31}},
{ "topic": "MSG_COUNT", "timestamp":1600341483115, "key": "X", "value": {"MSG_CT": 32}},
{ "topic": "MSG_COUNT", "timestamp":1600341483594, "key": "X", "value": {"MSG_CT": 33}},
{ "topic": "MSG_COUNT", "timestamp":1600341483978, "key": "X", "value": {"MSG_CT": 34}},
{ "topic": "MSG_COUNT", "timestamp":1600341484149, "key": "X", "value": {"MSG_CT": 35}},
{ "topic": "MSG_COUNT", "timestamp":1600341484582, "key": "X", "value": {"MSG_CT": 36}},
{ "topic": "MSG_COUNT", "timestamp":1600341484880, "key": "X", "value": {"MSG_CT": 37}},
{ "topic": "MSG_COUNT", "timestamp":1600341485041, "key": "X", "value": {"MSG_CT": 38}},
{ "topic": "MSG_COUNT", "timestamp":1600341485271, "key": "X", "value": {"MSG_CT": 39}},
{ "topic": "MSG_COUNT", "timestamp":1600341485570, "key": "X", "value": {"MSG_CT": 40}},
{ "topic": "MSG_COUNT", "timestamp":1600341485759, "key": "X", "value": {"MSG_CT": 41}},
{ "topic": "MSG_COUNT", "timestamp":1600341485951, "key": "X", "value": {"MSG_CT": 42}}
] }
Note that, contrary to the output we saw from the CLI above, in the test execution there is no buffering of the input records, so an aggregate value is emitted for every input record processed (ref).
Lastly, invoke the tests using the test runner and the statements file that you created earlier:
docker exec ksqldb ksql-test-runner -i /opt/app/test/input.json -s /opt/app/src/statements.sql -o /opt/app/test/output.json 2>&1
The test should pass:
>>> Test passed!
Once the test has completed successfully you can tear down the Docker Compose stack. This will delete all data that you’ve stored in Kafka and ksqlDB.
docker compose down