rabbitmq + Spring boot 整合-延迟消费 在【消息上】(06)

目录

rabbitmq-直接交换模式
rabbitmq-广播模式- 路由
rabbitmq-广播模式- 交换机
rabbitmq-转发模式
rabbitmq-延迟消费 在【队列上】
rabbitmq-延迟消费 在【消息上】

源码地址:https://gitee.com/caiwang/rabbitmq-project

上一篇介绍了延迟队列,现在这个只是加强一下理解
还是老规矩

在common项目中创建

DelaySenderQueuesCommon

public class DelaySenderQueuesCommon {

	/**
	 * ========= delay_queue_per_queue_ttl:TTL配置在【队列上】的缓冲队列。。===========
	 */
	/**
	 * TTL配置在【队列上】的缓冲队列
	 */
	public static final String DELAY_QUEUES_QUEUE_TTL_NAME = "delay_queues_queue_ttl_name";

	/**
	 * TTL配置在【队列上】的缓冲队列
	 */
	public static final String DELAY_QUEUES_SLOW_NAME = "delay_queues_slow_name";
	/**
	 * 【消息上】 实际消费队列
	 */
	public static final String DELAY_QUEUES_QUEUE_NAME = "delay_queues_queue_name";

	/**
	 * 设置队列的过期时间
	 */
	public static final long DELAY_QUEUES_EXPIRATION = 6000;

}

现在创建生产者

在项目【rabbitmq-sender-project06】中 创建

DelaySenderQueuesConfig

@Configuration
public class DelaySenderQueuesConfig {


	/**
	 * 功能描述 delay_queue_per_queue_ttl:TTL配置在队列上的缓冲队列
	 * @return org.springframework.amqp.core.Queue
	 * @author cailu
	 * @date 2020/3/30 17:21
	 */
	@Bean
	Queue delayQueuePerQueueTTL() {
		return QueueBuilder.durable(DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_TTL_NAME)
				//DLX 这里声明当前队列绑定的死信交换机
				.withArgument("x-dead-letter-exchange", DelaySenderQueuesCommon.DELAY_QUEUES_SLOW_NAME)
				//DLK  dead letter携带的routing key 这里声明当前队列的死信路由key
				.withArgument("x-dead-letter-routing-key", DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_NAME)
				//TTL 设置队列的过期时间
				.withArgument("x-message-ttl", DelaySenderQueuesCommon.DELAY_QUEUES_EXPIRATION)
				.build();
	}


	/**
	 * 功能描述  配置DLX
	 * @param
	 * @return org.springframework.amqp.core.DirectExchange
	 * @author cailu
	 * @date 2020/3/31 15:08
	 */
	@Bean
	DirectExchange delayExchange() {
		return new DirectExchange(DelaySenderQueuesCommon.DELAY_QUEUES_SLOW_NAME);
	}

	/**
	 * 功能描述  创建 实际消费队列
	 * @param
	 * @return org.springframework.amqp.core.Queue
	 * @author cailu
	 * @date 2020/3/31 15:08
	 */
	@Bean
	Queue delayProcessQueue() {
		return QueueBuilder.durable(DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_NAME)
				.build();
	}

	/**
	 * 功能描述  绑定
	 * @param delayProcessQueue 延迟处理队列
	 * @param delayExchange     延迟交换
	 * @return org.springframework.amqp.core.Binding
	 * @author cailu
	 * @date 2020/3/31 15:09
	 */
	@Bean
	Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
		return BindingBuilder.bind(delayProcessQueue)
				.to(delayExchange)
				.with(DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_NAME);
	}
}

创建消息模板

DelaySenderQueuesTemplate

@Component
public class DelaySenderQueuesTemplate {

	private Logger logger = (Logger) LoggerFactory.getLogger(this.getClass());

	@Autowired
	private RabbitTemplate rabbitTemplate;


	/**
	 * 功能描述 配置在【队列上】的延迟消费
	 * 在发布消息时 一定要注意 发布的消息 是在【延迟队列】中的【缓冲队列】里
	 * 通过TTL的绑定 使其在 死信队列 中 按照时间 自动消化掉
	 * @return void
	 * @author cailu
	 * @date 2020/3/31 11:19
	 */
	public void delayQueuePerQueueTTL() {
		rabbitTemplate.convertAndSend(DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_TTL_NAME,
				"Message From delay_queues_queue_ttl_name with expiration " + DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_NAME);
	}


}

创建演示类controller:

DelaySenderQueuesController

@Api(value = "10006-延迟消费 对着在【队列上】", tags = {"10006-延迟消费 配置在【队列上】"})
@RestController
public class DelaySenderQueuesController {

	@Autowired
	private DelaySenderQueuesTemplate delaySenderQueuesTemplate;

	@GetMapping("delayQueuePerQueueTTL")
	@ApiOperation(value = "60001-创建 延迟消费 发布者", notes = "创建 延迟消费 发布者")
	@ApiVersions(group = ApiVersionConstant.FAP_APP100)
	public String delayQueuePerQueueTTL() {
		delaySenderQueuesTemplate.delayQueuePerQueueTTL();
		return "延迟消费 配置在 【队列上】 创建成功";
	}
}

接下来我们直接创建消费者
在项目【rabbitmq-receiver-project06】创建:

DelaySenderQueuesReceiver:

@Component
public class DelaySenderQueuesReceiver {

	/**
	 * 功能描述  监听 【队列上】 延迟消费
	 * @param str 内容
	 * @return void
	 * @author cailu
	 * @date 2020/3/31 15:14
	 */
	@RabbitListener(queues = DelaySenderQueuesCommon.DELAY_QUEUES_QUEUE_NAME)
	public void processQueue(String str) {
		System.out.println("Receive---delay_queues_queue_name:========================:" + str.toString());
	}
}

启动项目,用swagger发送请求

观察mq后台和消费者日志

在这里插入图片描述
可以看到 消费者,延迟消费成功


至此 rabbitmq-延迟消费 在【消息上】完毕

源码地址:https://gitee.com/caiwang/rabbitmq-project


版权声明:本文为cai750415222原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。