rabbitMq延迟队列实现定时任务

1、准备工作:

需要在rabbitMq中安装插件:rabbitmq_delayed_message_exchange插件并启动,下载地址

https://www.rabbitmq.com/community-plugins.html

在web页面查看插件是否安装成功:看创建exchanges时type是否有x-delayed-message类型

2、代码:

配置文件

rabbitmq:
    addresses: 服务器地址
    username: 账号名
    password: 登陆密码
    virtual-host: /
    publisher-returns: true
    publisher-confirms: true
    connection-timeout: 5000
    listener:
      simple:
        acknowledge-mode: manual

RabbigMqConfig:

package com.dayunmotor.tsp.message.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


/**
 * RabbitMQ的配置对象。
 *
 * 
 */
@Configuration
@Slf4j
public class RabbitMQConfig {

    /**
     * 定制AmqpTemplate对象。
     * 可根据需要定制多个。
     *
     * @return AmqpTemplate对象。
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置消息转换器为Jackson
        rabbitTemplate.setEncoding("UTF-8");
        // 设置不接受不可路由的消息,需要在yml中配置:publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId();
            log.warn("ReturnCallback -> 消息 {} 发送失败,应答码:{},原因:{},交换器: {},路由键:{}",
                    correlationId,
                    replyCode,
                    replyText,
                    exchange,
                    routingKey);
        });
        // 设置消息发布确认功能,需要在yml中配置:publisher-confirms: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("ConfirmCallback -> 消息发布到交换器成功,id:{}", correlationData);
            } else {
                log.warn("ConfirmCallback -> 消息发布到交换器失败,错误原因为:{}", cause);
            }
        });
        // 设置json格式解析器
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }


    /**
     * @return 消息转换器。
     */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }


    /**
     * 创建exchanges
     * @return
     */
    @Bean
    CustomExchange orderPluginDirect() {
        //创建一个自定义交换机,可以发送延迟消息
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("message-delay-test", "x-delayed-message",true, false,args);
    }

    /**
     * 延迟队列
     */
    @Bean
    public Queue orderPluginQueue() {
        return new Queue("message-delay-test-queue");
    }

    /**
     * 将延迟队列绑定到交换机
     */
    @Bean
    public Binding orderPluginBinding(CustomExchange orderPluginDirect, Queue orderPluginQueue) {
        return BindingBuilder
                .bind(orderPluginQueue)
                .to(orderPluginDirect)
                .with("message-delay-test-routingkey")
                .noargs();
    }



}

其中创建exchange和队列也可以通过web页面进行创建和绑定

发送端代码:

public void timingPushMessage(MessageTask objToObj) {




        objToObj.setTitle("delay1");
        rabbitTemplate.convertAndSend("message-delay-test",
                "message-delay-test-routingkey", objToObj, new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //给消息设置延迟毫秒值
                        message.getMessageProperties().setHeader("x-delay",15000);
                        return message;
                    }
                });
        log.info("delay发送成功" + new Date());
        MessageProperties messageProperties3 = new MessageProperties();


        objToObj.setTitle("delay3");
        rabbitTemplate.convertAndSend("message-delay-test",
                "message-delay-test-routingkey", objToObj, new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //给消息设置延迟毫秒值
                        message.getMessageProperties().setHeader("x-delay",50000);
                        return message;
                    }
                });
        log.info("delay3发送成功" + new Date());


        objToObj.setTitle("delay2");
        rabbitTemplate.convertAndSend("message-delay-test",
                "message-delay-test-routingkey", objToObj, new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //给消息设置延迟毫秒值
                        message.getMessageProperties().setHeader("x-delay",10000);
                        return message;
                    }
                });
        log.info("delay2发送成功" + new Date());
    }

监听端代码:

@Component
@Slf4j

public class MessageTaskTimingPushMessageListener {

    @RabbitListener(queues = "message-delay-test-queue")
    @RabbitHandler
    public void process(MessageTask messageTask,
                        Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("消费消息成功: {}", messageTask);
        channel.basicAck(tag, true);

    }


}

测试结果:

到这里已经可以实现定时的功能了。

最后说一句:此插件实现的延迟队列功能和通过死信队列实现差不多,但是使用死信队列进行实现会导致消息堆积,私信队列原理:一个exchanges绑定两个队列,queue1设置过期后由queue2绑定的routingkey发送到exchanges,然后在监听queue2就可以了,queue1配置 

 exchanges绑定:

 这种实现方式因为queue1中的信息每个都设置过期时间,但是队列从头部开始,导致第一个信息设置1小时,第二个设置30分钟,只有等1小时过后才会轮到第二个,但是第二个已经过期,只能堆积到队列中,所以还是需要插件看来实现


版权声明:本文为lizhongde112原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。