RocketMQ-offset管理

前言

这里offset是指的是Consumer的消息进度offset,消费进度offset是用来记录每个Queue的不同消费组的消息进度的,根据消费进度记录器的不同,可以分为两种模式,本地模式和远程模式

offset本地管理模式

当消费模式为广播模式时,offset使用本地模式存储,因为每条消息会被所有消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在进度交集。Consumer在广播模式下offset相关数据以json的形式持久化到Consumer本地磁盘文件中,默认文件为当前用户目录下的.rocketmq_offset/c l i e n t I d / g r o u p / O f f s e t s . j s o n , 其 中 / {clientId}/group/Offsets.json,其中/clientId/group/Offsets.json/{clientId}为当前消费者id,默认为ip@DEFAULT;group为消组名称。

offset远程管理模式

当消费者模式为集群模式时,offset使用远程模式管理,因为Consumer实例对消息采用负载均衡消费,所有Consumer共享Queue的消费进度,Consumer在集群模式下offset相关数据以json的形式持久化到Broker磁盘中,文件路径为store/config/consumerOffset.json。Broker启动的时候会加载这个文件,并写入到一个双层的Map,外层map的key为topic@group,value为内层map,内层map的key为QueueId,value为offset,当发生Rebalance时,新的Consumer会从该Map获取相应的数据来继续消费

offset用途

消费者是如何从最开始次序消费消息的?消费者要消费的第一条消息的起始位置是用户自己通过consumer.setConsumeFromWhere()方法指定的。在Consumer启动后,其要消费的第一条消息的起始位置有三种,这三种通过枚举类型常量设置,这个枚举类型为ConsumerFromWhere,当消费完一批消息后,Consumer会提交消费进度offset给Broker,Broker在收到消费进度后悔将其更新到那个双层Map及consumerOffset.json文件中,然后向该Consumer进行ACK,而ACK内容中包含三项数据,当前消费队列最小offset(minOffset)、最大offset(maxOffset)以及下次消费的起始offset(nextBeginOffset)

重试队列

当RocketMQ对消息消费出现异常时,会将发生异常消息的offset提交到Broker的重试队列中,系统发生消息消费异常时会为当前的Topic创建一个重试队列,该队列以%RETRY%开头,到达重试时间后进行消费重试
在这里插入图片描述

offset同步提交/异步提交

集群消费模式下,Consumer消费完后会向Broker提交消费进度offset,其提交方式分为两种
同步提交
消费者在消费完成一批消息后会向Broker提交这些消息的offset,然后等待Broker的成功相应,若等待超时之前收到了成功响应,则继续读取下一批消息进行消费,若没有在超时时间内收到响应,则会重新提交,只到获取到响应,而这个等待过程中消费者是阻塞的,其严重影响了消费者的吞吐量

异步消费
消费者在完成一批消息后向Broker提交offset,但无需等待Broker的成功响应,可以继续读取并消费下一批消息,这种方式增加了消费者的吞吐量,但需要注意,Broker在收到提交的offset后,还是会向消费者进行响应


版权声明:本文为CSDN877425287原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。