Spring boot RabbitMq 延迟消息

  1. rabbitMq配置文件
    spring:
      redis:
        host: 127.0.0.1
        port: 6377
      rabbitmq:
        host: localhost
        virtual-host: /
        port: 5672
        password: 123456
        username: yuanbin
  2. 定义一个延迟消息交换机
    public static final String DELAY_EXCHANGE_NAME = "delayed_exchange";
    public static final String DELAY_QUEUE_NAME = "delay_queue_name";
    public static final String DELAY_ROUTING_KEY = "delay_routing_key";
    
    @Bean
    public CustomExchange delayExchange()
    {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }
  3. 定义一个消息队列
    @Bean
    public Queue queue()
    {
        Queue queue = new Queue(DELAY_QUEUE_NAME, true);
        return queue;
    }
  4. 绑定消息队列和交换机并添加路由
    @Bean
    public Binding binding(Queue queue, CustomExchange delayExchange)
    {
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
    }
  5. 编写生产者
    @GetMapping("/sendDelayMessage")
    public String sendDelayMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        System.out.println("DirectReceiver生产者发送消息  : " + createTime);
        //将消息携带绑定键值:DirectRabbitConfig.DELAY_ROUTING_KEY发送到交换机DirectRabbitConfig.DELAY_EXCHANGE_NAME
        rabbitTemplate.convertAndSend(DirectRabbitConfig.DELAY_EXCHANGE_NAME, DirectRabbitConfig.DELAY_ROUTING_KEY, map, new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息加上header,表明是一个延迟消息,并且设置过期时间3秒
                message.getMessageProperties().setHeader("x-delay",3000);
                return message;
            }
        });
        return "ok";
    }
  6. 消费者监听队列
    @RabbitListener(queues = DirectRabbitConfig.DELAY_QUEUE_NAME )
    @RabbitHandler
    //message就是传过来的参数值
    public void processMeatTwo(Map<String,Object> message) {
        System.out.println("processMeatTwo消费了队列meat_queue的消息:" + message);
    }
  7. 如果消息队列发送失败检查是否安装延迟消息插件

    查看方式 cmd进入命令行  D:\Program Files\rabbitmq_server-3.8.9>rabbitmq-plugins list



    rabbitmq_delayed_message_exchange插件
    下载地址:

    http://www.rabbitmq.com/community-plugins.html


版权声明:本文为Mr_yuanbin原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。