1、准备工作:
需要在rabbitMq中安装插件:rabbitmq_delayed_message_exchange插件并启动,下载地址
https://www.rabbitmq.com/community-plugins.html

在web页面查看插件是否安装成功:看创建exchanges时type是否有x-delayed-message类型

2、代码:
配置文件
rabbitmq:
addresses: 服务器地址
username: 账号名
password: 登陆密码
virtual-host: /
publisher-returns: true
publisher-confirms: true
connection-timeout: 5000
listener:
simple:
acknowledge-mode: manualRabbigMqConfig:
package com.dayunmotor.tsp.message.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* RabbitMQ的配置对象。
*
*
*/
@Configuration
@Slf4j
public class RabbitMQConfig {
/**
* 定制AmqpTemplate对象。
* 可根据需要定制多个。
*
* @return AmqpTemplate对象。
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置消息转换器为Jackson
rabbitTemplate.setEncoding("UTF-8");
// 设置不接受不可路由的消息,需要在yml中配置:publisher-returns: true
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String correlationId = message.getMessageProperties().getCorrelationId();
log.warn("ReturnCallback -> 消息 {} 发送失败,应答码:{},原因:{},交换器: {},路由键:{}",
correlationId,
replyCode,
replyText,
exchange,
routingKey);
});
// 设置消息发布确认功能,需要在yml中配置:publisher-confirms: true
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("ConfirmCallback -> 消息发布到交换器成功,id:{}", correlationData);
} else {
log.warn("ConfirmCallback -> 消息发布到交换器失败,错误原因为:{}", cause);
}
});
// 设置json格式解析器
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
/**
* @return 消息转换器。
*/
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 创建exchanges
* @return
*/
@Bean
CustomExchange orderPluginDirect() {
//创建一个自定义交换机,可以发送延迟消息
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("message-delay-test", "x-delayed-message",true, false,args);
}
/**
* 延迟队列
*/
@Bean
public Queue orderPluginQueue() {
return new Queue("message-delay-test-queue");
}
/**
* 将延迟队列绑定到交换机
*/
@Bean
public Binding orderPluginBinding(CustomExchange orderPluginDirect, Queue orderPluginQueue) {
return BindingBuilder
.bind(orderPluginQueue)
.to(orderPluginDirect)
.with("message-delay-test-routingkey")
.noargs();
}
}
其中创建exchange和队列也可以通过web页面进行创建和绑定
发送端代码:
public void timingPushMessage(MessageTask objToObj) {
objToObj.setTitle("delay1");
rabbitTemplate.convertAndSend("message-delay-test",
"message-delay-test-routingkey", objToObj, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setHeader("x-delay",15000);
return message;
}
});
log.info("delay发送成功" + new Date());
MessageProperties messageProperties3 = new MessageProperties();
objToObj.setTitle("delay3");
rabbitTemplate.convertAndSend("message-delay-test",
"message-delay-test-routingkey", objToObj, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setHeader("x-delay",50000);
return message;
}
});
log.info("delay3发送成功" + new Date());
objToObj.setTitle("delay2");
rabbitTemplate.convertAndSend("message-delay-test",
"message-delay-test-routingkey", objToObj, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setHeader("x-delay",10000);
return message;
}
});
log.info("delay2发送成功" + new Date());
}监听端代码:
@Component
@Slf4j
public class MessageTaskTimingPushMessageListener {
@RabbitListener(queues = "message-delay-test-queue")
@RabbitHandler
public void process(MessageTask messageTask,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
log.info("消费消息成功: {}", messageTask);
channel.basicAck(tag, true);
}
}
测试结果:

到这里已经可以实现定时的功能了。
最后说一句:此插件实现的延迟队列功能和通过死信队列实现差不多,但是使用死信队列进行实现会导致消息堆积,私信队列原理:一个exchanges绑定两个队列,queue1设置过期后由queue2绑定的routingkey发送到exchanges,然后在监听queue2就可以了,queue1配置
exchanges绑定:

这种实现方式因为queue1中的信息每个都设置过期时间,但是队列从头部开始,导致第一个信息设置1小时,第二个设置30分钟,只有等1小时过后才会轮到第二个,但是第二个已经过期,只能堆积到队列中,所以还是需要插件看来实现
版权声明:本文为lizhongde112原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。