How to convert a KStream to a KTable in Kafka Streams

If you have a KStream and you need to convert it to a KTable, KStream.toTable does the trick. Prior to the introduction of this method in Apache Kafka 2.5, a dummy aggregation operation was required.

As a concrete example, consider a topic with string keys and values. To convert the stream to a KTable:

  KTable<String, String> convertedTable = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde))
    .toTable(Materialized.as("stream-converted-to-table"));
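
Prior to Kafka 2.5, the same conversion required a dummy aggregation, roughly like this sketch (the reducer simply keeps the latest value seen for each key):

  KTable<String, String> convertedTable = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde))
    .groupByKey()
    .reduce((oldValue, newValue) -> newValue,
            Materialized.as("stream-converted-to-table"));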

The following steps use Confluent Cloud. To run the tutorial locally with Docker, skip to the Docker instructions section at the bottom.

Prerequisites

  • A Confluent Cloud account
  • The Confluent CLI installed on your machine
  • Apache Kafka or Confluent Platform (both include the Kafka Streams application reset tool)
  • Clone the confluentinc/tutorials repository and navigate into its top-level directory:
    git clone git@github.com:confluentinc/tutorials.git
    cd tutorials

Create Confluent Cloud resources

Log in to your Confluent Cloud account:

confluent login --prompt --save

Install a CLI plugin that will streamline the creation of resources in Confluent Cloud:

confluent plugin install confluent-quickstart

Run the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial. Note that you may specify a different cloud provider (gcp or azure) or region. You can find the supported regions for a given cloud provider by running confluent kafka region list --cloud <CLOUD>.
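For example, to list the supported AWS regions:

confluent kafka region list --cloud aws

Then run the plugin: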

confluent quickstart \
  --environment-name kafka-streams-stream-to-table-env \
  --kafka-cluster-name kafka-streams-stream-to-table-cluster \
  --create-kafka-key \
  --kafka-java-properties-file ./streams-to-table/kstreams/src/main/resources/cloud.properties

The plugin should complete in under a minute.

Create topics

Create the input and output topics for the application:

confluent kafka topic create input-topic
confluent kafka topic create streams-output-topic
confluent kafka topic create table-output-topic

Start a console producer:

confluent kafka topic produce input-topic --parse-key --delimiter :

Enter a few key/value pairs:

1:one
2:two
3:three

Press Ctrl+C to exit the console producer.

Compile and run the application

Compile the application from the top-level tutorials repository directory:

./gradlew streams-to-table:kstreams:shadowJar

Navigate into the application's home directory:

cd streams-to-table/kstreams

Run the application, passing the Kafka client configuration file generated when you created Confluent Cloud resources:

java -cp ./build/libs/streams-to-table-standalone.jar \
    io.confluent.developer.StreamsToTable \
    ./src/main/resources/cloud.properties

Validate that you see the same messages in the streams-output-topic and table-output-topic topics. This is because converting to a KTable is a logical operation and only changes the interpretation of the stream.

confluent kafka topic consume streams-output-topic -b --print-key
confluent kafka topic consume table-output-topic -b --print-key

You should see:

1	one
2	two
3	three
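
Both topics carry the same records because the conversion only changes how downstream operators interpret the stream. For reference, the application's topology plausibly looks like the following sketch (the topic constants and variable names are assumptions; see the StreamsToTable class in the repository for the actual code):

  KStream<String, String> stream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));

  // Write the raw event stream to one output topic
  stream.to(STREAMS_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));

  // Convert to a KTable, then back to a changelog stream, and write it to the other
  stream.toTable(Materialized.as("stream-converted-to-table"))
    .toStream()
    .to(TABLE_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));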

Clean up

When you are finished, delete the kafka-streams-stream-to-table-env environment. First, find the environment ID (of the form env-123456) that corresponds to it:

confluent environment list

Delete the environment, including all resources created for this tutorial:

confluent environment delete <ENVIRONMENT ID>

Docker instructions

Prerequisites

  • Docker running via Docker Desktop or Docker Engine
  • Docker Compose. Ensure that the command docker compose version succeeds.
  • Clone the confluentinc/tutorials repository and navigate into its top-level directory:
    git clone git@github.com:confluentinc/tutorials.git
    cd tutorials

Start Kafka in Docker

Start Kafka with the following command run from the top-level tutorials repository directory:

docker compose -f ./docker/docker-compose-kafka.yml up -d

Create topics

Open a shell in the broker container:

docker exec -it broker /bin/bash

Create the input and output topics for the application:

kafka-topics --bootstrap-server localhost:9092 --create --topic input-topic
kafka-topics --bootstrap-server localhost:9092 --create --topic streams-output-topic
kafka-topics --bootstrap-server localhost:9092 --create --topic table-output-topic

Start a console producer:

kafka-console-producer --bootstrap-server localhost:9092 --topic input-topic \
    --property "parse.key=true" --property "key.separator=:"

Enter a few key/value pairs:

1:one
2:two
3:three

Press Ctrl+C to exit the console producer.

Compile and run the application

On your local machine, compile the app:

./gradlew streams-to-table:kstreams:shadowJar

Navigate into the application's home directory:

cd streams-to-table/kstreams

Run the application, passing the local.properties Kafka client configuration file that points to the broker's bootstrap servers endpoint at localhost:9092:

java -cp ./build/libs/streams-to-table-standalone.jar \
    io.confluent.developer.StreamsToTable \
    ./src/main/resources/local.properties
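
For reference, local.properties is a plain Kafka client configuration file; at minimum it points the application at the local broker (a minimal sketch; the repository's actual file may contain additional settings):

  bootstrap.servers=localhost:9092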

Validate that you see the same messages in the streams-output-topic and table-output-topic topics. This is because converting to a KTable is a logical operation and only changes the interpretation of the stream.

kafka-console-consumer --bootstrap-server localhost:9092 --topic streams-output-topic --from-beginning --property "print.key=true"
kafka-console-consumer --bootstrap-server localhost:9092 --topic table-output-topic --from-beginning --property "print.key=true"

You should see:

1	one
2	two
3	three

Clean up

From your local machine, stop the broker container:

docker compose -f ./docker/docker-compose-kafka.yml down

Do you have questions or comments? Join us in the #confluent-developer community Slack channel to engage in discussions with the creators of this content.