- pom.xml
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
2.application配置
spring.rabbitmq.host=192.168.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
3.队列交换机配置类
@Configuration
public class QueueConfig {
// 普通交换机
public static final String X_EXCHANGE = "X";
// 死信交换机
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 普通队列
public static final String QUEUE_A = "QA";
// 死信队列
public static final String DEAD_LETTER_QUEUE = "QD";
// 申明交换机
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
// 声明队列
@Bean("queueA")
public Queue queueA(){
Map<String,Object> arguments = new HashMap<>();
//声明当前队列绑定的死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
arguments.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
// 死信队列
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
// 绑定
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
4.控制层发送消息
@Autowired
RabbitTemplate rabbitTemplate;
// 发消息
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送消息给两个TTL队列:{}",new Date().toString(),message);
rabbitTemplate.convertAndSend("X", "XA","消息来自ttl为10s的队列:"+ message);
rabbitTemplate.convertAndSend("X", "XB","消息来自ttl为40s的队列:"+ message);
}
5.接受消息
@Component
@Slf4j
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel)throws Exception{
String msg = new String(message.getBody());
log.info("当前时间:{}。收到死信队列的消息:{}",new Date().toString(),msg);
}
}
6.交换机示意图
版权声明:本文为weixin_47252341原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。