一、概述
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。
第一种方法是通过队列属性设置,这些属性可以在web界面找到如下截图:
第二种方法是对消息进行单独设置,主要参考一个类即可:
二、设置队列的TTL
2.1 配置类
TTLRabbitConfig:
package com.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class TTLRabbitConfig {
//创建TTL队列
@Bean
public Queue directTTLQueue() {
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
Map<String,Object> args2 = new HashMap<>();
args2.put("x-message-ttl",5000);
return new Queue("ttl.direct.queue", true, false, false, args2);
}
//创建交换机
@Bean
public DirectExchange directTTLOrderExchange() {
return new DirectExchange("ttl_order_exchange", true, false);
}
//绑定关系
@Bean
public Binding directTTLBinding() {
return BindingBuilder.bind(directTTLQueue()).to(directTTLOrderExchange()).with("ttl");
}
}
2.2 订单生产者
public void makeOrderTTL(String userId, String productId, int num) {
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
// 2: 下单完成以后
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ
String exchangeName = "ttl_order_exchange";
String routingKey = "ttl";
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderNumer);
}
2.3 测试类
@Test
public void contextLoads4() throws Exception {
orderService.makeOrderTTL("1", "1", 12);
}
2.4 测试结果
成功创建了TTL队列,消息在五秒后自动移除。
三、设置消息的TTL
3.1 配置类
package com.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class TTLRabbitConfig {
//普通队列
@Bean
public Queue directTTLMessageQueue() {
return new Queue("ttlMessage.direct.queue", true, false, false);
}
//创建交换机
@Bean
public DirectExchange directTTLOrderExchange() {
return new DirectExchange("ttl_order_exchange", true, false);
}
//绑定关系
@Bean
public Binding directTTLMessageBinding() {
return BindingBuilder.bind(directTTLMessageQueue()).to(directTTLOrderExchange()).with("ttlMessage");
}
}
3.2 订单生产者
public void makeOrderTTLMessage(String userId, String productId, int num) {
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
// 2: 下单完成以后
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ
String exchangeName = "ttl_order_exchange";
String routingKey = "ttlMessage";
/*
给消息设置过期时间
注意:
RabbitMQ只会对队列头部的消息进行过期淘汰。
如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。
会造成后入消息不会及时地过期淘汰,导致消息的堆积。
与过期队列不同,设置了消息TTL,一旦过期就直接被移除,不能投递到死信队列里。
*/
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderNumer,messagePostProcessor);
// //设置过期时间
// MessageProperties messageProperties = new MessageProperties();
// messageProperties.setExpiration("10000");
//
// //这个参数是用来做消息的唯一标识
// //发布消息时使用,存储在消息的headers中
// CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// Message message = new Message(orderNumer.getBytes(StandardCharsets.UTF_8), messageProperties);
// rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
}
注:消息的属性核心类是MessageProperties 。
3.3 测试类
@Test
public void contextLoads5() throws Exception {
orderService.makeOrderTTLMessage("1", "1", 12);
}
3.4 测试结果
队列成功创建,消息在5秒后,也自动清除。
需要注意的是,消息的TTL,在时间到期后,消息会被移除,且无法转移到死信队列。(后面说到)
注意:
RabbitMQ只会对队列头部的消息进行过期淘汰。
如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。
会造成后入消息不会及时地过期淘汰,导致消息的堆积。
四、死信队列
4.1 配置类
package com.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 创建死信队列,其实也是流程也是一样的,与之前创建的没啥区别
* 主要是用途不一样,我们会将我们的TTL队列绑定到这个死信队列,就能实现消息的转移
*
**/
@Configuration
public class DeadRabbitConfig {
//用于接盘消息的交换机和队列
@Bean
public DirectExchange deadExchange() {
return new DirectExchange("dead_order_exchange", true, false);
}
@Bean
public Queue deadQueue() {
return new Queue("dead.direct.queue", true);
}
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
//创建一个TTL队列,并实现过期消息转移到死信队列里
@Bean
public DirectExchange ttlExchange() {
return new DirectExchange("ttl_Dead_order_exchange", true, false);
}
@Bean
public Queue ttlQueue() {
Map<String,Object> args2 = new HashMap<>();
//这些参数主要是从web界面上找的
args2.put("x-message-ttl",5000);//指定过期时间
args2.put("x-dead-letter-exchange","dead_order_exchange");//指定要转移的交换机
args2.put("x-dead-letter-routing-key","dead");//指定路由key,根据模式判断是否要传
args2.put("x-max-length",5);//指定队列容量
return new Queue("ttl.dead.direct.queue", true, false, false, args2);
}
@Bean
public Binding ttlBinding() {
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttlDead");
}
}
由代码可以看到,其实死信队列就只是一个普通队列,专门用于接收其他队列消息的转移,因此才称作死信队列。
这里演示了,消息过期和队列容量满了的情况实现消息转移到死信队列。
4.2 订单生产者
public void makeOrderTtlDead(String userId, String productId, int num) {
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
// 2: 下单完成以后
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ
String exchangeName = "ttl_Dead_order_exchange";
String routingKey = "ttlDead";
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderNumer);
}
4.3 测试类
@Test
public void contextLoads6() throws Exception {
for (int i=0;i<11;i++){
orderService.makeOrderTtlDead("1", "1", 12);
}
}
4.4 运行结果
发了11条消息到TTL队列,有6条转移到了死信队列,自己留下了5条;再过五秒后,这5条也转移到了死信队列里。
版权声明:本文为weixin_41979002原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。