Spring Boot应用自动配置Kafka过程解析

​最近工作需要集成kakfa到应用中,应用的主体框架使用Spring Boot,因此研究了下Kafka在Spring Boot中的集成方案。

1. 依赖

引入spring-kafka包,版本号由Spring Boot自动管理。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. AutoConfiguration

Spring Boot封装了对Kafka的auto configuration特性,对应的类为KafkaAutoConfigurationspring-boot-autoconfigure包中。

3. 源码解释

首先看配置的入口类KafkaAutoConfiguration

@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration
  • KafkaTemplate.classspring-kafka包中的类,用于判定应用中是否引入了spring-kafka,借此确定是否需要启动kafka相关配置;

  • KafkaProperties.class:配置属性类,Spring Boot应用中和kafka相关的配置都保存在这个类中,这个类比较简单,在这里直接介绍完:

@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties

根据上面源码,Spring Boot中和kafka相关的配置都以spring.kafka打头。

  • KafkaAnnotationDrivenConfiguration:主要用来处理spring-kafka中和kafka配置的相关注解的处理逻辑,后面详细介绍;

  • KafkaStreamsAnnotationDrivenConfiguration:和kafka流计算相关功能的配置逻辑,目前工程用不到,简单介绍完:

@Configuration
@ConditionalOnClass(StreamsBuilder.class)
@ConditionalOnBean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
class KafkaStreamsAnnotationDrivenConfiguration

StreamBuilder.class不在spring-kafka包中,也不在其依赖的包中,所以这个配置类不会加载解析。

然后,来看下KafkaAutoConfiguration的构造函数,该函数会被Spring IoC容器启动时调用,以完成对象实例化。

public KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider<RecordMessageConverter> messageConverter) {
 this.properties = properties;
 this.messageConverter = messageConverter.getIfUnique();
}
  • KafkaProperties:上面解释过,和kafka相关的配置信息

  • ObjectProvider<RecordMessageConverter>ObjectProvider可以理解是可空的依赖注入,如果IoC容器中有相关类型的bean对象就注入,没有的话就是空的

最后,看下KafkaAutoConfiguration创建了哪些bean对象:

@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) {
@Bean
@ConditionalOnMissingBean(ProducerListener.class)
public ProducerListener<Object, Object> kafkaProducerListener() {
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() {
@Bean
@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
@ConditionalOnMissingBean
public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
@Bean
@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
@ConditionalOnMissingBean
public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
@Bean
@ConditionalOnMissingBean
public KafkaAdmin kafkaAdmin() {
  • KafkaTemplatespring-kafka中用来发送消息的对象,内部保存了ProducerFactory,如果用户没有自定义则创建;

  • ProducerListener:发送消息的回调对象,比如消息发送成功/失败后续需要执行的操作可以封装在这个对象中,如果用户没有定义则创建;

  • ConsumerFactory:用于生成实际和kafka交互的Consumer对象,消费kafka消息的,如果用户没有定义则创建;

  • ProducerFactory:用于生成实际和kafka交互的Producer对象,如果用户没有定义则创建;

  • KafkaTransactionManager:kafka的事务管理器,相关属性配置且用户没有定义则创建;

  • KafkaJaasLoginModuleInitializer:Jaas Login组件,特殊场景使用,相关属性配置且用户没有定义则创建;

  • KafkaAdmin:用作创建topic,用户没有定义则创建

3.1 KafkaAnnotationDrivenConfiguration

首先,看一下类定义:

@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration
  • EnableKafka.class:这是一个注解,也在spring-kafka包中

下面,是该类的构造方法:

 KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
   ObjectProvider<RecordMessageConverter> messageConverter,
   ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
   ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
   ObjectProvider<ErrorHandler> errorHandler,
   ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
  this.properties = properties;
  this.messageConverter = messageConverter.getIfUnique();
  this.kafkaTemplate = kafkaTemplate.getIfUnique();
  this.transactionManager = kafkaTransactionManager.getIfUnique();
  this.errorHandler = errorHandler.getIfUnique();
  this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
 }

除了KafkaProperties必须要有之外,其他的依赖都是可选的。KafkaProperties前面已经解释过了。

现在,看下该类创建了哪些bean对象:

 @Bean
 @ConditionalOnMissingBean
 public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {

 @Bean
 @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
 public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
   ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
   ConsumerFactory<Object, Object> kafkaConsumerFactory) {
  • ConcurrentKafkaListenerContainerFactoryConfigurer:Spring Boot应用中的惯用套路,某个组件/模块的配置类

  • ConcurrentKafkaListenerContainerFactory:支持并发消费的kafka消费类容器,内部封装了ConsumerFactoryConsumerFactory前面解释过了,在KafkaAutoConfiguration中创建的;

最后,在KafkaAnnotationDrivenConfiguration中定义了一个内部配置类:

 @Configuration
 @EnableKafka
 @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
 protected static class EnableKafkaConfiguration {
  • @EnableKafka:这个注解用来启用kafka相关注解配置功能,如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaBootstrapConfiguration.class)
public @interface EnableKafka {

主要起作用的是KafkaBootstrapConfiguration.class

  • KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME:这个属性是用来判断相关组件是否已经加载了,到目前为止肯定是没有加载的。

3.2 KafkaBootstrapConfiguration

这个配置类,就创建了两个bean对象:

 @SuppressWarnings("rawtypes")
 @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
 @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
 public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationProcessor() {
  return new KafkaListenerAnnotationBeanPostProcessor();
 }

 @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
 public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
  return new KafkaListenerEndpointRegistry();
 }
  • KafkaListenerAnnotationBeanPostProcessor:这个类主要是解析和kafka消费端相关的注解,生成对应KafkaListener对象,并注册到KafkaListenerEndpointRegistry中;

  • KafkaListenerEndpointRegistry:这个类是Spring Boot中保存所有kafka消费监听类的对象,内部逻辑不展开了,对于理解整个框架来说,不重要;

3.2 KafkaListenerAnnotationBeanPostProcessor

这个类的申明部分如下:

public class KafkaListenerAnnotationBeanPostProcessor<K, V>
  implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton

主要关注:

  • BeanPostProcessor:每一个bean实例化时,都会调用这个接口定义的回调方法

  • SmartInitializingSingleton:所有bean都完成实例化之后,调用的方法

BeanPostProcessor的回调方法在SmartInitializingSingleton的回调方法调用之前调用。

主要逻辑是:

  • 首先,在postProcessAfterInitialization方法中,扫描所有bean对象的@KafkaListener@KafkaHandler方法,然后保存在KafkaListenerEndpointRegistrar.endpointDescriptors集合中;

  • 然后,在afterSingletonsInstantiated方法中,触发相关逻辑,将KafkaListenerEndpointRegistrar.endpointDescriptors保存的相关kafka消费对象注册到KafkaListenerEndpointRegistry上;

具体代码不展示了,逻辑上比较简单的。

 


版权声明:本文为Lambert_Wang原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。