Get Started Free
‹ Back to courses
course: Spring Framework and Apache Kafka®

Receiving Messages with KafkaListener

6 min
Viktor Gamov

Viktor Gamov

Developer Advocate (Presenter)

Receiving Messages with KafkaListener

Factories drive a lot of functionality in Spring Boot. You learned in Sending Messages to Confluent Cloud with Spring Boot that ProducerFactory instantiates Apache Kafka producers. Similarly, ConsumerFactory instantiates Kafka consumers.

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProperties());
  }

For a ConsumerFactory, you need to provide the property files or configurations that your consumer will use. So you need to create a consumerProperties bean, which is essentially a Map with config key pairs:

  @Bean
  public Map<String, Object> consumerProperties() {
    return Map.of(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
        GROUP_ID_CONFIG, "spring-ccloud",
        ENABLE_AUTO_COMMIT_CONFIG, false,
        SESSION_TIMEOUT_MS_CONFIG, 15000,
        KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
        VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}

Message-Driven POJOs

Inside of Spring Boot, the components that integrate with messaging systems follow the pattern of “message-driven POJOs.” If you’ve been around the Java community for a while, you know that in the Java EE specification message-driven beans were introduced as part of the enterprise JavaBean specification. It was a good idea but difficult to put into practice, requiring an application server to run. Fortunately, the Spring Framework came up with a simplified version of those beans, not enterprise beans, but just regular POJOs made into Spring-managed beans.

Message-driven POJOs enable asynchronous communication between systems, so essentially you define a message listener and the framework takes care of the functionality (in the past, you would have had to implement an interface with the method listen(); the old functionality still exists, but it’s not necessarily useful anymore because of annotations).

This approach to message listening doesn’t have a direct coupling to the Kafka consumer API, and that’s why the layer of message-driven POJO support can be implemented as something that is abstracted out. So you don’t put any Kafka-related code in your POJO—you can just have one method with one input parameter. This is also a good opportunity to use an annotation. When you annotate the method, Spring takes care of instantiating the underlying containers that will run your Kafka consumers and read messages from your Kafka topics and handle serialization. All of these things are managed by Spring so you can focus on your application code. The annotation KafkaListener instantiates a facility called MessageListenerContainer, which handles parallelization, configuration, retries, and other things that the Kafka application requires, such as offsets.

Use the promo code SPRING101 & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.

Receiving Messages with KafkaListener

Hi, this is Viktor Gamov with Confluent. And in this module, I will tell you what kind of APIs from Spring Kafka you can use to receive messages from Kafka Topic in Confluent Cloud. Let's get to it. So the main component that drives all things inside the Spring Boot is always the factories. And if you learned on the previous lesson that the producer factor will be responsible for instantiating Kafka producers. So in this case, ConsumerFactory will be responsible for instantiated Kafka Consumers in our application. So for that, as always, we need to provide a property files or like configurations that our consumer will be using. So in this case, we creating a consumer properties bean which essentially is a map that has a key pair, keys, some of the config parameters, value, obviously value. And after that, we need to create a bean, ConsumerFactory bean that will take this parameters and we'll be responsible for instantiating a new producer. Inside the Spring Boot, the many components that integrate with messaging systems, for the pattern of message driven POJO. If you've been around the Java community for a while you know that in Java E specification, or even Java 2E specification, back in the day they introduced a message driven beans as a part of enterprise jellybean specification. But it was something that's very cool as idea, but it's was very difficult to use. It requires your application server in order to run this. And the good people from the Spring Framework came with this idea of a simplified version of those beans; should take this not the enterprise Java beans, but make them just the regular Java POJOs and made them like Spring managed beans. And that's where the concept of message driven POJO comes into life. So message driven POJO allows asynchronous communication between the systems. So essentially you define a message listener. In the past, you had to implement interface that will have method listen. And in this case, the Framework will take care of this. This functionality still exists but it's not necessarily useful anymore, because there is annotations that allows you to put the annotation to any interface. So the concept of this, like a messaging, will not have direct connection to Kafka API. So that's why the layer of message driven POJO support can be implemented as something that is abstracted out and only particular implementation will fill up the blanks and put the integration with Kafka Consumer API. So by default this messaging POJOs don't depend on underlying technology. You are not putting any Kafka related code in your POJO. You can just have a one method that will have one input parameter. This input parameter will come from somewhere, from something that will invoke this method. And that also brings very good opportunity to use annotation here. So if you annotate this method so the Spring will take care of instantiating underlying containers that will be running Kafka Consumers, reading messages from Kafka Topic, handling serialization and only after injecting a right value in your application code. All these things would be managed by Spring. So you as a developer, as always, need to focus only on actual implementation. So KafkaListener allows to receive messages and it will instantiate the facility called MessageListenerContainer. So this container will be handling parallelization, configuration, the retries, different things that maybe Kafka application will require things like do I need to commit my offset automatically or I need to commit my offsets manually and all that kind of stuff. The annotation brings the simplicity for the code. You don't need to extend any interfaces. So your POJO will just stick to your business logic. It would be much easier to test. You will not have a dependency on any Framework things. So it would be a much cleaner code. And the annotation will be just indication for Spring Framework that the code, actual the underlying communication would be dedicated to specific listener. It is much, much easier to demonstrate how this thing works together. So that's why in the next exercise, I will walk you through the process, how you can create your message driven POJO annotate this KafkaListener and start receiving messages from Kafka Topics in Confluent Cloud.

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.