RocketMQ原理学习-消息发送

学习重点

  • 消息发送方式
  • 消息启动流程
  • 消息发送流程

一、消息发送方式

RocketMQ 支持三种消息发送方式:同步(sync)、异步(async)、单向(oneway)。

  • 同步:发送者向 MQ 执行发送消息 API 时,同步等待,直到消息服务器返回发送结果。
  • 异步:发送者向 MQ 执行发送消息 API 时,指定消息发送成功后的回掉函数,然后调用消息发送 API 后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
  • 单向:消息发送者向 MQ 执行发送消息 API 时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。

二、消息启动流程

消息生产者的代码都在 client 模块中,相对于 RocketMQ 来说,它就是客户端,也是消息的提供者,我们在应用系统中初始化生产者的一个实例即可使用它来发消息。

生产者实现类

DefaultMQProducer 是默认的消息生产者实现类,主要用于创建topic、发送消息、查找消息。下面看一些比较关键的属性:

  • sendMsgTimeout:发送消息默认超时时间, 默认 3s
  • compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认 4K
  • retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为 2,总共执行 3 次
  • retryTimesWhensendAsyncFailed:异步方式发送消息重试次数,默认为 2
  • retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个 Broker 时, 是否不等待存储结果就返回,默认为 false
  • maxMessageSize:允许发送的最大消息长度,默认为 4M ,该值最大值为 2^32-1

生产者启动流程

  • 检查 productGroup 是否符合要求,并改变生产者 instanceName 为进程 ID;
  • 创建 MQClientlnstance 实例,整个 JVM 实例中只存在一个 MQClientManager 实例,也就是同一个 clientId 只会创建一个 MQClientlnstance;
  • 向 MQClientlnstance 注册,将当前生产者加入到 MQClientlnstance 管理中,方便后续调用网络请求、进行心跳检测等;
  • 启动 MQClientlnstance ,如果 MQClientlnstance 已经启动 ,则本次启动不会真正执行。

三、消息发送流程

消息发送流程主要的步骤:验证消息、查找路由、消息发送 (包含异常处理机制),默认消息发送以同步方式发送,默认超时时间 3s,发送流程如下图所示。
请添加图片描述

验证消息长度

主题名称、消息体不能为空,消息长度不能等于0且默认不能超过允许发送消息的最大长度4M。

查找路由主题信息

  • 如果生产者缓存了 topic 的路由信息,如果该路由信息中包含了消息队列,则直接返回该路由信息。
  • 如果没有缓存或者包含消息队列,则向 NameServer 查询该 topic 的路由信息,如果最终没有找到,则抛出异常。

选择消息队列

选择消息队列有两种方式:不启用 Broker 故障延迟机制(默认)、启用 Broker 故障延迟机制。

发送消息

  • 获取 Broker 的网络地址,如果缓存不存在,则向 NameServer 主动更新一下 topic 的路由信息,如果更新后还是找不到 Broker 信息,则抛出异常。
  • 为消息分配全局唯一ID,如果消息体默认超过4K,会对消息体进行zip压缩并做标记,如果是事务消息,也会做相应的标记。
  • 如果注册了消息发送勾子函数,则执行消息发送之前的增强逻辑。
  • 构建消息发送请求包。
  • 根据消息发送方式,同步、异步、单向方式进行网络传输。
  • 如果注册了消息发送勾子函数,执行after逻辑,这里注意就算抛异常也会执行,类似finally。

注意:

  • 消息发送端采用重试机制:同步发送在发送失败之后进行重试,异步发送在收到消息发送结果后执行回调之前进行重试。
  • 同步发送如果消息重试次数超过允许的最大重试次数(默认16),消息会被放入DLQ(死信队列),命名格式:%DLQ%+消费组名,死信队列通常情况下不能被消费,但是也可以被订阅和消费,也可以通过人工处理。

总结

本章重点学习了消息发送的整个流程,包括消息的发送方式,生产者的启动流程和消息的发送流程。


版权声明:本文为m0_54065725原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。