Kafka Streams applications define their logic in a processor topology, which is a graph of stream processors (nodes) and streams (edges). There are two ways to define these components in your Kafka Streams application: the Streams DSL and the Processor API. The Streams DSL provides built-in abstractions for common event stream processing concepts like streams, tables, and transformations, while the Processor API can be used for advanced cases not supported by the DSL.
The Streams DSL is recommended for most use cases, and this tutorial uses it to define a basic text processing application. To get started, let’s focus on the important bits of the Kafka Streams application code, highlighting the DSL usage.
static Topology buildTopology(String inputTopic, String outputTopic) {
Serde<String> stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
builder
.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
.peek((k,v) -> logger.info("Observed event: {}", v))
.mapValues(s -> s.toUpperCase())
.peek((k,v) -> logger.info("Transformed event: {}", v))
.to(outputTopic, Produced.with(stringSerde, stringSerde));
return builder.build();
}
In the code above, the StreamsBuilder class is used to construct the topology. The DSL allows you to build your application by chaining together the desired operations with a fluent API.
A typical topology follows a common pattern:
- Consume one or more input streams using the stream function, which accepts the names of the Kafka topics to consume from along with the deserializers required to decode the data.
- Transform events by chaining together one or more transformations. In our example, we use mapValues to convert incoming String events to their upper-case values.
- Stream the transformed events to the output of the topology using the to function, specifying a destination topic as well as the serializers required to encode the data.
The peek function allows you to observe and act on events as they flow through the topology’s stages. In our example it is used to debug the topology by printing events as they pass through it.
Once the topology is defined within the builder, the buildTopology function returns an instance of the Topology created from builder.build(). Separating the construction of the Topology into its own function is useful for testing purposes, as we will see in the Test It section of the tutorial.
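As a quick preview of that testing approach, here is a minimal sketch (assuming the kafka-streams-test-utils dependency and JUnit 5) of how the separated buildTopology function can be exercised with TopologyTestDriver, with no running Kafka brokers required. The test class name and topic names are illustrative placeholders, not the tutorial's actual test, which comes later.
package io.confluent.developer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
class KafkaStreamsApplicationPreviewTest {
    @Test
    void topologyShouldUpperCaseValues() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-preview-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        // Build the same topology the application uses, but with test topic names
        Topology topology = KafkaStreamsApplication.buildTopology("input-topic", "output-topic");
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            TestInputTopic<String, String> input =
                driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output =
                driver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());
            // Pipe a record through the topology; mapValues should upper-case the value
            input.pipeInput("key-1", "hello kafka streams");
            assertEquals("HELLO KAFKA STREAMS", output.readValue());
        }
    }
}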
Now go ahead and create the full Java source file at src/main/java/io/confluent/developer/KafkaStreamsApplication.java.
package io.confluent.developer;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class KafkaStreamsApplication {
private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApplication.class);
static void runKafkaStreams(final KafkaStreams streams) {
final CountDownLatch latch = new CountDownLatch(1);
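// Release the latch as soon as the application transitions out of the RUNNING state (e.g., on an error or shutdown), letting runKafkaStreams return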
streams.setStateListener((newState, oldState) -> {
if (oldState == KafkaStreams.State.RUNNING && newState != KafkaStreams.State.RUNNING) {
latch.countDown();
}
});
streams.start();
try {
latch.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("Streams Closed");
}
static Topology buildTopology(String inputTopic, String outputTopic) {
Serde<String> stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
builder
.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
.peek((k,v) -> logger.info("Observed event: {}", v))
.mapValues(s -> s.toUpperCase())
.peek((k,v) -> logger.info("Transformed event: {}", v))
.to(outputTopic, Produced.with(stringSerde, stringSerde));
return builder.build();
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException("This program takes one argument: the path to a configuration file.");
}
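// Load the Kafka Streams configuration and topic names from the properties file given on the command line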
Properties props = new Properties();
try (InputStream inputStream = new FileInputStream(args[0])) {
props.load(inputStream);
}
final String inputTopic = props.getProperty("input.topic.name");
final String outputTopic = props.getProperty("output.topic.name");
try (Util utility = new Util()) {
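// Create the input and output topics for this application (Util is a helper class used by this tutorial)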
utility.createTopics(
props,
Arrays.asList(
new NewTopic(inputTopic, Optional.empty(), Optional.empty()),
new NewTopic(outputTopic, Optional.empty(), Optional.empty())));
// Randomizer is only used to produce sample data for this application, not typical usage
try (Util.Randomizer rando = utility.startNewRandomizer(props, inputTopic)) {
KafkaStreams kafkaStreams = new KafkaStreams(
buildTopology(inputTopic, outputTopic),
props);
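// Close the Streams client cleanly when the JVM shuts down (for example, on Ctrl+C)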
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
logger.info("Kafka Streams 101 App Started");
runKafkaStreams(kafkaStreams);
}
}
}
}
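For reference, the configuration file passed as the program's single argument carries both the Kafka Streams settings and the topic names that main reads. A minimal sketch of such a file might look like the following; the broker address and topic names are placeholder assumptions, not the tutorial's actual values.
application.id=kafka-streams-101
bootstrap.servers=localhost:9092
input.topic.name=random-strings
output.topic.name=uppercase-strings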