rabbitmq消息可靠性--------Consumer Ack(消费者确认机制)

Consumer Ack(消费者确认机制)

如何保证消息被消费者成功消费?

生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我们又没有任何重试,那结果跟消息丢失没什么分别。

RabbitMQ在消费端会有Ack机制即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)
这也是我们之前一直在讲的“最终一致性”、“可恢复性” 的基础。
一般而言,我们有如下处理手段:

  1. 采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表,再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险。
    简而言之只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
  2. 采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回 Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期
    简而言之: 看情况确认,如果此时消费者抛出异常则消息会返回到队列中
  3. 采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法返回Ack

本小节的内容总结起来就如图所示,本质上就是“请求/应答”确认模式在这里插入图片描述

springBoot完整案列

1. pom.xml

      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.6.4</version>
        </dependency>

2.yml

在这里插入图片描述

spring:
  application:
    name: springboot_consumer_ack
  rabbitmq:
    host: 139.9.132.132 #主机
    virtual-host: /
    username: admin     #用户名
    password: 123       #密码
    port: 5672          #端口
    listener:
      type: simple
      simple:
        max-concurrency: 10
        concurrency: 5
        prefetch: 10
        #消费者ack模式
        #NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
        #AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
        #MANUAL模式,需要显式的调用当前channel的basicAck方法
        acknowledge-mode: manual
        retry:
          enabled: true   #是否开启消费者重试(为false时关闭消费者重试, 意思不是“不重试”,而是一直收到消息直到jack确认或者一直到超时)
          initial-interval: 5000   #重试间隔时间(单位毫秒)
          max-attempts: 5 #最大重试次数
        default-requeue-rejected: false  #重试超过最大次数后是否拒绝

3.主入口类

@SpringBootApplication
public class RabbitmqApplication {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Thread.sleep(5000);
            for (int i = 0; i < 10; i++) {
                MessageProperties props = new MessageProperties();
                props.setDeliveryTag(i);
                Message message = new Message(("消息:" + i).getBytes("utf-8"), props);
//                this.rabbitTemplate.convertAndSend("ex.biz", "biz", "消息:" + i);
                this.rabbitTemplate.convertAndSend("ex.biz", "biz", message);
            }
        };
    }
}

4.rabbitConfig


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    /**
     * 声明交换机
     */
    @Bean
    public Exchange exchange() {
        //参数一:交换机名称 参数二:是否持久化 true--交换机持久化,mq重启后交换机会恢复
        // 参数三:是否自动删除,true---使用后自动删除 参数四:参数
        return new TopicExchange("ex.biz", false, false, null);
    }

    /**
     * 声明队列
     */
    @Bean
    public Queue queue() {
        //参数一:队列名称 参数二:是否持久化 true--队列持久化,mq重启后队列会恢复
        // 参数三:是否自动删除,true---使用后自动删除 参数四:参数
        return new Queue("q.biz", false, false, false, null);
    }

    /**
     * 交换机和队列绑定
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("biz").noargs();
    }
}

5.MessageListener

5.1

5.2 @RabbitListener(queues = “q.biz”, ackMode = “MANUAL”)

@Component
public class MessageListener {
    private Random random = new Random();

    /**
     *     * NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
     *     * AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
     *     * MANUAL模式,需要显式的调用当前channel的basicAck方法
     *     * @param channel
     *     * @param deliveryTag
     *     * @param message
     *    
     */
//    @RabbitListener(queues = "q.biz", ackMode = "AUTO")
    @RabbitListener(queues = "q.biz", ackMode = "MANUAL")
//    @RabbitListener(queues = "q.biz", ackMode = "NONE")
    public void handleMessageTopic(Channel channel,
                                   @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                                   @Payload String message) {
        System.out.println("RabbitListener消费消息,消息内容:" + message);
        try {
            if (random.nextInt(10) % 3 != 0) {
                // 手动nack,告诉broker消费者处理失败,最后一个参数表示是否需要将消息重新入列
//                 channel.basicNack(deliveryTag, false, true);
                //手动拒绝消息。第二个参数表示是否重新入列
                channel.basicReject(deliveryTag, true);
            } else {
                // 手动ack,deliveryTag表示消息的唯一标志,multiple表示是否是批量确认
                channel.basicAck(deliveryTag, false);
                System.err.println("已确认消息:" + message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

启动RabbitmqApplication 结果为

RabbitListener消费消息,消息内容:消息:3
RabbitListener消费消息,消息内容:消息:1
RabbitListener消费消息,消息内容:消息:2
RabbitListener消费消息,消息内容:消息:4
RabbitListener消费消息,消息内容:消息:0
RabbitListener消费消息,消息内容:消息:5
RabbitListener消费消息,消息内容:消息:8
RabbitListener消费消息,消息内容:消息:9
RabbitListener消费消息,消息内容:消息:6
RabbitListener消费消息,消息内容:消息:7
已确认消息:消息:0
已确认消息:消息:9
已确认消息:消息:6
已确认消息:消息:5
RabbitListener消费消息,消息内容:消息:3
RabbitListener消费消息,消息内容:消息:4
RabbitListener消费消息,消息内容:消息:8
RabbitListener消费消息,消息内容:消息:1
RabbitListener消费消息,消息内容:消息:2
RabbitListener消费消息,消息内容:消息:7
已确认消息:消息:4
RabbitListener消费消息,消息内容:消息:3
RabbitListener消费消息,消息内容:消息:1
RabbitListener消费消息,消息内容:消息:8
RabbitListener消费消息,消息内容:消息:2
RabbitListener消费消息,消息内容:消息:7
已确认消息:消息:3
已确认消息:消息:1
已确认消息:消息:2
RabbitListener消费消息,消息内容:消息:8
RabbitListener消费消息,消息内容:消息:7
已确认消息:消息:7
RabbitListener消费消息,消息内容:消息:8
已确认消息:消息:8

5.3


版权声明:本文为weixin_43811057原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。