大家设想下面一个场景:
消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因 导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立 起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次。种种原因导致我们在消费消息时,一定要处理好幂等性问题。
幂等性处理也不是很难,基本上都是从业务逻辑上面处理,我来讲述一下大致的思路。
首先我们采用Redis,在消费者消费消息之前,先将消息的id放到Redis中,储存方式如下:
| key | value | status |
| id | 0 | 正在执行业务 |
| id | 1 | 业务执行成功 |
如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在,则证明此前有人消费过该消息,获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack 即可。
代码如下:
首先需要打开redis并且可以ping通:

好了,redis打开之后,在我们源码上面指定一下我们的redis配置:

修改 rabbitmq 消费方式为手动提交,默认为自动提交。添加该配置的作用是我们可以在代码上面手动 ack 确认消息成功消费。
最后我们来看看邮件发送的逻辑代码:
@Component
public class MailConsumer {
@Autowired
StringRedisTemplate redisTemplate;
@RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)
public void handleMail(Channel channel, Message message) {
//获取用户发送的 mail 对象
Mail mail = (Mail) message.getPayload();
//获取消息的标记
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//获取消息 id
String id = (String) message.getHeaders().get("spring_returned_message_correlation");
ValueOperations<String, String> ops = redisTemplate.opsForValue();
try {
//setnx
String setnx = ops.get(id);
if ("0".equals(setnx)) {
//说明有人正在消费
return;
} else if ("1".equals(setnx) {
//说明消息消费成功了
//两种可能性:1. 之前的消息其实已经消费成功了,但是没有确认而已,所以这里只需要 ack 即可
//2. 之前的消息已经消费成功了,当前收到的其实是一条重复的消息,重复的消息也不用重复消费了,直接 ack 即可
channel.basicAck(deliveryTag, false);
return;
}
//表示这个消息正在消费
ops.set(id, "0");
// int i = 1 / 0;
System.out.println("mail.getTo() = " + mail.getTo());
System.out.println("mail.getMailContent() = " + mail.getMailContent());
//手动告诉mq,消息已经消费成功
//1. 参数的标记
//2. 是否是批量处理
//表示这个消息已经消费成功
ops.set(id, "1");
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
e.printStackTrace();
//手动告诉 mq,消息消费失败
//第三个参数表示这个失败的消息是否重新回到消息队列中
try {
channel.basicNack(deliveryTag, false, true);
//拒绝消息
// channel.basicReject(deliveryTag, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
测试结果与微服务之RabbitMQ消息发送失败重试(上)无异,但是解决了邮件重复发送的问题。
版权声明:本文为Jumpingyyds原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。