Developer Advocate (Presenter)
For many years, Apache Kafka administrators used command line tools to perform admin operations like creating topics, changing topic configurations, assigning partitions, etc. But with the Kafka Admin API (AdminClient class), those operations can now be done programmatically. Spring Boot and Spring for Apache Kafka include integration for AdminClient so you can perform admin operations from within your application’s configuration.
In the Spring for Apache Kafka framework, AdminClient is supported in a few ways. There is a KafkaAdmin class, which is a wrapper for AdminClient. It implements a Spring FactoryBean that is used to maintain and support the lifecycle of an AdminClient. Then there’s the TopicBuilder class, which provides a convenient API for creating topic configurations.
You need to supply a configuration to the KafkaAdmin FactoryBean so that it can connect to a broker, and this configuration can potentially include information about a bootstrap server or certain security configurations.
@Bean
public KafkaAdmin admin() {
return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
}
After that step, a KafkaAdmin client is created. However, it’s much easier to use Spring Boot rather than define the bean yourself, so that your configuration can be outsourced and placed in application.properties or application.yaml, where Spring Boot can access it and instantiate a corresponding KafkaAdmin instance.
Once the KafkaAdmin bean is available in the classpath one way or another, you can create multiple other beans with type NewTopic, which is Kafka’s AdminClient type. Those topics will then be created in a Kafka cluster.
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
You can use TopicBuilder methods to provide some of the standard configurations like number of partitions, number of replicas, compaction, and so on. You can also use some extra configuration parameters that may not have API methods, for example, there is a configuration you can use to change compression type (the algorithm used to compress data in a topic).
In addition, TopicBuilder also provides methods that let you manually assign replicas for your topic when the topic is created:
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.
Hi, I'm Viktor Gamov with Confluent. For many years, Kafka administrators were using the command-line tools to perform some admin operations; creating topics, changing topic configuration, assigning partitions, etc. But with KafkaAdmin API, those operations can be done now programmatically. Spring Boot and Spring Kafka include integration for KafkaAdmin clients so you can perform some admin operations within your application configuration. In this lesson and exercise, you will learn how you can use this, so let's get to it. In the Spring Kafka framework, Kafka AdminClient is supported in a few ways. Essentially, there is a KafkaAdmin class which implements a Spring factory bean that will be used to maintain and support life cycle of this AdminClient. It's actually wrapper around the KafkaAdmin API. And another class. TopicBuilder provides convenient API for creating a topic configuration. Let's quickly take a look on those. The KafkaAdmin is, like I said, it's a factory bean that essentially you need to supply with some configuration. You need to provide configuration to connect to broker. It can be information about bootstrap server, information about certain security configurations. After that, Kafka AdminClient will be created if you define this bean yourself, if you do this without Spring Boot. In this case, this bean will be used to create other topics. It is much simpler to do with Spring Boot where configuration can be both sourced and placed in application.properties or application.yml, where Spring Boot will automatically pick up and instantiate a corresponding KafkaAdmin instance. Once this KafkaAdmin bean is available in class path one way or another, so you can create multiple other beans with type NewTopic, which is the Kafka's AdminClient type. In this case, those topics will be executed and will be created in Kafka cluster. You can use TopicBuilder methods to provide some of the standard configurations, like number of partitions, number of replicas, if you want to enable compaction, and so far and so on. Also, you can use some configuration parameters that not maybe have this API methods. For example, there's some configuration that you can change in config. Like, for example, the compression type, which type of algorithm to use to compress data in a topic. In this case, you can also provide arbitrary config. Moreover, Spring TopicBuilder also provides the methods that allows you to manually assign replicas for your topic when the topic was created. It is quite rich thing to do. In the next exercise, you will learn how you can use all these things in your application; how you can create new topic, how you can modify configuration for existing topic, and so far and so on. Also, you will be able to see how this would be working in Confluent Cloud, so stick around.