How do you convert a KStream to a KTable without having to perform a dummy aggregation operation?
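The short answer: call toTable() on the KStream. Before Apache Kafka 2.5, the usual workaround was a dummy aggregation such as a reduce that always keeps the newest value. The sketch below contrasts the two approaches; the class, method, and variable names are illustrative only, and the rest of this tutorial builds a complete, runnable example around toTable().

package io.confluent.developer;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamToTableComparison {

    static void compare(final StreamsBuilder builder) {
        final KStream<String, String> stream = builder.stream("input-topic");

        // Pre-2.5 workaround: a "dummy" aggregation whose only job is to keep
        // the latest value seen for each key.
        final KTable<String, String> viaDummyReduce = stream
            .groupByKey()
            .reduce((previousValue, newValue) -> newValue);

        // Since Apache Kafka 2.5: convert the record stream to a changelog stream directly.
        final KTable<String, String> viaToTable = stream.toTable();
    }
}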
This tutorial installs Confluent Platform using Docker. Before proceeding:
• Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it
• Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.
• Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl
• Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line
To get started, make a new directory anywhere you’d like for this project:
mkdir streams-to-table && cd streams-to-table
Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):
version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN
And launch it by running:
docker compose up -d
Create the following Gradle build file, named build.gradle, for the project:
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"
    id "com.google.cloud.tools.jib" version "3.3.1"
    id "idea"
    id "eclipse"
}

sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
    implementation ('org.apache.kafka:kafka-clients') {
        version {
            strictly '3.4.0'
        }
    }
    testImplementation "org.apache.kafka:kafka-streams-test-utils:3.4.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
    manifest {
        attributes(
            "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
            "Main-Class": "io.confluent.developer.StreamsToTable"
        )
    }
}

shadowJar {
    archiveBaseName = "streams-to-table-standalone"
    archiveClassifier = ''
}
And be sure to run the following command to obtain the Gradle wrapper:
gradle wrapper
Next, create a directory for configuration data:
mkdir configuration
Then create a development file at configuration/dev.properties:
application.id=streams-to-table
bootstrap.servers=localhost:29092
schema.registry.url=http://localhost:8081
input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1
streams.output.topic.name=streams-output-topic
streams.output.topic.partitions=1
streams.output.topic.replication.factor=1
table.output.topic.name=table-output-topic
table.output.topic.partitions=1
table.output.topic.replication.factor=1
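The bootstrap.servers value above points at the PLAINTEXT_HOST listener (localhost:29092) exposed by the Docker Compose file. If you want to sanity-check connectivity before wiring up the full application, a small AdminClient probe like the following would do it. This is a hypothetical helper, not part of the tutorial project:

package io.confluent.developer;

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Hypothetical connectivity check: prints the broker nodes if the cluster is
// reachable; otherwise the call eventually fails with a timeout.
public class BrokerCheck {

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");

        try (AdminClient client = AdminClient.create(props)) {
            System.out.println("Brokers: " + client.describeCluster().nodes().get());
        }
    }
}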
Create a directory for the Java files in this project:
mkdir -p src/main/java/io/confluent/developer
The heart of this tutorial is a simple one-liner. You’ll take an existing KStream object and use the toTable() method to convert it into a KTable. This method (available since Apache Kafka 2.5) lets you convert a record stream to a changelog stream directly. In this case you’ve also materialized the KTable, so it’s available for use with Interactive Queries.
final KStream<String, String> stream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
// this line takes the previous KStream and converts it to a KTable
final KTable<String, String> convertedTable = stream.toTable(Materialized.as("stream-converted-to-table"));
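Because the KTable above is materialized under the store name stream-converted-to-table, a running application can serve point lookups against it. The class below is a hypothetical sketch of that Interactive Queries lookup; it is not part of the tutorial project and assumes a started KafkaStreams instance built from this topology.

package io.confluent.developer;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Hypothetical Interactive Queries helper, not part of the tutorial project.
public class TableLookup {

    // Returns the latest value for the given key from the state store backing the
    // materialized KTable. The KafkaStreams instance must already be in the RUNNING state.
    static String latestValueFor(final KafkaStreams streams, final String key) {
        final ReadOnlyKeyValueStore<String, String> store =
            streams.store(StoreQueryParameters.fromNameAndType(
                "stream-converted-to-table", QueryableStoreTypes.keyValueStore()));
        return store.get(key);
    }
}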
The rest of this Kafka Streams application simply writes the incoming records back out to a topic. In the subsequent tutorial steps you’ll use a console consumer to observe the differences between a record stream and a changelog stream.
Now go ahead and create the following file at src/main/java/io/confluent/developer/StreamsToTable.java.
package io.confluent.developer;

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class StreamsToTable {

    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = allProps.getProperty("input.topic.name");
        final String streamsOutputTopic = allProps.getProperty("streams.output.topic.name");
        final String tableOutputTopic = allProps.getProperty("table.output.topic.name");

        final Serde<String> stringSerde = Serdes.String();

        final KStream<String, String> stream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
        // this line takes the previous KStream and converts it to a KTable
        final KTable<String, String> convertedTable = stream.toTable(Materialized.as("stream-converted-to-table"));

        stream.to(streamsOutputTopic, Produced.with(stringSerde, stringSerde));
        convertedTable.toStream().to(tableOutputTopic, Produced.with(stringSerde, stringSerde));

        return builder.build();
    }

    public void createTopics(final Properties allProps) {
        try (final AdminClient client = AdminClient.create(allProps)) {
            final List<NewTopic> topics = new ArrayList<>();
            topics.add(new NewTopic(
                allProps.getProperty("input.topic.name"),
                Integer.parseInt(allProps.getProperty("input.topic.partitions")),
                Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));
            topics.add(new NewTopic(
                allProps.getProperty("streams.output.topic.name"),
                Integer.parseInt(allProps.getProperty("streams.output.topic.partitions")),
                Short.parseShort(allProps.getProperty("streams.output.topic.replication.factor"))));
            topics.add(new NewTopic(
                allProps.getProperty("table.output.topic.name"),
                Integer.parseInt(allProps.getProperty("table.output.topic.partitions")),
                Short.parseShort(allProps.getProperty("table.output.topic.replication.factor"))));
            client.createTopics(topics);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }

        final StreamsToTable instance = new StreamsToTable();
        final Properties allProps = new Properties();
        try (InputStream inputStream = new FileInputStream(args[0])) {
            allProps.load(inputStream);
        }
        allProps.put(StreamsConfig.APPLICATION_ID_CONFIG, allProps.getProperty("application.id"));
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final Topology topology = instance.buildTopology(allProps);
        instance.createTopics(allProps);

        final KafkaStreams streams = new KafkaStreams(topology, allProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
                latch.countDown();
            }
        });

        try {
            streams.cleanUp();
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
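One note on createTopics(): AdminClient.createTopics() is asynchronous, and the method above doesn’t wait for the result, which is fine for this tutorial. If you ever want topic creation to complete (or fail loudly) before the Streams application starts, a hedged variant looks like the sketch below; the class and method names are hypothetical.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

// Hypothetical helper, not part of the tutorial: create topics and block until
// the broker has acknowledged them, instead of firing and forgetting.
public class BlockingTopicCreation {

    public static void createAndAwait(final Properties adminProps, final List<NewTopic> topics) throws Exception {
        try (final AdminClient client = AdminClient.create(adminProps)) {
            // all() returns a KafkaFuture<Void>; get() blocks until every topic is created
            // and throws if creation fails, e.g. if a topic already exists from a previous run.
            client.createTopics(topics).all().get();
        }
    }
}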
In your terminal, run:
./gradlew shadowJar
Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.
java -jar build/libs/streams-to-table-standalone-0.0.1.jar configuration/dev.properties
To get started, let’s first open a shell in the broker container. You’ll use this shell for running a console producer and consumer throughout the tutorial. Open a new terminal window and run the following command:
docker exec -it broker bash
Then let’s run the following command in the broker container shell from the previous step to start a new console producer:
kafka-console-producer --topic input-topic --bootstrap-server broker:9092 \
--property parse.key=true \
--property key.separator=":"
Then enter these records either one at a time or copy and paste all of them into the terminal and press enter:
key_one:foo
key_one:bar
key_one:baz
key_two:foo
key_two:bar
key_two:baz
After you’ve sent the records, you can close the producer with Ctrl-C, but keep the broker container shell open as you’ll still need it for the next few steps.
Now that you’ve sent the records to your Kafka Streams application, let’s look at the output. You’ve built a simple application, so we don’t expect to see anything special, but you did convert a KStream to a KTable. A KStream is an event-stream, meaning Kafka Streams forwards every record downstream. A KTable, on the other hand, is an update-stream, which means Kafka Streams only forwards the latest update for a given key.
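You can also see this difference without leaving the JVM: the build.gradle above already pulls in kafka-streams-test-utils, so a unit test can drive the topology directly. The sketch below is illustrative (the test class is hypothetical and not part of the tutorial). Note that TopologyTestDriver commits after every record, so it doesn’t reproduce the caching behavior discussed later; that’s why the table-side assertion only checks the latest value per key.

package io.confluent.developer;

import static org.junit.Assert.assertEquals;

import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.Test;

public class StreamsToTableSketchTest {

    @Test
    public void shouldForwardEveryRecordOnTheStreamSideOnly() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-to-table-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put("input.topic.name", "input-topic");
        props.put("streams.output.topic.name", "streams-output-topic");
        props.put("table.output.topic.name", "table-output-topic");

        final StreamsToTable app = new StreamsToTable();
        try (TopologyTestDriver driver = new TopologyTestDriver(app.buildTopology(props), props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> streamOutput =
                driver.createOutputTopic("streams-output-topic", new StringDeserializer(), new StringDeserializer());
            final TestOutputTopic<String, String> tableOutput =
                driver.createOutputTopic("table-output-topic", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("key_one", "foo");
            input.pipeInput("key_one", "bar");
            input.pipeInput("key_one", "baz");

            // The event stream forwards all three records for key_one...
            assertEquals(3, streamOutput.readKeyValuesToList().size());
            // ...while the update stream's latest value per key is the last one written.
            assertEquals("baz", tableOutput.readKeyValuesToMap().get("key_one"));
        }
    }
}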
We’ll observe this in action in the next two steps. In this step, you’ll examine the output of the KStream, and you should expect to see six output records, corresponding to the six input records you published before.
Run the following command to see the output of the event-stream:
kafka-console-consumer --topic streams-output-topic --bootstrap-server broker:9092 \
--from-beginning \
--property print.key=true \
--property key.separator=" - "
After a few seconds you should see output like the following:
key_one - foo
key_one - bar
key_one - baz
key_two - foo
key_two - bar
key_two - baz
Now that you’ve confirmed the streams output, close this consumer with Ctrl-C.
In the previous step you verified the record stream output, but in this step you’ll verify the update stream output.
Next, run the following command to see the output of the update-stream:
kafka-console-consumer --topic table-output-topic --bootstrap-server broker:9092 \
--from-beginning \
--property print.key=true \
--property key.separator=" - "
After a few seconds you should see output like the following:
key_one - baz
key_two - baz
The difference in the output is that instead of six records, you see only two. When you converted the KStream (an event stream) to a materialized KTable (an update stream), Kafka Streams placed a cache in front of the state store. With the cache in place, new records replace existing records with the same key. Unlike a record stream, where each record is independent, an update stream can safely drop intermediate results. Kafka Streams flushes the cache either when the cache is full (10 MB by default) or when it commits the offsets of the records it has processed. In this case, when Kafka Streams flushed the cache, only the latest record for each key remained.
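If you’d rather see every update on table-output-topic, which can be handy for experimenting, you can effectively disable the record cache. A minimal sketch, assuming you add this line in main() alongside the other StreamsConfig settings before the KafkaStreams instance is created:

// Setting the cache size to 0 disables record caching, so every KTable update
// is forwarded downstream instead of only the latest value at flush time.
// (cache.max.bytes.buffering is the config name in Kafka Streams 3.4; newer
// releases prefer statestore.cache.max.bytes.)
allProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);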
Now that you’ve confirmed the table output, close this consumer with Ctrl-C.
You’re all done now!
Go back to your open windows and stop any console consumers with Ctrl-C, then close the container shells with Ctrl-D.
Then you can shut down the Docker containers by running:
docker compose down --volumes
Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.
Sign up for Confluent Cloud.
After you log in to Confluent Cloud Console, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.
From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).
Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.
Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.
For this tutorial, add the following properties to the client application’s input properties file, substituting the values in curly braces with your Confluent Cloud values.
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for Kafka producer to prevent data loss
acks=all
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.