SpringCloud-Day6-服务异步通讯RabbitMQ
1.初始MQ
1.1同步通讯和异步通讯
1.2 同步调用的问题
微服务间基于Feign的调用就属于同步方式,存在一些问题
1.3异步调用
异步调用常见实现就是事件驱动模式
1.4什么是MQ?
MQ(Message Queue):中文消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
2.MQ安装
2.1RabbitMQ概述和安装
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官方地址https://www.rabbitmq.com/
RabbitMQ的结构和概念
2.2常见消息类型
3.SpringAMQP
3.1什么是SpringAMQP
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
3.2利用SpringAMQP实现HelloWorld中的基础消息队列功能
流程如下:
1.在父工程中引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
- 在publisher服务中编写application.yml,添加mq连接信息:
spring: rabbitmq: host: 192.168.150.101 port: 5672 username: itcast password: 123321 virtual-host: /
- 在publisher服务中新建一个测试类,编写测试方法
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2SimpleQueue(){ String queueName="simple.queue"; String message ="hello,spring amqp"; rabbitTemplate.convertAndSend(queueName,message); } }
3.在consumer服务中编写消费逻辑,绑定simple.queue这个队列
在consumer服务中编写application.yml,添加mq连接信息:
spring: rabbitmq: host: 192.168.150.101 port: 5672 username: itcast password: 123321 virtual-host: /
在consumer服务中新建一个类,编写消费逻辑:
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException{ System.out.println("消费者接收到simple.queue的消息:【"+msg+"】"); } }
3.3SpringAMQP-WorkQueue模型
WorkQueue 工作队列,可以提高消息处理速度,避免队列消息堆积模拟WorkQueue,实现一个队列绑定多个消费者
基本思路如下:
在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2SimpleQueue(){ String queueName="simple.queue"; String message ="hello,spring amqp"; rabbitTemplate.convertAndSend(queueName,message); } @Test public void testSendMessage2WorkQueue() throws InterruptedException { String queueName="simple.queue"; String message ="hello,spring amqp"; for (int i = 1; i <50 ; i++) { rabbitTemplate.convertAndSend(queueName,message+i); Thread.sleep(20); } } }
在consumer服务中定义两个消息监听者,都监听simple.queue队列
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException{ System.out.println("消费者1接收到消息:【"+msg+"】"+ LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException{ System.out.println("消费者2........接收到消息:【"+msg+"】"+LocalTime.now()); Thread.sleep(200); } }
消费者1每秒处理50条消息,消费者2每秒处理10条消息
3.4SpringAMQP-发布订阅模型
发布订阅模式与之前案例的却别就是允许同一消息发送给多个消费者。实现方式是加入了exchange(交换机),常见exchange类型包含:
- Fanout:广播
- Direct :路由
- Topic :话题
3.5SpringAMQP-发布订阅模型-Fanout Exchange
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue
3.6SpringAMQP-发布订阅模型-Direct Exchange
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)
3.7SpringAMQP-发布订阅模型-Topic Exchange
Topic Exchange与Direct Exchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割