前言
Kafka有提供生产者/消费者的拦截器的机制。这些拦截器由Kafka管理,而不是Spring,因此正常的依赖注入Spring Beans不会生效。
如何解决这个问题?
可以通过在拦截器的configure(…)方法中进行解析。
例如,
@Configuration
public class MyKafkaConfiguration {
@Bean
public ConsumerFactory<?, ?> consumerFactory(KafkaProperties kafkaProperties, SomeBean someBean) {
Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("some.bean", someBean);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
@Bean
public ProducerFactory<?, ?> producerFactory(KafkaProperties kafkaProperties, SomeBean someBean) {
Map<String, Object> producerProperties = kafkaProperties.buildProducerProperties();
producerProperties.put(ProducerConfig.INTERCPETOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
return new DefaultKafkaProducerFactory<>(producerProperties);
}
@Bean
public SomeBean someBean() {
return new SomeBean();
}
}
public class SomeBean {
public void someMethod(String what) {
System.out.println(what + " in my foo bean.");
}
}
public class MyProducerInterceptor implements ProducerInterceptor {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
@Override
public void close() {}
}
public class MyConsumerInterceptor implements ConsumerInterceptor {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.bean.someMethod("producer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
@Override
public void close() {}
}
ProducerInterceptor
![]()
实际上对应Producer的属性key value。可以通过此方法,设置或者获取相关的属性key value。

调用 KafkaProducer#send(ProducerRecord) 或者 KafkaProducer#send(ProducerRecord, Callback) 方法时,在key、value被序列化,分区分配(如果ProducerRecord没有指定分区)之前,该方法被调用。
![]()
发送给服务端的消息被确认 或者 发送失败时,该方法被调用。

拦截器被关闭时,该方法被调用。
ConsumerInterceptor
![]()
实际上对应Consumer的属性key value。可以通过此方法,设置或者获取相关的属性key value。
![]()
KafkaConsumer#poll(Duration)返回记录之前,该方法被调用。
![]()
位移被提交时,该方法被调用。

拦截器被关闭时,该方法被调用。
版权声明:本文为qq_34561892原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。