Kafka使用篇 - 生产者/消费者 拦截器

前言

Kafka有提供生产者/消费者的拦截器的机制。这些拦截器由Kafka管理,而不是Spring,因此正常的依赖注入Spring Beans不会生效。

如何解决这个问题?

可以通过在拦截器的configure(…)方法中进行解析。

例如,

@Configuration
public class MyKafkaConfiguration {

	@Bean
	public ConsumerFactory<?, ?> consumerFactory(KafkaProperties kafkaProperties, SomeBean someBean) {
		Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
		consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
		consumerProperties.put("some.bean", someBean);
		return new DefaultKafkaConsumerFactory<>(consumerProperties);
	}

	@Bean
	public ProducerFactory<?, ?> producerFactory(KafkaProperties kafkaProperties, SomeBean someBean) {
		Map<String, Object> producerProperties = kafkaProperties.buildProducerProperties();
		producerProperties.put(ProducerConfig.INTERCPETOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
		return new DefaultKafkaProducerFactory<>(producerProperties);
	}

	@Bean
	public SomeBean someBean() {
		return new SomeBean();
	}
}
public class SomeBean {
	public void someMethod(String what) {
		System.out.println(what + " in my foo bean.");
	}
}
public class MyProducerInterceptor implements ProducerInterceptor {

	private SomeBean bean;

	@Override
	public void configure(Map<String, ?> configs) {
		this.bean = (SomeBean) configs.get("some.bean");
	}
	
	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		this.bean.someMethod("producer interceptor");
		return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

	@Override
	public void close() {}
}
public class MyConsumerInterceptor implements ConsumerInterceptor {

	private SomeBean bean;

	@Override
	public void configure(Map<String, ?> configs) {
		this.bean = (SomeBean) configs.get("some.bean");
	}
	
	@Override
	public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
		this.bean.someMethod("producer interceptor");
		return records;
	}

	@Override
	public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}

	@Override
	public void close() {}
}

ProducerInterceptor

在这里插入图片描述
实际上对应Producer的属性key value。可以通过此方法,设置或者获取相关的属性key value。

在这里插入图片描述
调用 KafkaProducer#send(ProducerRecord) 或者 KafkaProducer#send(ProducerRecord, Callback) 方法时,在key、value被序列化,分区分配(如果ProducerRecord没有指定分区)之前,该方法被调用。

在这里插入图片描述
发送给服务端的消息被确认 或者 发送失败时,该方法被调用。

在这里插入图片描述
拦截器被关闭时,该方法被调用。

ConsumerInterceptor

在这里插入图片描述
实际上对应Consumer的属性key value。可以通过此方法,设置或者获取相关的属性key value。

在这里插入图片描述
KafkaConsumer#poll(Duration)返回记录之前,该方法被调用。

在这里插入图片描述
位移被提交时,该方法被调用。

在这里插入图片描述
拦截器被关闭时,该方法被调用。


版权声明:本文为qq_34561892原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。