RabbbitMQ之死信队列的三大来源(TTL,队列最大长度,消息被拒绝)

死信队列

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但是某些时候由于特定的原因queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信当然就有死信队列。

死信的来源

1.消息TTL过期
2.队列达到最大长度(队列满了,无法再添加数据到mq中)
3.消息被拒绝
死信队列代码架构图
在这里插入图片描述可以看出死信消息的处理过程是与正常队列有关,如果消息在正常队列中不能够被消费,通过正常队列绑定死信交换机和死信队列,这样就可以完成消息从正常队列到死信队列的转移。

消息TTL过期

生产者代码:

public class ExerciseProducer {
    //普通交换机名字
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");
        //在此处设置TTL,其实也可以在消费者中设置TTL,但是习惯一般在
        //生产者中设置,此处设置为10s
        //设置TTL为10秒
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 0; i < 10; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
            System.out.println("生产者发送消息:"+message);
        }
    }
}

消费者代码:

public class ExerciseConsumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列名称
    private static final String NORMAL_QUEUE = "normal_queue";
    //死信队列名称
    private static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明普通交换机和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明普通队列
        Map<String,Object> arguments = new HashMap<>();
        //正常队列设置死死信交换机和设置死信路由
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "lisi");
        //正常队列声明
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //死新队列声明
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //正常队列和正常交换机绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //死信交换机和死新队列的绑定
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收消息");
        channel.basicConsume(NORMAL_QUEUE,true,(consumerTag, message) -> System.out.println(new String(message.getBody(), "utf-8")), consumerTag -> {
        });

    }
}

首先启动消费者代码,然后停掉,模拟ttl过期的场景,
在这里插入图片描述可以看到,十秒过后,未被接收的消息全部到了死信队列(DEAD_QUEUE)队列中去了。

队列达到最大长度

其实这里的代码和上面TTL的代码类似,只需要在生产者中把设置TTL的这个注释掉,然后在消费者中加入如下即可
在这里插入图片描述
设置队列的最大长度。

消息被拒绝

生产者代码和队列最大长度一致,在消费者代码中的deliveryCallBack回调函数中可以拒绝,然后消息就会进入死信队列当中。


版权声明:本文为qq_42212926原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。