目录
2.2.2 生产者发送至Broker途中 / Broker在接收消息后写入磁盘前宕机
一、AMQP简介
全称Advanced Message Queuing Protocol,高级消息队列协议。
既然谈到了协议,顾名思义,是要求通信的双方遵循统一的约定规则,来达到通信的目的。而消息队列协议制定了一套通信标准,使得客户端、服务器、消息中间件的通信称为可能。
常见的消息队列协议有:AMQP、MQTT、STOMP、XMPP等,而RabbitMQ是基于AMQP的Erlang实现。
AMQP实质上是一个模型,与OSI或TCP/IP类似,也具有自己的分层架构,通常划分为三层:
- 模型层:定义了一套命令,客户端利用这些命令来实现业务功能
- 会话层:为客户端与服务器的传递过程提供可靠性、同步机制和错误处理
- 传输层:提供帧处理、信道复用、错误检测及数据表示
1.1 主要概念
- Message 【消息】:消息服务器处理数据的原子单元。内容头+属性+内容体
- Publisher【消息生产者】:向交换机发布消息的客户端应用程序
- Exchange【交换机】:接收生产者发送的消息,并路由给队列
- Binding【绑定】:关联队列和交换机
- Virtual Host【虚拟主机】:一个mini版的消息服务器,拥有自己的队列、交换机等权限机制
- Broker【消息代理】:MQ服务器实体,接受客户端连接,实现AMQP消息队列和路由功能全过程
- Routing Key【路由规则】:确定如何路由消息
- Queue【消息队列】:保存消息直接发送给消费者。是消息的容器,也是消息的终点
- Connection【连接】:客户端和MQ服务器之间的一个TCP连接
- Channel【信道】:独立的双向数据流通道,建立在真实的TCP连接内的虚拟连接。AMQP消息都是通过信道发送的,一个连接可以包含多个信道。对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,因此引用信道,复用一个TCP连接
- Consumer【消息消费者】:从消息队列获取消息的客户端应用程序
二、RabbitMQ
2.1 消息机制
2.1.1 消息保存
RabbitMQ对于Queue中消息保存有disk和RAM两种方式。disk持久化至硬盘,消息数据会以.rdp文件保存,当达到16MB会生成新文件,当文件中消息被删除超过阈值会触发文件合并;而RAM方式不保存消息,启动时从其他节点同步,因此集群中必须存在至少一个disk节点。
持久化涉及Queue、Message、Exchange三部分:
- Queue持久化:durable=true,仅持久化队列,但不会持久化消息
- Message持久化:basicPublish的BasicProperties中deliveryMode=2
- Exchange持久化:durable=true
如果不设置Queue、Exchange持久化,重启服务器即丢失,但持久化会影响服务器吞吐量,因此应具体业务具体分析。
2.1.2 消息确认
见2.2.2、2.2.3
2.1.3 消息预取
指定在ack之前每个消费者一次可接收的信息条数,可以提高吞吐量。(prefetch_count)
2.2 可靠性保证
消息可能丢失的存在三个过程:
- 生产者发送至Broker途中
- Broker在接收消息后宕机
- Broker发送至消费者途中
2.2.1 Broker在接收消息后宕机
持久化,见2.1.1
2.2.2 生产者发送至Broker途中 / Broker在接收消息后写入磁盘前宕机
发送方确认模式。该模式下,从channel中发送的消息会被分配一个唯一ID,一旦消息被成功投递至队列,channel就像生产者发送确认消息。 普通确认 / 批量确认 / 异步确认。
2.2.3 Broker发送至消费者途中
消费者应答。想要知道消费者的消费成功还是失败,要求消费者在消费完消息后发送一个消息回执(ack),RabbitMQ收到回执后将消息从队列移除,否则检测到连接断开,则将消息给其他消费者,若没断开,则一直等到其执行完毕,不论多久。
AMQP定义了两种应答模式:自动 or 手动。自动模式下,消息处理成功且没有异常抛出,消息就会被自动删除;而手动模式可以更好的把控自身逻辑(比如虽然成功拿到消息,但消费到一半宕机了,也没有ACK,消息就不会丢失)。且自动回执存在一个问题,一旦在消费途中程序抛出异常且没有截止条件,Broker会无限重发消息。
2.3 生产环境需要考虑问题
2.3.1 重复消费问题
由于代码逻辑或者补偿机制导致消息重发,消费者必须做好幂等性
- Redis设置唯一标识避免重复消费
- 即使重复消费,多次执行结果一致
2.3.2 启动手动ack,nack重入队导致死循环
成功ack,失败nack,假设此时恰好把nack放在了catch里,每次nack,消息都会重新到队头,然后重新消费又抛异常进入catch,又nack,又重新到对头,如此往复。此消息消费不了,新的消息排不到对头,造成消息堆积。
- 避免反复重试,在redis记录次数,超过次数后转发至其他MQ或者死信队列或者弹出告警手动处理,避免死循环。
2.3.3 消息堆积
消费过慢或者消费者出现问题造成消息堆积
- 解决消费者问题
- 增加消费者数量
- 设置消息堆积监控
2.3.4 消费者限流
若消费者对收到的消息不加节制,照单全收,在消息剧增的情况下会导致CPU爆满。
- 消息预取(prefetch_count)+手动ack
- 阻塞队列
2.3.5 消费失败重试
- 采用Spring自带重试机制,同时配合Redis限制重试次数
- 采用nack将失败消息重新入队首,同时配合Redis限制重试次数
- 手动重试,将失败消息放入本地缓存或阻塞队列,新开启线程异步处理(在重启服务时可能会导致消息丢失)
- 原地同步重试(在重启服务时可能会导致消息丢失)
- 放入Redis延迟队列