How to convert a Kafka Streams KStream to a KTable

Question:

How do you convert a KStream to a KTable without having to perform a dummy aggregation operation?

Example use case:

You have a KStream and you need to convert it to a KTable, but you don't need an aggregation operation. With the 2.5 release of Apache Kafka, Kafka Streams introduced a new method, KStream.toTable, which allows users to easily convert a KStream to a KTable without having to perform an aggregation operation.
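
The conversion itself boils down to a single method call. Here is a minimal sketch (the topic name matches the one used later in this tutorial, and the classes come from the org.apache.kafka.streams packages):

StreamsBuilder builder = new StreamsBuilder();

// A KStream is a record stream: every event is forwarded downstream.
KStream<String, String> stream = builder.stream("input-topic");

// toTable() reinterprets the record stream as a changelog stream,
// so the table retains only the latest value per key.
KTable<String, String> table = stream.toTable();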

Hands-on code example:

Run it

1. Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir streams-to-table && cd streams-to-table

Next, create a directory for configuration data:

mkdir configuration

2. Provision your Kafka cluster

This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started free is on Confluent Cloud, which provides Kafka as a fully managed service.

  1. After you log in to Confluent Cloud, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  2. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.

  3. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.

3. Write the cluster information into a local file

From the Confluent Cloud Console, navigate to your Kafka cluster and then select Clients in the lefthand navigation. From the Clients view, create a new client and click Java to get the connection information customized to your cluster.

Create new credentials for your Kafka cluster and Schema Registry, writing in appropriate descriptions so that the keys are easy to find and delete later. The Confluent Cloud Console will show a configuration similar to below with your new credentials automatically populated (make sure Show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Do not directly copy and paste the above configuration. You must copy it from the Confluent Cloud Console so that it includes your Confluent Cloud information and credentials.

4. Download and set up the Confluent CLI

This tutorial has some steps for Kafka topic management and producing and consuming events, for which you can use the Confluent Cloud Console or the Confluent CLI. Follow the instructions here to install the Confluent CLI, and then follow these steps to connect the CLI to your Confluent Cloud cluster.

5. Configure the project

Create the following Gradle build file, named build.gradle for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"
    id "idea"
    id "eclipse"
}

sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
    implementation('org.apache.kafka:kafka-clients') {
        version {
            strictly '3.4.0'
        }
    }
    testImplementation "org.apache.kafka:kafka-streams-test-utils:3.4.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.StreamsToTable"
    )
  }
}

shadowJar {
    archiveBaseName = "streams-to-table-standalone"
    archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Then create a development file at configuration/dev.properties:

application.id=streams-to-table
replication.factor=3

input.topic.name=input-topic
input.topic.partitions=6
input.topic.replication.factor=3

streams.output.topic.name=streams-output-topic
streams.output.topic.partitions=6
streams.output.topic.replication.factor=3

table.output.topic.name=table-output-topic
table.output.topic.partitions=6
table.output.topic.replication.factor=3

6. Update the properties file with Confluent Cloud information

Using the command below, append the contents of configuration/ccloud.properties (with your Confluent Cloud configuration) to configuration/dev.properties (with the application properties).

cat configuration/ccloud.properties >> configuration/dev.properties

7. Create the Kafka Streams topology

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

The heart of this tutorial is a simple one-liner. You’ll take an existing KStream object and use the toTable() method to convert it into a KTable. This method (added in Apache Kafka 2.5) lets you convert a record stream to a changelog stream directly. In this case you’ve also materialized the KTable, so it’s available for use with Interactive Queries.

  
    final KStream<String, String> stream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
    // this line takes the previous KStream and converts it to a KTable
    final KTable<String, String> convertedTable = stream.toTable(Materialized.as("stream-converted-to-table"));
  

The rest of this Kafka Streams application simply writes the incoming records back out to a topic. In the subsequent tutorial steps you’ll use a console consumer to observe the differences between a record stream and a changelog stream.
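
Because the KTable is materialized under the store name stream-converted-to-table, you could also query it while the application is running. The following is only a sketch of such an interactive query, not part of the file you’ll create below; it assumes streams is the running KafkaStreams instance:

// Hypothetical lookup against the materialized store backing the KTable.
ReadOnlyKeyValueStore<String, String> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "stream-converted-to-table",
        QueryableStoreTypes.keyValueStore()));

// Latest value recorded for the key, or null if the key hasn't been seen.
String latest = store.get("key_one");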

Now go ahead and create the following file at src/main/java/io/confluent/developer/StreamsToTable.java.

package io.confluent.developer;



import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class StreamsToTable {


    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = allProps.getProperty("input.topic.name");
        final String streamsOutputTopic = allProps.getProperty("streams.output.topic.name");
        final String tableOutputTopic = allProps.getProperty("table.output.topic.name");

        final Serde<String> stringSerde = Serdes.String();

        final KStream<String, String> stream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));

        final KTable<String, String> convertedTable = stream.toTable(Materialized.as("stream-converted-to-table"));

        stream.to(streamsOutputTopic, Produced.with(stringSerde, stringSerde));
        convertedTable.toStream().to(tableOutputTopic, Produced.with(stringSerde, stringSerde));


        return builder.build();
    }


    public void createTopics(final Properties allProps) {
        try (final AdminClient client = AdminClient.create(allProps)) {

            final List<NewTopic> topics = new ArrayList<>();

            topics.add(new NewTopic(
                    allProps.getProperty("input.topic.name"),
                    Integer.parseInt(allProps.getProperty("input.topic.partitions")),
                    Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));

            topics.add(new NewTopic(
                    allProps.getProperty("streams.output.topic.name"),
                    Integer.parseInt(allProps.getProperty("streams.output.topic.partitions")),
                    Short.parseShort(allProps.getProperty("streams.output.topic.replication.factor"))));

            topics.add(new NewTopic(
                allProps.getProperty("table.output.topic.name"),
                Integer.parseInt(allProps.getProperty("table.output.topic.partitions")),
                Short.parseShort(allProps.getProperty("table.output.topic.replication.factor"))));

            client.createTopics(topics);
        }
    }

    public static void main(String[] args) throws Exception {

        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }

        final StreamsToTable instance = new StreamsToTable();

        final Properties allProps = new Properties();
        try (InputStream inputStream = new FileInputStream(args[0])) {
            allProps.load(inputStream);
        }
        allProps.put(StreamsConfig.APPLICATION_ID_CONFIG, allProps.getProperty("application.id"));
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final Topology topology = instance.buildTopology(allProps);

        instance.createTopics(allProps);

        final KafkaStreams streams = new KafkaStreams(topology, allProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
                latch.countDown();
            }
        });

        try {
            streams.cleanUp();
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

}
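
The build file already declares kafka-streams-test-utils, JUnit, and Hamcrest, so if you’d like to verify the topology without a Kafka cluster, a unit test along the following lines should do it. This is a sketch only; the file src/test/java/io/confluent/developer/StreamsToTableTest.java and its assertions are illustrative rather than part of the tutorial steps.

package io.confluent.developer;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

public class StreamsToTableTest {

    @Test
    public void shouldForwardEveryRecordToStreamTopicButOnlyLatestPerKeyToTableTopic() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-to-table-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put("input.topic.name", "input-topic");
        props.put("streams.output.topic.name", "streams-output-topic");
        props.put("table.output.topic.name", "table-output-topic");

        final Topology topology = new StreamsToTable().buildTopology(props);

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            final TestInputTopic<String, String> input = driver.createInputTopic(
                "input-topic", Serdes.String().serializer(), Serdes.String().serializer());
            final TestOutputTopic<String, String> streamOutput = driver.createOutputTopic(
                "streams-output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
            final TestOutputTopic<String, String> tableOutput = driver.createOutputTopic(
                "table-output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());

            // the same six records used in the produce step of this tutorial
            input.pipeInput("key_one", "foo");
            input.pipeInput("key_one", "bar");
            input.pipeInput("key_one", "baz");
            input.pipeInput("key_two", "foo");
            input.pipeInput("key_two", "bar");
            input.pipeInput("key_two", "baz");

            // the record stream forwards every event
            final List<KeyValue<String, String>> streamRecords = streamOutput.readKeyValuesToList();
            assertThat(streamRecords, hasSize(6));

            // the update stream ends with only the latest value per key
            final Map<String, String> latestPerKey = tableOutput.readKeyValuesToMap();
            assertThat(latestPerKey, is(Map.of("key_one", "baz", "key_two", "baz")));
        }
    }
}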

8. Compile and run the Kafka Streams program

In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.

java -jar build/libs/streams-to-table-standalone-0.0.1.jar configuration/dev.properties

9. Produce sample data to the input topic

In a new terminal, run:

confluent kafka topic produce input-topic --parse-key

Then enter these records, either one at a time or by copying and pasting all of them into the terminal and hitting enter:

key_one:foo
key_one:bar
key_one:baz
key_two:foo
key_two:bar
key_two:baz

After you’ve sent the records, you can close the producer with Ctrl-C.

10. Consume data from the streams output topic

Now that you’ve sent the records to your Kafka Streams application, let’s look at the output. You’ve built a simple application, so we don’t expect to see anything special, but you did convert a KStream to a KTable. A KStream is an event stream, meaning Kafka Streams forwards every record downstream. A KTable, by contrast, is an update stream, meaning Kafka Streams forwards only the latest update for a given key.

You’ll observe this in action in the next two steps. In this step, you’ll examine the output of the KStream, and you should expect to see six output records, corresponding to the six input records you published before.

Run the following command to see the output of the event-stream:

confluent kafka topic consume streams-output-topic --print-key --delimiter " - " --from-beginning

After a few seconds you should see output like the following:

key_one - foo
key_one - bar
key_one - baz
key_two - foo
key_two - bar
key_two - baz

Now that you’ve confirmed the streams output, close this consumer with Ctrl-C.

11. Consume data from the table output topic

In the previous step you verified the record stream output, but in this step you’ll verify the update stream output.

Next, run the following command to see the output of the update-stream:

confluent kafka topic consume table-output-topic --print-key --delimiter " - " --from-beginning

After a few seconds you should see output like the following:

key_one - baz
key_two - baz

The difference in the output is that instead of six records, you see only two. When you converted the KStream (an event stream) to a materialized KTable (an update stream), Kafka Streams placed a cache in front of the state store. With the cache in place, new records replace existing records with the same key. Unlike a record stream, where each record is independent, with an update stream it’s OK to drop intermediate results. Kafka Streams flushes the cache either when the cache is full (10 MB by default) or when it commits the offsets of the records it has processed. In this case, when Kafka Streams flushed the cache, only the latest record for each key remained.
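
If you want updates forwarded downstream sooner, you can shrink the cache or commit more often. The settings below are illustrative only and are not part of this tutorial’s configuration:

// Illustrative only: these settings are not used in this tutorial.
final Properties props = new Properties();
// A cache size of 0 disables buffering, so every update is forwarded immediately.
// (Newer releases deprecate this setting in favor of statestore.cache.max.bytes.)
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// Commit, and therefore flush the cache, more often than the 30-second default.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);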

Now that you’ve confirmed the table output, close this consumer with Ctrl-C.

12. Teardown Confluent Cloud resources

You may try another tutorial, but if you don’t plan on doing other tutorials, use the Confluent Cloud Console or CLI to destroy all of the resources you created. Verify they are destroyed to avoid unexpected charges.