course: Spring Framework and Apache Kafka®

Process Messages with Kafka Streams and Spring Boot

6 min
Viktor Gamov

Developer Advocate (Presenter)


The great thing about using Kafka Streams with Spring Boot is that you can quickly start focusing on your Kafka Streams topologies—your KStreams and your KTables—because you don’t need to worry about lifecycles. Spring takes care of them. This means that you don’t need to manually create an instance of a Kafka Streams object, start it and stop it, etc. However, you can still get access to the object if you need it.
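If you do need the underlying object, you can get at it through the auto-configured `StreamsBuilderFactoryBean`. Here is a minimal sketch, assuming spring-kafka is on the classpath; the component name `StreamsStateInspector` is hypothetical:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Component;

// Hypothetical component that asks the Spring-managed factory bean
// for the underlying KafkaStreams instance.
@Component
public class StreamsStateInspector {

    private final StreamsBuilderFactoryBean factoryBean;

    public StreamsStateInspector(StreamsBuilderFactoryBean factoryBean) {
        this.factoryBean = factoryBean;
    }

    public KafkaStreams.State currentState() {
        // getKafkaStreams() returns null until the factory bean has started the topology
        KafkaStreams streams = factoryBean.getKafkaStreams();
        return streams != null ? streams.state() : KafkaStreams.State.NOT_RUNNING;
    }
}
```

This is useful, for example, for health checks or for interactive queries against state stores, where you need the running `KafkaStreams` instance that Spring created for you.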

To autoconfigure Kafka Streams support in Spring Boot, you simply need to add the annotation @EnableKafkaStreams. If you have Kafka Streams JARs in your classpath, they will be picked up by the autoconfiguration. Alternatively, you can enable your configuration explicitly.

Spring provides a Jackson-based JSON serializer and deserializer, plus a JsonSerde for Kafka Streams. So in use cases where you don’t need to rely on Confluent Schema Registry (for example, if you are writing a standalone application), JSON serialization works out of the box.
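As a sketch of what that looks like (the `Order` POJO and the `orders` topic name are hypothetical; assumes spring-kafka on the classpath):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.JsonSerde;

// Hypothetical POJO used as the message value
public class Order {
    public String id;
    public double amount;
}

public class OrdersTopology {

    @Bean
    public KStream<String, Order> orders(StreamsBuilder builder) {
        // JsonSerde uses Jackson under the hood; no Schema Registry required
        return builder.stream(
                "orders",  // hypothetical topic name
                Consumed.with(Serdes.String(), new JsonSerde<>(Order.class)));
    }
}
```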

If you need to monitor your application, which you generally should, Kafka Streams exposes some methods through JMX, and Spring for Apache Kafka provides a wrapper around those metrics and makes them available through the Micrometer framework. This allows you to consume the metrics with other frameworks and dashboard tools. In addition, Spring provides some out-of-the-box implementations for error handling.
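One way to wire that up (a sketch, assuming a recent spring-kafka version that ships `KafkaStreamsMicrometerListener` and a Micrometer `MeterRegistry` bean, e.g. from Spring Boot Actuator):

```java
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.streams.KafkaStreamsMicrometerListener;

// Bridges Kafka Streams' JMX metrics into Micrometer so that any
// registered MeterRegistry (Prometheus, Datadog, etc.) can consume them.
@Configuration
public class StreamsMetricsConfig {

    @Bean
    public StreamsBuilderFactoryBeanConfigurer micrometerConfigurer(MeterRegistry registry) {
        return factoryBean ->
                factoryBean.addListener(new KafkaStreamsMicrometerListener(registry));
    }
}
```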

Here is a typical configuration for Kafka Streams in Spring Boot:

    @Configuration
    @EnableKafka
    @EnableKafkaStreams
    public class KafkaStreamsConfig {

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            // Config keys are statically imported from StreamsConfig
            return new KafkaStreamsConfiguration(Map.of(
                APPLICATION_ID_CONFIG, "testStreams",
                BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName(),
                DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()
            ));
        }

    }

Note the explicit annotations at the top for enabling Kafka and Kafka Streams. Also note that in this bean, you can provide a customized configuration for your Kafka Streams application: the application ID, the bootstrap server connection details for your Kafka broker, and so on. Alternatively, you can define your config in application.properties; this Kafka Streams configuration bean is then created by default and is available for instantiating a Kafka Streams instance.
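The application.properties route might look like this (a sketch using Spring Boot's `spring.kafka.streams.*` keys, mirroring the values in the Java config):

```properties
# Equivalent Kafka Streams configuration via application.properties
spring.kafka.streams.application-id=testStreams
spring.kafka.streams.bootstrap-servers=localhost:9092
# Arbitrary Streams properties go under spring.kafka.streams.properties.*
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor
```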

To build your Streams topology, you need a StreamsBuilder as an input parameter. Spring Boot can create it with defaults or you can do it explicitly. Once you have your StreamsBuilder in place, you will get access to all of the APIs available in Kafka Streams and it becomes just like a regular Kafka Streams application. Spring’s wrapper on top of Kafka Streams is very thin so you can focus on your business logic.

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
            .mapValues((ValueMapper<String, String>) String::toUpperCase)
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMillis(1000)))
            .reduce((String value1, String value2) -> value1 + value2, Named.as("windowStore"))
            .toStream()
            .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
            .filter((i, s) -> s.length() > 40)
            .to("streamingTopic2");
        stream.print(Printed.toSysOut());
        return stream;
    }


Process Messages with Kafka Streams and Spring Boot

Hi, this is Viktor Gamov with Confluent, and welcome back to the Spring Kafka for Confluent Cloud course. In this module we're going to look at how you can use Kafka Streams together with Spring Boot and the Spring Kafka framework: what needs to be done, how it is configured, and what kinds of things you need to know. As always, there will be practice afterwards so you can follow along and see how this works in Confluent Cloud. So let's get to it. Here are a couple of things you need to know about how Kafka Streams integration works with Spring Kafka. In order to kick in auto-configuration in your Spring Boot application, you just need to add the annotation @EnableKafkaStreams. If you have the Kafka Streams JARs on your classpath, this automatic configuration will be able to pick them up and enable this configuration. Otherwise, you can always enable it explicitly. With Spring Kafka support, the Kafka Streams lifecycle will be fully managed by Spring, and in this case you don't need to manually create an instance of the Kafka Streams object, start it and stop it, and make sure that certain things happen when the application starts and stops. You don't need to do this; the lifecycle will be fully managed, even though you can still get access to the Kafka Streams instance. In the exercise section, I will show you where that can be useful. Spring provides a Jackson-based JSON serializer and deserializer, plus it also provides a Serde for Kafka Streams. In the many use cases where you don't need to rely on things like a schema registry (for example, you are writing a standalone application and don't need to publish a schema for collaboration between different members of your team or different services), JSON serialization will work out of the box with those predefined serializers and deserializers.
If you are in a world where you need to monitor and see what is going on with your application, which you should do in production, Kafka Streams exposes some of its metrics through JMX, and Spring Kafka provides a wrapper around those metrics and exposes them through the Micrometer framework. So those things will be available for consumption by different frameworks and dashboard tools. Spring also provides some out-of-the-box implementations of error-handling mechanisms. Kafka Streams has APIs that allow you to specify an error handler for different types of situations, and Spring Boot provides different strategies that you can apply in your application. So let's take a look. We're going to look at this code snippet in more detail in the exercise section, but here you see a typical configuration for your Kafka Streams application. One thing you might notice is that the configuration has explicit annotations, @EnableKafka and @EnableKafkaStreams. This is how Spring Boot understands that it needs to initialize Kafka Streams, and that it will be instantiating the class called StreamsBuilder; we're going to talk about this on the next slide. In the bean defined in this configuration, you can provide a customized configuration for your Kafka Streams application: the application ID and the bootstrap server connection details for the Kafka broker. Or you can define this configuration in application.properties, and this Kafka Streams configuration bean will be created by default and will be available for instantiating instances of the Kafka Streams application. What I like about using Kafka Streams with Spring Boot is that you can very quickly start focusing on your topology, on your KStreams and your KTables, and you don't need to worry much about the lifecycle; Spring will take care of it.
In this case, the one method for building this topology will require a StreamsBuilder as an input parameter, which Spring will inject. You can create it explicitly, or Spring can create it for you with some default configuration. Once you have this StreamsBuilder in place, with this method you will be able to get access to all the APIs that are available in Kafka Streams. At that point, it just becomes like your regular Kafka Streams application. Forget about Spring, forget about anything else; it will work like plain Java. Spring provides a really thin wrapper on top of this API, so you can just focus on the good things. All right. In the next section, you will be able to use some of the knowledge I just described and implement a word processing application using Kafka Streams and Spring Boot. You will also see how this application looks in Confluent Cloud, what kinds of topics are created there, and so forth. So let's get to it. It's going to be nice!