中间件系列二 RabbitMQ之消息持久性、确认机制、拒绝、预取数量、分配策略

1. 概述

本文主要对RabbitMQ消息的特性进行总结,主要包括如下内容:

  • 消息的持久性
  • 消息确认机制
  • 两种方法拒绝消息
  • 设置预取消息的数量
  • 消息分配策略

2. 消息的持久性

为了保证消息的可靠性,需要对消息进行持久化。
为了保证RabbitMQ在重启、奔溃等异常情况下数据没有丢失,除了对消息本身持久化为,还需要将消息传输经过的队列(queue),交互机进行持久化(exchange),持久化以上元素后,消息才算真正RabbitMQ重启不会丢失。

消息持久化
方法:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;   

第三个参数props:设置投递模式为持久化,如果此值是persistent ,则此消息存储在磁盘上。如果服务器重启,系统会保证收到的持久化消息未丢失,将消息以持久化方式发布时,会对性能造成一定的影响

消息持久化代码如下:

channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));  

队列持久化

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

详细参数如下:

第二个参数 durable
是否持久化,如果true,则此种队列叫持久化队列(Durable queues)。此队列会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。
第三个参数 execulusive
表示此对应只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
第四个参数 autoDelete
当没有生成者/消费者使用此队列时,此队列会被自动删除。
(即当最后一个消费者退订后即被删除)

代码如下:

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

交换机持久化

以下是声明交换机的方法:

 Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;   

方法参数说明:

第三个参数durable:交换机是否持久化

关于交换机的内容,我们后缀说明,这是只要知道有这个就行

3. 消息确认机制

消费者在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题,对于此AMQP有两种处理方式:

  • ○ 自动确认模式(automatic acknowledgement model):当RabbbitMQ将消息发送给应用后,消费者端自动回送一个确认消息,此时RabbitMQ删除此消息。
  • ○ 显式确认模式(explicit acknowledgement model):消费者收到消息后,可以在执行一些逻辑后,消费者自己决定什么时候发送确认回执(acknowledgement),RabbitMQ收到回执后才删除消息,这样就保证消费端不会丢失消息

如果一个消费者在尚未发送确认回执的情况下挂掉了,那么消息会被重新放入队列,并且在还有其他消费者存在于此队列的前提下,立即投递给另外一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
RabbitMQ里的消息是不会过期。当消费者挂掉后,RabbitMQ会不断尝试重推。所有单个消息的推送可能花费很长的时间

是否开启自动确认模式由以下方法的autoAck属性决定

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

代码实现

  • a 自动确认模式:只需要设置此值为autoAck为true即可,可以参考上一篇文章的用法
  • b. 显示确认模式:见WorkQueuesRecv.java

WorkQueuesRecv有两个注意点:
a. channel.basicConsume()第二个参数autoAck值为false
b. 收到消息后,必须调用 channel.basicAck 向rabbitMQ发送确认回执

// 默认消费者实现
final Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [WorkQueuesRecv-" +id+ "] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [WorkQueuesRecv-" +id+ "] Done");
            // 情况一:对处理好的消息进行应答
             channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
};
// 获取消息:
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

消息确认方法:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

方法详细参数如下:

第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,(任何channel上发布的第一条消息的deliveryTag为1,此后的每一条消息都会加1),deliveryTag在channel范围内是唯一的
第二个参数multiple:批量确认标志。如果值为true,则执行批量确认,此deliveryTag之前收到的消息全部进行确认; 如果值为false,则只对当前收到的消息进行确认

备注
如果在获取消息时采用不自动应答,但是获取消息后不调用basicAck,则后果会很严重。RabbitMQ会认为消息没有投递成功,不仅所有的消息都会保留到内存中,而且在客户重新连接后,会将所有的消息重新投递一遍

4. 拒绝消息

当消费者接收到某条消息后,处理过程有可能失败,这时消费者可以拒绝此消息。在拒绝消息时,消费者会告诉RabbitMQ如何处理这条消息:销毁它或者重新放入队列。
可以有两种方式拒绝此消息

a. channel.basicReject:只支持对一条消息进行拒绝
拒绝方法:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

方法详细参数如下:

第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
第二个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息

代码台下:

channel.basicReject(envelope.getDeliveryTag(), true);

b. channel.basicNack
channel.basicNack是 channel.basicReject的补充,提供一次对多条消息进行拒绝的功能
方法如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; 

方法参数详细如下:

第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的 消息都被拒绝了(除了已经被 ack 的以外);如果值为false,只拒绝三本条消息
第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息

代码如下

channel.basicNack(envelope.getDeliveryTag(), false, false);

备注
当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。

5. 设置预取消息的数量

默认情况下,RabbitMQ收到消息后,就向消费者送。但是如果消息过多,且消息的数量超过了消息者处理能力从而导致其崩溃。此时我们可以通过prefetchCount 限制每个消费者在收到下一个确认回执前一次可以最大接受多少条消息。即如果设置prefetchCount =1,RabbitMQ向这个消费者发送一个消息后,再这个消息的消费者对这个消息进行ack之前,RabbitMQ不会向这个消费者发送新的消息

代码如下

 // 每个客户端每次最后获取N个消息
channel.basicQos(1);

6. 消息分配策略

这里写图片描述
多个消费者同时消费同一个队列,Rabbit的消息的分配策略是什么?
如果同一个队列,有多个消费者消费这个队列。RabbitMQ默认是按照轮询的策略发送消息,即发送的顺序是消费者1,消费者2,消费者1,消费者2…。所以平均下来,每个消费者消费的消息数量几乎相同。

7. 代码

上文的详细代码主要如下:
发送者代码: WorkQueuesRecv.java
消费者代码:WorkQueuesSend.java
测试代码:BasicTest.java的方法 workqueues()
所有的详细代码见github代码,请尽量使用tag v0.7,不要使用master,因为master一直在变,不能保证文章中代码和github上的代码一直相同


版权声明:本文为hry2015原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。