配置消费者工厂
/**
* 构建kafka消费者参数
*
* @return kafka消费者参数
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
List<String> brokerList = kafkaConfigProperties.getBrokerList();
if (CollectionUtils.isEmpty(brokerList)) {
log.info("kafka主机配置为空!");
throw new IllegalArgumentException("kafka--broker 为空!");
}
HashMap<String, Object> hashMap = new HashMap<>();
// 连接服务器地址
hashMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, StringUtils.join(kafkaConfigProperties.getBrokerList(), ","));
// key 序列化
hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value 序列化
hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
hashMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Optional.ofNullable(kafkaConfigProperties.getMaxPollRecords()).orElse(500));
return new DefaultKafkaConsumerFactory<>(hashMap);
}
配置监听工厂
/**
* 构建kafka监听工厂
*
* @param consumerFactory 消费者参数
* @return 监听工厂
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 设置 分区数
factory.setConcurrency(Optional.ofNullable(kafkaConfigProperties.getPartitionNum()).orElse(1));
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
构建监听工厂
/**
* 构建kafka消费者监听工厂
*
* @param consumerInfo 消费者信息
* @return 监听工厂
*/
public static KafkaMessageListenerContainer<String, String> buildKafkaListenerContainerFactory(KafkaConsumerInfoDto consumerInfo) {
ConsumerFactory<String, String> consumerFactory = SpringUtil.getBean("consumerFactory", ConsumerFactory.class);
// 设置订阅主题
ContainerProperties containerProperties = new ContainerProperties(consumerInfo.getTopic());
// 设置分组
containerProperties.setGroupId(consumerInfo.getGroup());
// 设置监听 listener
containerProperties.setMessageListener(new KafkaBatchMessageListener(consumerInfo.getTopic()));
return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}
配置消费者管理
@Component
@Slf4j
public class KafkaConsumerManager {
/**
* 消费者集合 <consumerId, KafkaConsumerThread>
*/
private static final Map<String, KafkaMessageListenerContainer<String, String>> KAFKA_CONSUMER_THREAD_MAP = new HashMap<>();
/**
* 添加消费者
*
* @param consumerInfo 消费者信息
*/
public synchronized void addConsumer(KafkaConsumerInfoDto consumerInfo) {
if (ObjectUtils.isEmpty(consumerInfo)) {
return;
}
// 通过 消费者id停止线程
stopByConsumerId(consumerInfo.getConsumerId());
// 消费者id
String consumerId = consumerInfo.getConsumerId();
// 构建消费者配置信息
KafkaMessageListenerContainer<String, String> kafkaMessageListener = KafkaUtil.buildKafkaListenerContainerFactory(consumerInfo);
KAFKA_CONSUMER_THREAD_MAP.put(consumerId, kafkaMessageListener);
// 启动消费者监听
kafkaMessageListener.start();
log.info("创建消费者: {} 成功!", consumerId);
}
/**
* 停止消除
*
* @param consumerId 消费者id
*/
public void stopByConsumerId(String consumerId) {
if (StringUtils.isEmpty(consumerId)) {
return;
}
KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer = KAFKA_CONSUMER_THREAD_MAP.get(consumerId);
if (ObjectUtils.isEmpty(kafkaMessageListenerContainer)) {
return;
}
// 停止消费
kafkaMessageListenerContainer.stop();
KAFKA_CONSUMER_THREAD_MAP.remove(consumerId);
log.info("停止消费者: {} 成功!", consumerId);
}
}
消费者监听程序
/**
* kafka批量消费接口
*/
@Slf4j
public class KafkaBatchMessageListener implements BatchConsumerAwareMessageListener<String, String> {
/**
* 订阅主题
*/
private String topic;
public KafkaBatchMessageListener() { }
public KafkaBatchMessageListener(String topic) {
this.topic = topic;
}
@Override
public void onMessage(List<ConsumerRecord<String, String>> list, Consumer<?, ?> consumer) {
KafkaMessageService kafkaMessageService = SpringUtil.getBean(KafkaMessageService.class);
List<String> messageList = list.stream().map(ConsumerRecord::value).collect(Collectors.toList());
kafkaMessageService.saveToFile(messageList, topic);
log.info("消费数量:{} 条..", list.size());
}
}
提交执行监听程序

执行异步线程

消费者记录拉取

回调 onMessage

版权声明:本文为qq_40278285原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。