消息中间件-RabbitMQ配置使用

1. MQ的使用场景及优点

  1. 异步通信
    有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把消息放入队列,但并不立即处理它。想在队列中放入多少消息就放多少,然后在需要的时候再去处理他。

A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求,等待个 1s,这几乎是不可接受的。

在这里插入图片描述
一般互联网类的企业,对于用户直接的操作,一般要求是每个请求都必须在 200 ms 以内完成,对用户几乎是无感知的。

如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了,爽!网站做得真好,真快!

在这里插入图片描述
2. 解耦
降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

看这么个场景。A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…

在这里插入图片描述
在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊!

如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

在这里插入图片描述
3. 过载保护(削峰)
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。

一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。

但是高峰期一过,到了下午的时候,就成了低峰期,可能也就 1w 的用户同时在网站上操作,每秒中的请求数量可能也就 50 个请求,对整个系统几乎没有任何的压力。

在这里插入图片描述
如果使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。而 MQ 每秒钟 5k 个请求进来,就 2k 个请求出去,结果就导致在中午高峰期(1 个小时),可能有几十万甚至几百万的请求积压在 MQ 中。

在这里插入图片描述
这个短暂的高峰期积压是 ok 的,因为高峰期过了之后,每秒钟就 50 个请求进 MQ,但是 A 系统依然会按照每秒 2k 个请求的速度在处理。所以说,只要高峰期一过,A 系统就会快速将积压的消息给解决掉。

其他优点:

冗余(一致性)
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

2. MQ消息模型

  1. Pub/Sub发布订阅(广播):一份消息需要发给所有订阅的消费者 (使用topic作为通信载体)

  2. PTP点对点: 一份消息只发给特定的目标消费者(使用queue作为通信载体)

每一种MQ中间件在实现这2种模型时的具体方法有所区别, 比如消息保存方式, 分发方式等, 某些中间件还有更多种拓展的消息模型.

2.1 RabbitMQ 7种工作模式

RabbitMQ是一个消息代理:它接受并转发消息。你可以把它当成一个邮局:当你想邮寄信件的时候,你会把信件放在投递箱中,并确信邮递员最终会将信件送到收件人的手里。在这个例子中,RabbitMQ就相当与投递箱(客户生产的消息发给路由器)、邮局(路由器将消息发给不同队列)和邮递员(队列采用推的模式发到消费端)
RabbitMQ既支持内存队列也支持持久化队列,消费端为推模型,消费状态和订阅关系由服务端负责维护,消息消费完后立即删除,不保留历史消息。所以支持多订阅时,消息会多个拷贝。

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

2.1.1 简单模式

在这里插入图片描述

  1. 消息产生者将消息放入队列
  2. 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)

2.1.2 工作模式(资源竞争)

在这里插入图片描述

  1. 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)
  2. 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)

2.1.3 发布订阅

在这里插入图片描述

  1. X代表交换机rabbitMQ内部组件(exchange),erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
  2. 应用场景:邮件群发,群聊天,广播(广告)

2.1.4 路由模式

在这里插入图片描述

  1. 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
  2. 根据业务功能定义路由字符串
  3. 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务4. 应用场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;

2.1.5 主题模式(模糊路由)

在这里插入图片描述

  1. 星号井号代表通配符
  2. 星号代表多个单词,井号代表一个单词
  3. 路由功能添加模糊匹配
  4. 消息产生者产生消息,把消息交给交换机
  5. 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

2.1.6 RPC模式

在这里插入图片描述

  1. 客户端开始时, 穿件一个匿名回调队列
  2. 客户端发送消息时, 附带2个参数: reply_to(回调队列), correlation_id(每个请求的唯一ID)
  3. 请求被发送到rpc_queue
  4. 服务器从rpc_queue接收到消息,处理完成后将结果发送到reply_to队列
  5. 客户端在回调队列接收到处理结果, 检查correlation_id

2.1.7 发布者确认

在通道上启用发布者确认后,代理将异步确认客户端发布的消息,这意味着它们已在服务器端处理。

4种交换器,分别是:direct、topic、headers、fanout
direct: 路由模式(完整匹配)
topic: 主题模式(模糊匹配)
fanout: 订阅模式(发给所有binding队列)
headers: 根据消息header中的内容分发

3. 启动RabbitMQ

docker pull rabbitmq
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq
//15672:控制台端口号 控制台端口用于管理rabbitmq,
//5672:应用访问端口号 应用访问端口号为rabbitclient等应用访问。
docker exec -it rabbitmq bash
/opt/rabbitmq/plugins# rabbitmq-plugins enable rabbitmq_management //启动控制台
打开localhost:15672 //用户名密码 guest/guest

