目录
rabbitmq-直接交换模式
rabbitmq-广播模式- 路由
rabbitmq-广播模式- 交换机
rabbitmq-转发模式
rabbitmq-延迟消费 在【队列上】
rabbitmq-延迟消费 在【消息上】
源码地址:https://gitee.com/caiwang/rabbitmq-project
上一篇介绍了延迟队列,现在这个只是加强一下理解
还是老规矩
在common项目中创建
DelaySenderQueuesCommon
public class DelaySenderQueuesCommon {
/**
* ========= delay_queue_per_queue_ttl:TTL配置在【队列上】的缓冲队列。。===========
*/
/**
* TTL配置在【队列上】的缓冲队列
*/
public static final String DELAY_QUEUES_QUEUE_TTL_NAME = "delay_queues_queue_ttl_name";
/**
* TTL配置在【队列上】的缓冲队列
*/
public static final String DELAY_QUEUES_SLOW_NAME = "delay_queues_slow_name";
/**
* 【消息上】 实际消费队列
*/
public static final String DELAY_QUEUES_QUEUE_NAME = "delay_queues_queue_name";
/**
* 设置队列的过期时间
*/
public static final long DELAY_QUEUES_EXPIRATION = 6000;
}
现在创建生产者
在项目【rabbitmq-sender-project06】中 创建
DelaySenderQueuesConfig
@Configuration
public class DelaySenderQueuesConfig {
/**
* 功能描述 delay_queue_per_queue_ttl:TTL配置在队列上的缓冲队列
* @return org.springframework.amqp.core.Queue
* @author cailu
* @date 2020/3/30 17:21
*/
@Bean
Queue delayQueuePerQueueTTL() {
return QueueBuilder.durable(DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_TTL_NAME)
//DLX 这里声明当前队列绑定的死信交换机
.withArgument("x-dead-letter-exchange", DelaySenderQueuesCommon.DELAY_QUEUES_SLOW_NAME)
//DLK dead letter携带的routing key 这里声明当前队列的死信路由key
.withArgument("x-dead-letter-routing-key", DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_NAME)
//TTL 设置队列的过期时间
.withArgument("x-message-ttl", DelaySenderQueuesCommon.DELAY_QUEUES_EXPIRATION)
.build();
}
/**
* 功能描述 配置DLX
* @param
* @return org.springframework.amqp.core.DirectExchange
* @author cailu
* @date 2020/3/31 15:08
*/
@Bean
DirectExchange delayExchange() {
return new DirectExchange(DelaySenderQueuesCommon.DELAY_QUEUES_SLOW_NAME);
}
/**
* 功能描述 创建 实际消费队列
* @param
* @return org.springframework.amqp.core.Queue
* @author cailu
* @date 2020/3/31 15:08
*/
@Bean
Queue delayProcessQueue() {
return QueueBuilder.durable(DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_NAME)
.build();
}
/**
* 功能描述 绑定
* @param delayProcessQueue 延迟处理队列
* @param delayExchange 延迟交换
* @return org.springframework.amqp.core.Binding
* @author cailu
* @date 2020/3/31 15:09
*/
@Bean
Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
return BindingBuilder.bind(delayProcessQueue)
.to(delayExchange)
.with(DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_NAME);
}
}
创建消息模板
DelaySenderQueuesTemplate
@Component
public class DelaySenderQueuesTemplate {
private Logger logger = (Logger) LoggerFactory.getLogger(this.getClass());
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 功能描述 配置在【队列上】的延迟消费
* 在发布消息时 一定要注意 发布的消息 是在【延迟队列】中的【缓冲队列】里
* 通过TTL的绑定 使其在 死信队列 中 按照时间 自动消化掉
* @return void
* @author cailu
* @date 2020/3/31 11:19
*/
public void delayQueuePerQueueTTL() {
rabbitTemplate.convertAndSend(DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_TTL_NAME,
"Message From delay_queues_queue_ttl_name with expiration " + DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_NAME);
}
}
创建演示类controller:
DelaySenderQueuesController
@Api(value = "10006-延迟消费 对着在【队列上】", tags = {"10006-延迟消费 配置在【队列上】"})
@RestController
public class DelaySenderQueuesController {
@Autowired
private DelaySenderQueuesTemplate delaySenderQueuesTemplate;
@GetMapping("delayQueuePerQueueTTL")
@ApiOperation(value = "60001-创建 延迟消费 发布者", notes = "创建 延迟消费 发布者")
@ApiVersions(group = ApiVersionConstant.FAP_APP100)
public String delayQueuePerQueueTTL() {
delaySenderQueuesTemplate.delayQueuePerQueueTTL();
return "延迟消费 配置在 【队列上】 创建成功";
}
}
接下来我们直接创建消费者
在项目【rabbitmq-receiver-project06】创建:
DelaySenderQueuesReceiver:
@Component
public class DelaySenderQueuesReceiver {
/**
* 功能描述 监听 【队列上】 延迟消费
* @param str 内容
* @return void
* @author cailu
* @date 2020/3/31 15:14
*/
@RabbitListener(queues = DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_NAME)
public void processQueue(String str) {
System.out.println("Receive---delay_queues_queue_name:========================:" + str.toString());
}
}
启动项目,用swagger发送请求
观察mq后台和消费者日志

可以看到 消费者,延迟消费成功
至此 rabbitmq-延迟消费 在【消息上】完毕
版权声明:本文为cai750415222原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。