RabbitMQ消息可靠性保证及AMQP

目录

一、AMQP简介

1.1 主要概念

二、RabbitMQ

2.1 消息机制

2.1.1 消息保存

2.1.2 消息确认

2.1.3 消息预取

2.2 可靠性保证

2.2.1 Broker在接收消息后宕机

2.2.2 生产者发送至Broker途中 / Broker在接收消息后写入磁盘前宕机

2.2.3 Broker发送至消费者途中

2.3 生产环境需要考虑问题

2.3.1 重复消费问题

2.3.2 启动手动ack,nack重入队导致死循环

2.3.3 消息堆积

2.3.4 消费者限流

2.3.5 消费失败重试


一、AMQP简介

全称Advanced Message Queuing Protocol,高级消息队列协议。

既然谈到了协议,顾名思义,是要求通信的双方遵循统一的约定规则,来达到通信的目的。而消息队列协议制定了一套通信标准,使得客户端、服务器、消息中间件的通信称为可能。

常见的消息队列协议有:AMQP、MQTT、STOMP、XMPP等,而RabbitMQ是基于AMQP的Erlang实现。

AMQP实质上是一个模型,与OSI或TCP/IP类似,也具有自己的分层架构,通常划分为三层:

  • 模型层:定义了一套命令,客户端利用这些命令来实现业务功能
  • 会话层:为客户端与服务器的传递过程提供可靠性、同步机制和错误处理
  • 传输层:提供帧处理、信道复用、错误检测及数据表示

1.1 主要概念

  • Message 【消息】:消息服务器处理数据的原子单元。内容头+属性+内容体
  • Publisher【消息生产者】:向交换机发布消息的客户端应用程序
  • Exchange【交换机】:接收生产者发送的消息,并路由给队列
  • Binding【绑定】:关联队列和交换机
  • Virtual Host【虚拟主机】:一个mini版的消息服务器,拥有自己的队列、交换机等权限机制
  • Broker【消息代理】:MQ服务器实体,接受客户端连接,实现AMQP消息队列和路由功能全过程
  • Routing Key【路由规则】:确定如何路由消息
  • Queue【消息队列】:保存消息直接发送给消费者。是消息的容器,也是消息的终点
  • Connection【连接】:客户端和MQ服务器之间的一个TCP连接
  • Channel【信道】:独立的双向数据流通道,建立在真实的TCP连接内的虚拟连接。AMQP消息都是通过信道发送的,一个连接可以包含多个信道。对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,因此引用信道,复用一个TCP连接
  • Consumer【消息消费者】:从消息队列获取消息的客户端应用程序

二、RabbitMQ

2.1 消息机制

2.1.1 消息保存

        RabbitMQ对于Queue中消息保存有disk和RAM两种方式。disk持久化至硬盘,消息数据会以.rdp文件保存,当达到16MB会生成新文件,当文件中消息被删除超过阈值会触发文件合并;而RAM方式不保存消息,启动时从其他节点同步,因此集群中必须存在至少一个disk节点。

持久化涉及Queue、Message、Exchange三部分:

  • Queue持久化:durable=true,仅持久化队列,但不会持久化消息
  • Message持久化:basicPublish的BasicProperties中deliveryMode=2
  • Exchange持久化:durable=true

如果不设置Queue、Exchange持久化,重启服务器即丢失,但持久化会影响服务器吞吐量,因此应具体业务具体分析。

2.1.2 消息确认

        见2.2.2、2.2.3

2.1.3 消息预取

        指定在ack之前每个消费者一次可接收的信息条数,可以提高吞吐量。(prefetch_count)

2.2 可靠性保证

消息可能丢失的存在三个过程:

  • 生产者发送至Broker途中
  • Broker在接收消息后宕机
  • Broker发送至消费者途中

2.2.1 Broker在接收消息后宕机

        持久化,见2.1.1

2.2.2 生产者发送至Broker途中 / Broker在接收消息后写入磁盘前宕机

        发送方确认模式。该模式下,从channel中发送的消息会被分配一个唯一ID,一旦消息被成功投递至队列,channel就像生产者发送确认消息。 普通确认 / 批量确认 / 异步确认。

2.2.3 Broker发送至消费者途中

        消费者应答。想要知道消费者的消费成功还是失败,要求消费者在消费完消息后发送一个消息回执(ack),RabbitMQ收到回执后将消息从队列移除,否则检测到连接断开,则将消息给其他消费者,若没断开,则一直等到其执行完毕,不论多久。

        AMQP定义了两种应答模式:自动 or 手动。自动模式下,消息处理成功且没有异常抛出,消息就会被自动删除;而手动模式可以更好的把控自身逻辑(比如虽然成功拿到消息,但消费到一半宕机了,也没有ACK,消息就不会丢失)。且自动回执存在一个问题,一旦在消费途中程序抛出异常且没有截止条件,Broker会无限重发消息

2.3 生产环境需要考虑问题

2.3.1 重复消费问题

由于代码逻辑或者补偿机制导致消息重发,消费者必须做好幂等性

  • Redis设置唯一标识避免重复消费
  • 即使重复消费,多次执行结果一致

2.3.2 启动手动ack,nack重入队导致死循环

成功ack,失败nack,假设此时恰好把nack放在了catch里,每次nack,消息都会重新到队头,然后重新消费又抛异常进入catch,又nack,又重新到对头,如此往复。此消息消费不了,新的消息排不到对头,造成消息堆积。

  • 避免反复重试,在redis记录次数,超过次数后转发至其他MQ或者死信队列或者弹出告警手动处理,避免死循环。

2.3.3 消息堆积

消费过慢或者消费者出现问题造成消息堆积

  • 解决消费者问题
  • 增加消费者数量
  • 设置消息堆积监控

2.3.4 消费者限流

若消费者对收到的消息不加节制,照单全收,在消息剧增的情况下会导致CPU爆满。

  • 消息预取(prefetch_count)+手动ack
  • 阻塞队列

2.3.5 消费失败重试

  • 采用Spring自带重试机制,同时配合Redis限制重试次数
  • 采用nack将失败消息重新入队首,同时配合Redis限制重试次数
  • 手动重试,将失败消息放入本地缓存或阻塞队列,新开启线程异步处理(在重启服务时可能会导致消息丢失)
  • 原地同步重试(在重启服务时可能会导致消息丢失)
  • 放入Redis延迟队列

版权声明:本文为qq_26012495原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。