RabbitMQ设置过期时间TTL和死信队列(八)

一、概述

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。

第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

第一种方法是通过队列属性设置,这些属性可以在web界面找到如下截图:
在这里插入图片描述
第二种方法是对消息进行单独设置,主要参考一个类即可:
在这里插入图片描述

二、设置队列的TTL

2.1 配置类

TTLRabbitConfig:

package com.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class TTLRabbitConfig {

    //创建TTL队列
    @Bean
    public Queue directTTLQueue() {
        /*
         *  如果队列不存在,则会创建
         *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
         *
         *  @params1: queue 队列的名称
         *  @params2: durable 队列是否持久化
         *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
         *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
         *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
         * */
        Map<String,Object> args2 = new HashMap<>();
        args2.put("x-message-ttl",5000);
        return new Queue("ttl.direct.queue", true, false, false, args2);
    }

    //创建交换机
    @Bean
    public DirectExchange directTTLOrderExchange() {
        return new DirectExchange("ttl_order_exchange", true, false);
    }


    //绑定关系
    @Bean
    public Binding directTTLBinding() {
        return BindingBuilder.bind(directTTLQueue()).to(directTTLOrderExchange()).with("ttl");
    }

}

2.2 订单生产者

    public void makeOrderTTL(String userId, String productId, int num) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();

        // 2: 下单完成以后
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);

        // 发送订单信息给RabbitMQ
        String exchangeName = "ttl_order_exchange";
        String routingKey = "ttl";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderNumer);
    }

2.3 测试类

    @Test
    public void contextLoads4() throws Exception {
        orderService.makeOrderTTL("1", "1", 12);
    }

2.4 测试结果

在这里插入图片描述
成功创建了TTL队列,消息在五秒后自动移除。

三、设置消息的TTL

3.1 配置类

package com.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class TTLRabbitConfig {

     //普通队列
    @Bean
    public Queue directTTLMessageQueue() {
        return new Queue("ttlMessage.direct.queue", true, false, false);
    }

    //创建交换机
    @Bean
    public DirectExchange directTTLOrderExchange() {
        return new DirectExchange("ttl_order_exchange", true, false);
    }

    //绑定关系
    @Bean
    public Binding directTTLMessageBinding() {
        return BindingBuilder.bind(directTTLMessageQueue()).to(directTTLOrderExchange()).with("ttlMessage");
    }
}

3.2 订单生产者

 public void makeOrderTTLMessage(String userId, String productId, int num) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();

        // 2: 下单完成以后
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);

        // 发送订单信息给RabbitMQ
        String exchangeName = "ttl_order_exchange";
        String routingKey = "ttlMessage";

        /*
            给消息设置过期时间
            注意:
                RabbitMQ只会对队列头部的消息进行过期淘汰。
                如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。
                会造成后入消息不会及时地过期淘汰,导致消息的堆积。

                与过期队列不同,设置了消息TTL,一旦过期就直接被移除,不能投递到死信队列里。
         */
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderNumer,messagePostProcessor);


//        //设置过期时间
//        MessageProperties messageProperties = new MessageProperties();
//        messageProperties.setExpiration("10000");
//
//        //这个参数是用来做消息的唯一标识
//        //发布消息时使用,存储在消息的headers中
//        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//        Message message = new Message(orderNumer.getBytes(StandardCharsets.UTF_8), messageProperties);
//        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }

注:消息的属性核心类是MessageProperties 。

3.3 测试类

  @Test
    public void contextLoads5() throws Exception {
        orderService.makeOrderTTLMessage("1", "1", 12);
    }

3.4 测试结果

队列成功创建,消息在5秒后,也自动清除。
需要注意的是,消息的TTL,在时间到期后,消息会被移除,且无法转移到死信队列。(后面说到)
注意:
RabbitMQ只会对队列头部的消息进行过期淘汰。
如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。
会造成后入消息不会及时地过期淘汰,导致消息的堆积。

四、死信队列

4.1 配置类

package com.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 创建死信队列,其实也是流程也是一样的,与之前创建的没啥区别
 * 主要是用途不一样,我们会将我们的TTL队列绑定到这个死信队列,就能实现消息的转移
 *
 **/
@Configuration
public class DeadRabbitConfig {

    //用于接盘消息的交换机和队列
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange("dead_order_exchange", true, false);
    }
    @Bean
    public Queue deadQueue() {
        return new Queue("dead.direct.queue", true);
    }
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }

    //创建一个TTL队列,并实现过期消息转移到死信队列里
    @Bean
    public DirectExchange ttlExchange() {
        return new DirectExchange("ttl_Dead_order_exchange", true, false);
    }
    @Bean
    public Queue ttlQueue() {
        Map<String,Object> args2 = new HashMap<>();
        //这些参数主要是从web界面上找的
        args2.put("x-message-ttl",5000);//指定过期时间
        args2.put("x-dead-letter-exchange","dead_order_exchange");//指定要转移的交换机
        args2.put("x-dead-letter-routing-key","dead");//指定路由key,根据模式判断是否要传
        args2.put("x-max-length",5);//指定队列容量
        return new Queue("ttl.dead.direct.queue", true, false, false, args2);
    }
    @Bean
    public Binding ttlBinding() {
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttlDead");
    }
}

由代码可以看到,其实死信队列就只是一个普通队列,专门用于接收其他队列消息的转移,因此才称作死信队列。
这里演示了,消息过期和队列容量满了的情况实现消息转移到死信队列。

4.2 订单生产者

    public void makeOrderTtlDead(String userId, String productId, int num) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();

        // 2: 下单完成以后
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);

        // 发送订单信息给RabbitMQ
        String exchangeName = "ttl_Dead_order_exchange";
        String routingKey = "ttlDead";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderNumer);
    }

4.3 测试类

   @Test
    public void contextLoads6() throws Exception {
       for (int i=0;i<11;i++){
           orderService.makeOrderTtlDead("1", "1", 12);
       }
    }

4.4 运行结果

发了11条消息到TTL队列,有6条转移到了死信队列,自己留下了5条;再过五秒后,这5条也转移到了死信队列里。


版权声明:本文为weixin_41979002原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。