文章预览:
Consumer Ack(消费者确认机制)
如何保证消息被消费者成功消费?
生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我们又没有任何重试,那结果跟消息丢失没什么分别。
RabbitMQ在消费端会有Ack机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。
这也是我们之前一直在讲的“最终一致性”、“可恢复性” 的基础。
一般而言,我们有如下处理手段:
采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表,再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险。
简而言之:只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回 Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期
简而言之: 看情况确认,如果此时消费者抛出异常则消息会返回到队列中采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法返回Ack
本小节的内容总结起来就如图所示,本质上就是“请求/应答”确认模式
springBoot完整案列
1. pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.4</version>
</dependency>
2.yml

spring:
application:
name: springboot_consumer_ack
rabbitmq:
host: 139.9.132.132 #主机
virtual-host: /
username: admin #用户名
password: 123 #密码
port: 5672 #端口
listener:
type: simple
simple:
max-concurrency: 10
concurrency: 5
prefetch: 10
#消费者ack模式
#NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
#AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
#MANUAL模式,需要显式的调用当前channel的basicAck方法
acknowledge-mode: manual
retry:
enabled: true #是否开启消费者重试(为false时关闭消费者重试, 意思不是“不重试”,而是一直收到消息直到jack确认或者一直到超时)
initial-interval: 5000 #重试间隔时间(单位毫秒)
max-attempts: 5 #最大重试次数
default-requeue-rejected: false #重试超过最大次数后是否拒绝
3.主入口类
@SpringBootApplication
public class RabbitmqApplication {
@Autowired
private RabbitTemplate rabbitTemplate;
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class, args);
}
@Bean
public ApplicationRunner runner() {
return args -> {
Thread.sleep(5000);
for (int i = 0; i < 10; i++) {
MessageProperties props = new MessageProperties();
props.setDeliveryTag(i);
Message message = new Message(("消息:" + i).getBytes("utf-8"), props);
// this.rabbitTemplate.convertAndSend("ex.biz", "biz", "消息:" + i);
this.rabbitTemplate.convertAndSend("ex.biz", "biz", message);
}
};
}
}
4.rabbitConfig
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
/**
* 声明交换机
*/
@Bean
public Exchange exchange() {
//参数一:交换机名称 参数二:是否持久化 true--交换机持久化,mq重启后交换机会恢复
// 参数三:是否自动删除,true---使用后自动删除 参数四:参数
return new TopicExchange("ex.biz", false, false, null);
}
/**
* 声明队列
*/
@Bean
public Queue queue() {
//参数一:队列名称 参数二:是否持久化 true--队列持久化,mq重启后队列会恢复
// 参数三:是否自动删除,true---使用后自动删除 参数四:参数
return new Queue("q.biz", false, false, false, null);
}
/**
* 交换机和队列绑定
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("biz").noargs();
}
}
5.MessageListener
5.1
5.2 @RabbitListener(queues = “q.biz”, ackMode = “MANUAL”)
@Component
public class MessageListener {
private Random random = new Random();
/**
* * NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
* * AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
* * MANUAL模式,需要显式的调用当前channel的basicAck方法
* * @param channel
* * @param deliveryTag
* * @param message
*
*/
// @RabbitListener(queues = "q.biz", ackMode = "AUTO")
@RabbitListener(queues = "q.biz", ackMode = "MANUAL")
// @RabbitListener(queues = "q.biz", ackMode = "NONE")
public void handleMessageTopic(Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
@Payload String message) {
System.out.println("RabbitListener消费消息,消息内容:" + message);
try {
if (random.nextInt(10) % 3 != 0) {
// 手动nack,告诉broker消费者处理失败,最后一个参数表示是否需要将消息重新入列
// channel.basicNack(deliveryTag, false, true);
//手动拒绝消息。第二个参数表示是否重新入列
channel.basicReject(deliveryTag, true);
} else {
// 手动ack,deliveryTag表示消息的唯一标志,multiple表示是否是批量确认
channel.basicAck(deliveryTag, false);
System.err.println("已确认消息:" + message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
启动RabbitmqApplication 结果为
RabbitListener消费消息,消息内容:消息:3
RabbitListener消费消息,消息内容:消息:1
RabbitListener消费消息,消息内容:消息:2
RabbitListener消费消息,消息内容:消息:4
RabbitListener消费消息,消息内容:消息:0
RabbitListener消费消息,消息内容:消息:5
RabbitListener消费消息,消息内容:消息:8
RabbitListener消费消息,消息内容:消息:9
RabbitListener消费消息,消息内容:消息:6
RabbitListener消费消息,消息内容:消息:7
已确认消息:消息:0
已确认消息:消息:9
已确认消息:消息:6
已确认消息:消息:5
RabbitListener消费消息,消息内容:消息:3
RabbitListener消费消息,消息内容:消息:4
RabbitListener消费消息,消息内容:消息:8
RabbitListener消费消息,消息内容:消息:1
RabbitListener消费消息,消息内容:消息:2
RabbitListener消费消息,消息内容:消息:7
已确认消息:消息:4
RabbitListener消费消息,消息内容:消息:3
RabbitListener消费消息,消息内容:消息:1
RabbitListener消费消息,消息内容:消息:8
RabbitListener消费消息,消息内容:消息:2
RabbitListener消费消息,消息内容:消息:7
已确认消息:消息:3
已确认消息:消息:1
已确认消息:消息:2
RabbitListener消费消息,消息内容:消息:8
RabbitListener消费消息,消息内容:消息:7
已确认消息:消息:7
RabbitListener消费消息,消息内容:消息:8
已确认消息:消息:8
5.3
版权声明:本文为weixin_43811057原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。