4. Maven配置

		<dependency>
        	<groupId>org.springframework.boot</groupId>
        	<artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

5. application.properties配置

(springboot方式适用)

#rabbitmq连接配置(必须)
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#rabbitmq高级配置
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
#rabbitmq消费端配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual  #手动回复
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.prefetch=1  #一次取一条消息

6. Java代码

6.1 自己建立connnection(springMVC老式方法)

没有queue时,代码会自动创建声明的按个queue

//自己构建连接的方式实现发送消息到queue
	@PostMapping("/send2MQ")
	public void send2MQ(@RequestParam(value="queue") String queue,@RequestParam(value="message") String message) throws IOException, TimeoutException{
		
		  ConnectionFactory factory = new ConnectionFactory();
		  factory.setUsername("guest");
		  factory.setPassword("guest");
		  factory.setHost("localhost");
		  factory.setPort(5672);
		  factory.setVirtualHost("/");
		  Connection connection = factory.newConnection();
		  Channel channel = connection.createChannel();
		  channel.queueDeclare(queue, false, false, false, null);
		  channel.basicPublish("", queue, null, message.getBytes());
		  System.out.println(" [x] Sent '" + message + "'");
		  channel.close();
		  connection.close();

	}
	
	//自己构建连接的方式实现获取queue中的消息
	@GetMapping("/getMQ")
	public void getMQ(@RequestParam(value="queue") String queue) throws IOException, TimeoutException{
		
		ConnectionFactory factory = new ConnectionFactory();
	    factory.setUsername("guest");
		factory.setPassword("guest");
		factory.setHost("localhost");
		factory.setPort(5672);
		factory.setVirtualHost("/");
		factory.setConnectionTimeout(600000); // in milliseconds
		factory.setRequestedHeartbeat(60); // in seconds
		factory.setHandshakeTimeout(6000); // in milliseconds
		factory.setRequestedChannelMax(5);
		factory.setNetworkRecoveryInterval(500); 
	    
	    Connection connection = factory.newConnection();
	    Channel channel = connection.createChannel();
 
	    channel.queueDeclare(queue, false, false, false, null);
	    System.out.println("Waiting for messages. ");
 
	    Consumer consumer = new DefaultConsumer(channel) {
	      @Override
	      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
	          throws IOException {
	        String message = new String(body, "UTF-8");
	        System.out.println(" [x] Received '" + message + "'");
	      }
	    };
	    channel.basicConsume(queue, true, consumer);
	  }

6.2 使用RabbitTemplate(springboot方式)

package com.example.demo.service.impl;

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

@Service
public class RabbitMQTemplateImpl {
	
	//使用rabbittemplate的方式发送消息到exchange,由exchange转发到queue
	//自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;  
    
    //回调函数: confirm确认, MQ确认收到消息
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                //可以进行日志记录、异常处理、补偿处理等
                System.err.println("异常处理....");
            }else {
                //更新数据库,可靠性投递机制
            }
        }
    };
    
    //回调函数: return返回, 路由无法转发消息或者MQ无法收到消息时,用return中的方式处理
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: " 
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };
    
    //发送消息方法调用: 构建Message消息,controller中调用send方法可以发送新消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一  用于ack保证唯一一条消息,这边做测试写死一个。但是在做补偿策略的时候,必须保证这是全局唯一的消息
        CorrelationData correlationData = new CorrelationData("1234567890");
        //exchange-1是路由器名, springboot.#是转发关键字,在控制台中配置
        rabbitTemplate.convertAndSend("exchange-1", "springboot.xyz", msg, correlationData);
    }
    
    //消费消息, 由于RabbitMQTemplateImpl被注册为service,onMessage方法在程序启动时一直处于监听状态, 随时接收消息
	@RabbitListener(queues="queue-2")
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK,获取deliveryTag
        channel.basicAck(deliveryTag, false);
    }

}

controller中接口调用发送消息

	@Autowired
	private RabbitMQTemplateImpl rabbitImpl;
	
	@PostMapping("/sendTemplate")
	public void sendTemplate() throws Exception{
		HashMap<String, Object> hm = new HashMap<>();
		hm.put("a1", "kevin");
		rabbitImpl.send("abc", hm);
	}
	

Exchange和Queue的绑定工作在控制太中手动完成(也可代码中实现)

在这里插入图片描述

一个connection是一个TCP连接, 由于RabbitMQ采用的NIO(同步非阻塞IO)方式建立连接, 多个生产者共用一个connection中的多个channel


版权声明:本文为whatkevin1984原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。