微服务之RabbitMQ消息发送幂等性问题(下)

大家设想下面一个场景:

消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因 导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立 起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次。种种原因导致我们在消费消息时,一定要处理好幂等性问题。

幂等性处理也不是很难,基本上都是从业务逻辑上面处理,我来讲述一下大致的思路。

首先我们采用Redis,在消费者消费消息之前,先将消息的id放到Redis中,储存方式如下:

keyvaluestatus
id0正在执行业务
id1业务执行成功

如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在,则证明此前有人消费过该消息,获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack 即可。

代码如下:

首先需要打开redis并且可以ping通:

 好了,redis打开之后,在我们源码上面指定一下我们的redis配置:

修改 rabbitmq 消费方式为手动提交,默认为自动提交。添加该配置的作用是我们可以在代码上面手动 ack 确认消息成功消费。

最后我们来看看邮件发送的逻辑代码:

@Component
public class MailConsumer {

    @Autowired
    StringRedisTemplate redisTemplate;

    @RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)
    public void handleMail(Channel channel, Message message) {
        //获取用户发送的 mail 对象
        Mail mail = (Mail) message.getPayload();
        //获取消息的标记
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //获取消息 id
        String id = (String) message.getHeaders().get("spring_returned_message_correlation");
        ValueOperations<String, String> ops = redisTemplate.opsForValue();
        try {
            //setnx
            String setnx = ops.get(id);
            if ("0".equals(setnx)) {
                //说明有人正在消费
                return;
            } else if ("1".equals(setnx) {
                //说明消息消费成功了
                //两种可能性:1. 之前的消息其实已经消费成功了,但是没有确认而已,所以这里只需要 ack 即可
                //2. 之前的消息已经消费成功了,当前收到的其实是一条重复的消息,重复的消息也不用重复消费了,直接 ack 即可
                channel.basicAck(deliveryTag, false);
                return;
            }

            //表示这个消息正在消费
            ops.set(id, "0");
//            int i = 1 / 0;
            System.out.println("mail.getTo() = " + mail.getTo());
            System.out.println("mail.getMailContent() = " + mail.getMailContent());
            //手动告诉mq,消息已经消费成功
            //1. 参数的标记
            //2. 是否是批量处理
            //表示这个消息已经消费成功
            ops.set(id, "1");
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            e.printStackTrace();
            //手动告诉 mq,消息消费失败
            //第三个参数表示这个失败的消息是否重新回到消息队列中
            try {
                channel.basicNack(deliveryTag, false, true);
                //拒绝消息
//                channel.basicReject(deliveryTag, true);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

测试结果与微服务之RabbitMQ消息发送失败重试(上)无异,但是解决了邮件重复发送的问题。


版权声明:本文为Jumpingyyds原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。