springboot自动配置的参数有限,如果业务处理逻辑比较慢,需要自己写配置,增加最长拉取消息时间参数。
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
//构造消费者属性map,ConsumerConfig中的可配置属性比spring boot自动配置要多
private Map<String, Object> consumerProperties(){
Map<String, Object> props = new HashMap<>();
//是否自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//自动提交时,提交频率,poll时会判断距离上次时间大于这个频率才会提交偏移量,偏移量都是在poll方法内实现的。
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
//向服务器证明消费者可用,要在一个session周期内向服务器发送心跳
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
//发送心跳频率
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//每次poll拉取的最大记录数
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
//每次poll拉取的最大时间间隔,根据业务处理逻辑的时间来定,记录数*每条记录的处理时间<时间间隔,否则还没有处理完就会发生重平衡,偏移量没有提交,引起重复消费。
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return props;
}
/**
* 不使用spring boot默认方式创建的DefaultKafkaConsumerFactory,重新定义创建方式
* @return
*/
@Bean("consumerFactory")
public DefaultKafkaConsumerFactory consumerFactory(){
return new DefaultKafkaConsumerFactory(consumerProperties());
}
@Bean("listenerContainerFactory")
//个性化定义消费者
public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {
//指定使用DefaultKafkaConsumerFactory
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
//设置消费者ack模式为手动,看需求设置
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
/* @Bean
//代码创建方式topic
public NewTopic batchTopic() {
return new NewTopic("topic.quick.batch", 8, (short) 1);
}*/
//创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
private Map<String, Object> producerProperties(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//0时,producer不会等待确认;1时,等待leader写到local log就行;all或-1时,等待所有副本确认。
props.put(ProducerConfig.ACKS_CONFIG, "-1");
//批量发送的大小,设为0,则不会进行批处理。
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 500);
//下次发送的等待时间,如果不够批量大小,则等到了这个时间才会发送,默认为0,不等待,这个时候批量没有意义。
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return props;
}
/**
* 不使用spring boot的KafkaAutoConfiguration默认方式创建的DefaultKafkaProducerFactory,重新定义
* @return
*/
@Bean("produceFactory")
public DefaultKafkaProducerFactory produceFactory(){
return new DefaultKafkaProducerFactory(producerProperties());
}
/**
* 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义
* @param produceFactory
* @return
*/
@Bean
public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory){
return new KafkaTemplate(produceFactory);
}
}
版权声明:本文为sinat_37469877原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。