死信队列
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但是某些时候由于特定的原因queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信当然就有死信队列。
死信的来源
1.消息TTL过期
2.队列达到最大长度(队列满了,无法再添加数据到mq中)
3.消息被拒绝
死信队列代码架构图
可以看出死信消息的处理过程是与正常队列有关,如果消息在正常队列中不能够被消费,通过正常队列绑定死信交换机和死信队列,这样就可以完成消息从正常队列到死信队列的转移。
消息TTL过期
生产者代码:
public class ExerciseProducer {
//普通交换机名字
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");
//在此处设置TTL,其实也可以在消费者中设置TTL,但是习惯一般在
//生产者中设置,此处设置为10s
//设置TTL为10秒
AMQP.BasicProperties properties =
new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
System.out.println("生产者发送消息:"+message);
}
}
}
消费者代码:
public class ExerciseConsumer01 {
//普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名称
private static final String NORMAL_QUEUE = "normal_queue";
//死信队列名称
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明普通交换机和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明普通队列
Map<String,Object> arguments = new HashMap<>();
//正常队列设置死死信交换机和设置死信路由
//正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "lisi");
//正常队列声明
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
//死新队列声明
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//正常队列和正常交换机绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
//死信交换机和死新队列的绑定
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收消息");
channel.basicConsume(NORMAL_QUEUE,true,(consumerTag, message) -> System.out.println(new String(message.getBody(), "utf-8")), consumerTag -> {
});
}
}
首先启动消费者代码,然后停掉,模拟ttl过期的场景,
可以看到,十秒过后,未被接收的消息全部到了死信队列(DEAD_QUEUE)队列中去了。
队列达到最大长度
其实这里的代码和上面TTL的代码类似,只需要在生产者中把设置TTL的这个注释掉,然后在消费者中加入如下即可
设置队列的最大长度。
消息被拒绝
生产者代码和队列最大长度一致,在消费者代码中的deliveryCallBack回调函数中可以拒绝,然后消息就会进入死信队列当中。
版权声明:本文为qq_42212926原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。