最近工作需要集成kakfa到应用中,应用的主体框架使用Spring Boot,因此研究了下Kafka在Spring Boot中的集成方案。
1. 依赖
引入spring-kafka包,版本号由Spring Boot自动管理。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. AutoConfiguration
Spring Boot封装了对Kafka的auto configuration特性,对应的类为KafkaAutoConfiguration,spring-boot-autoconfigure包中。
3. 源码解释
首先看配置的入口类KafkaAutoConfiguration:
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration
KafkaTemplate.class:spring-kafka包中的类,用于判定应用中是否引入了spring-kafka,借此确定是否需要启动kafka相关配置;KafkaProperties.class:配置属性类,Spring Boot应用中和kafka相关的配置都保存在这个类中,这个类比较简单,在这里直接介绍完:
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties
根据上面源码,Spring Boot中和kafka相关的配置都以spring.kafka打头。
KafkaAnnotationDrivenConfiguration:主要用来处理spring-kafka中和kafka配置的相关注解的处理逻辑,后面详细介绍;KafkaStreamsAnnotationDrivenConfiguration:和kafka流计算相关功能的配置逻辑,目前工程用不到,简单介绍完:
@Configuration
@ConditionalOnClass(StreamsBuilder.class)
@ConditionalOnBean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
class KafkaStreamsAnnotationDrivenConfiguration
StreamBuilder.class不在spring-kafka包中,也不在其依赖的包中,所以这个配置类不会加载解析。
然后,来看下KafkaAutoConfiguration的构造函数,该函数会被Spring IoC容器启动时调用,以完成对象实例化。
public KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider<RecordMessageConverter> messageConverter) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
}
KafkaProperties:上面解释过,和kafka相关的配置信息ObjectProvider<RecordMessageConverter>:ObjectProvider可以理解是可空的依赖注入,如果IoC容器中有相关类型的bean对象就注入,没有的话就是空的
最后,看下KafkaAutoConfiguration创建了哪些bean对象:
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) {
@Bean
@ConditionalOnMissingBean(ProducerListener.class)
public ProducerListener<Object, Object> kafkaProducerListener() {
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() {
@Bean
@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
@ConditionalOnMissingBean
public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
@Bean
@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
@ConditionalOnMissingBean
public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
@Bean
@ConditionalOnMissingBean
public KafkaAdmin kafkaAdmin() {
KafkaTemplate:spring-kafka中用来发送消息的对象,内部保存了ProducerFactory,如果用户没有自定义则创建;ProducerListener:发送消息的回调对象,比如消息发送成功/失败后续需要执行的操作可以封装在这个对象中,如果用户没有定义则创建;ConsumerFactory:用于生成实际和kafka交互的Consumer对象,消费kafka消息的,如果用户没有定义则创建;ProducerFactory:用于生成实际和kafka交互的Producer对象,如果用户没有定义则创建;KafkaTransactionManager:kafka的事务管理器,相关属性配置且用户没有定义则创建;KafkaJaasLoginModuleInitializer:Jaas Login组件,特殊场景使用,相关属性配置且用户没有定义则创建;KafkaAdmin:用作创建topic,用户没有定义则创建
3.1 KafkaAnnotationDrivenConfiguration
首先,看一下类定义:
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration
EnableKafka.class:这是一个注解,也在spring-kafka包中
下面,是该类的构造方法:
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique();
this.errorHandler = errorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
}
除了KafkaProperties必须要有之外,其他的依赖都是可选的。KafkaProperties前面已经解释过了。
现在,看下该类创建了哪些bean对象:
@Bean
@ConditionalOnMissingBean
public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactoryConfigurer:Spring Boot应用中的惯用套路,某个组件/模块的配置类ConcurrentKafkaListenerContainerFactory:支持并发消费的kafka消费类容器,内部封装了ConsumerFactory;ConsumerFactory前面解释过了,在KafkaAutoConfiguration中创建的;
最后,在KafkaAnnotationDrivenConfiguration中定义了一个内部配置类:
@Configuration
@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableKafkaConfiguration {
@EnableKafka:这个注解用来启用kafka相关注解配置功能,如下:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaBootstrapConfiguration.class)
public @interface EnableKafka {
主要起作用的是
KafkaBootstrapConfiguration.class
KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME:这个属性是用来判断相关组件是否已经加载了,到目前为止肯定是没有加载的。
3.2 KafkaBootstrapConfiguration
这个配置类,就创建了两个bean对象:
@SuppressWarnings("rawtypes")
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationProcessor() {
return new KafkaListenerAnnotationBeanPostProcessor();
}
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
return new KafkaListenerEndpointRegistry();
}
KafkaListenerAnnotationBeanPostProcessor:这个类主要是解析和kafka消费端相关的注解,生成对应KafkaListener对象,并注册到KafkaListenerEndpointRegistry中;KafkaListenerEndpointRegistry:这个类是Spring Boot中保存所有kafka消费监听类的对象,内部逻辑不展开了,对于理解整个框架来说,不重要;
3.2 KafkaListenerAnnotationBeanPostProcessor
这个类的申明部分如下:
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton
主要关注:
BeanPostProcessor:每一个bean实例化时,都会调用这个接口定义的回调方法SmartInitializingSingleton:所有bean都完成实例化之后,调用的方法
BeanPostProcessor的回调方法在SmartInitializingSingleton的回调方法调用之前调用。
主要逻辑是:
首先,在
postProcessAfterInitialization方法中,扫描所有bean对象的@KafkaListener和@KafkaHandler方法,然后保存在KafkaListenerEndpointRegistrar.endpointDescriptors集合中;然后,在
afterSingletonsInstantiated方法中,触发相关逻辑,将KafkaListenerEndpointRegistrar.endpointDescriptors保存的相关kafka消费对象注册到KafkaListenerEndpointRegistry上;
具体代码不展示了,逻辑上比较简单的。