# RocketMQ Consumer Source Code Analysis
In this section we analyze the RocketMQ Consumer's startup flow and its message-handling process. The diagram below gives a fairly complete picture of Consumer startup, Rebalance, and message pulling and processing.

## The Consumer Startup Flow
Consumer startup goes through the following steps:
- Validate the Consumer's configuration, including the consumer group and consumer settings
  - Check that the consumer group is non-empty, not over the length limit, free of illegal characters, and not a system-reserved group
  - Check that the required settings are present, such as the message model, the consume-from position, and the message listener
  - Check the consumer thread-pool size, the pull batch size, the consume batch size, and so on
- Copy the subscription data into RebalanceImpl; copySubscription() puts the current Topic's subscription data into RebalanceImpl's map
- In cluster consumption mode, set the instance name from the PID
- Create the MQClientInstance; the cache is consulted first and a cached instance is returned directly if present, which means clients with the same clientId share one MQClientInstance
- Set the properties RebalanceImpl needs, such as the consumer group, the message model, and the queue-allocation strategy
- Initialize a PullAPIWrapper, which later serves message pulls from the Broker
- Call offsetStore.load(); in broadcast mode this loads the consume progress from a local file, while in cluster mode it does nothing by default
- Initialize the message-consume service matching the orderly or concurrent consumption style
- Start the message-consume service, which schedules a periodic background task
  - For concurrent consumption, expired messages are cleaned up periodically, every 15 minutes by default
  - For orderly consumption in cluster mode, the consumed queues are locked periodically, every 20 seconds by default
- Register the Consumer with the MQClientInstance, keyed by consumerGroup, so each consumerGroup maps to one DefaultMQPushConsumer within a client instance
- Start the MQClientInstance; within the initialization process, this step is the key to Consumer startup
  - If no NameServer address was specified, try to fetch one from the config server. The address is usually specified explicitly at startup; leaving it unspecified is mostly seen in multi-cluster deployments, where it makes centralized management easier
  - Start MQClientAPIImpl, which initializes the NettyClient and prepares for network communication
  - Kick off a series of scheduled tasks
    - If no NameServer address was specified, periodically fetch one from the config server
    - Periodically update Topic route data: a random NameServer is contacted, the topic's route data is fetched, and the result is written into the local in-memory cache
    - Periodically clean up offline Brokers and send heartbeats to all Brokers, every 30 seconds by default
    - Periodically persist consume offsets, every 5 seconds by default
    - Periodically adjust the consumer thread-pool size
  - Start PullMessageService and begin processing PullRequests; PullMessageService is a thread that runs in a loop
    - Take a PullRequest from the pullRequestQueue, blocking until one becomes available
    - Look up the matching MQConsumerInner instance by consumer group; the PullRequest carries the consumerGroup
    - Finally call DefaultMQPushConsumerImpl.pullMessage() to pull messages; the details are analyzed below
  - Start RebalanceService and begin rebalancing; RebalanceService is a looping thread that waits with a timeout to be woken up
    - It then calls mqClientFactory.doRebalance(), which iterates over every consumer group in the MQClientInstance and rebalances each one
    - This ultimately reaches RebalanceImpl.doRebalance(); the details are analyzed below
  - Initialize the built-in Producer
- Set the Consumer state to RUNNING; the state is START_FAILED while startup is still in progress
- Fetch the subscribed Topics' route data from the NameServer and update the local cache
- Pick a random Broker from the route data and check whether it supports the consumer's subscription style for the Topic
- Send heartbeats to all Brokers, since the Consumer needs to communicate with all of them
- Wake up the RebalanceService to perform a Rebalance immediately
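For orientation, here is a minimal client-side sketch of the API that drives the whole flow above; every step in the list happens behind consumer.start(). The group name, topic, and NameServer address are placeholder values:

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerQuickstart {
    public static void main(String[] args) throws Exception {
        // Group, address, and topic below are placeholders.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876"); // omit to fall back to config-server lookup
        consumer.subscribe("ExampleTopic", "*");   // stored into RebalanceImpl by copySubscription()
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // Invoked by ConsumeRequest.run() once pulled messages are dispatched.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start(); // runs the validation, registration, and MQClientInstance startup described above
    }
}
```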
## The Consumer Rebalance Flow
Rebalancing runs on the RebalanceService thread. When that thread is woken up, it executes the following logic: it calls RebalanceImpl.doRebalance(), which runs rebalanceByTopic() for every subscribed topic in turn.
```java
public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    this.truncateMessageQueueNotMyTopic();
}
```

- If the topic uses the broadcast consumption model, the consumer consumes all valid Message Queues of the topic.
- If the topic uses the cluster consumption model, queues are assigned according to the allocation strategy:
  - Fetch all message queues (mqSet) of the topic
  - Fetch the list of consumers in the group that subscribe to the topic (a randomly chosen Broker is queried over the network)
  - Run the queue-allocation strategy (AllocateMessageQueueAveragely, the averaging strategy, by default) to compute the assignment. The following strategies are currently supported; a simplified sketch of the default strategy follows the dispatch code below:
    - AllocateMessageQueueAveragely: averaging. Sorts all of the topic's queues by brokerId and queueId, divides them evenly by the number of consumers, and gives each consumer one contiguous slice.
    - AllocateMessageQueueAveragelyByCircle: round-robin averaging. Like the averaging strategy, except the queueIds assigned to each consumer are not contiguous.
    - AllocateMessageQueueByConfig: configured assignment. The user specifies at startup which queues to consume.
    - AllocateMessageQueueConsistentHash: consistent hashing. Assigns queues with a consistent-hash ring; the user must configure the number of virtual nodes.
    - AllocateMessageQueueByMachineRoom: per-machine-room assignment. Partitions queues into machine rooms by Broker, so a given consumer only consumes messages from certain Brokers.
    - AllocateMachineRoomNearby: nearest-machine-room assignment. Builds on AllocateMessageQueueByMachineRoom, but prefers assigning a consumer the queues on Brokers in its own machine room.
  - Based on the assigned mqSet, update the queues this consumer processes; persist the consume offset of, and then remove, queues it can no longer process:
```java
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
    this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
    this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
    if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
        && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
        try {
            if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
                try {
                    return this.unlockDelay(mq, pq);
                } finally {
                    pq.getConsumeLock().unlock();
                }
            } else {
                log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                    mq,
                    pq.getTryUnlockTimes());
                pq.incTryUnlockTimes();
            }
        } catch (Exception e) {
            log.error("removeUnnecessaryMessageQueue Exception", e);
        }
        return false;
    }
    return true;
}
```

- Compute the next consume offset for each newly assigned queue and wrap it in a PullRequest
- Put the PullRequests onto the pullRequestQueue, which triggers the message-pull flow:
```java
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}

public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}
```
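To make the default averaging strategy concrete, the following standalone sketch restates AllocateMessageQueueAveragely's core arithmetic over plain integer queue IDs. This is a simplification for illustration; the real strategy operates on the sorted MessageQueue list and the sorted consumer-ID list:

```java
import java.util.ArrayList;
import java.util.List;

public class AverageAllocationSketch {

    /**
     * Simplified restatement of the averaging math: consumers earlier in the
     * sorted client-ID list receive one extra queue when the division is uneven,
     * and each consumer gets one contiguous slice.
     */
    static List<Integer> allocate(List<Integer> queueIds, List<String> consumerIds, String self) {
        List<Integer> result = new ArrayList<>();
        int index = consumerIds.indexOf(self);
        if (index < 0) {
            return result; // this consumer is not (yet) in the group view
        }
        int mod = queueIds.size() % consumerIds.size();
        int averageSize = queueIds.size() <= consumerIds.size() ? 1
            : (mod > 0 && index < mod ? queueIds.size() / consumerIds.size() + 1
                                      : queueIds.size() / consumerIds.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, queueIds.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(queueIds.get(startIndex + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = List.of(0, 1, 2, 3, 4, 5, 6, 7); // 8 queues
        List<String> cids = List.of("c1", "c2", "c3");          // 3 consumers
        // Prints: c1 -> [0, 1, 2], c2 -> [3, 4, 5], c3 -> [6, 7]
        for (String cid : cids) {
            System.out.println(cid + " -> " + allocate(queues, cids, cid));
        }
    }
}
```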
## The Consumer Message Pull Flow
Message pulling runs on the PullMessageService thread. After a PullRequest is taken from the pullRequestQueue, the following logic executes:
- When too many messages are piled up locally, or the offset span of concurrently consumed messages is too wide, the PullRequest is put back onto the pullRequestQueue with a delay and handled by a later pullMessage call (these thresholds are all tunable on DefaultMQPushConsumer; see the configuration sketch after the callback code)
  - pullThresholdForQueue, the local message-count threshold, defaults to 1000; above it, the next pull is delayed by 50 ms
  - pullThresholdSizeForQueue, the local message-size threshold, defaults to 100 MB; above it, the next pull is delayed by 50 ms
  - For concurrent consumption, when the offset span of cached messages exceeds consumeConcurrentlyMaxSpan (default 2000), the next pull is delayed by 50 ms
- Check whether the subscription data has changed; if so, the PullRequest is put back onto the pullRequestQueue with a 3 s delay before the next pull
- Create a PullCallback as the callback for the pull: onSuccess handles a successful pull and onException a failed one
```java
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                pullRequest.getMessageQueue(), pullResult, subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(
                        pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(
                            pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(),
                            pullResult.getMsgFoundList().size());

                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);

                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }

                    if (pullResult.getNextBeginOffset() < prevRequestOffset
                        || firstMsgOffset < prevRequestOffset) {
                        log.warn(
                            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                            pullResult.getNextBeginOffset(),
                            firstMsgOffset,
                            prevRequestOffset);
                    }

                    break;
                case NO_NEW_MSG:
                case NO_MATCHED_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case OFFSET_ILLEGAL:
                    log.warn("the pull request offset illegal, {} {}",
                        pullRequest.toString(), pullResult.toString());
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    pullRequest.getProcessQueue().setDropped(true);
                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(
                                    pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);
                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(
                                    pullRequest.getMessageQueue());
                                log.warn("fix the pull request offset, {}", pullRequest);
                            } catch (Throwable e) {
                                log.error("executeTaskLater Exception", e);
                            }
                        }
                    }, 10000);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("execute the pull request exception", e);
        }

        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
};
```
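As noted above, the flow-control thresholds are plain DefaultMQPushConsumer properties. A minimal tuning sketch follows; the values are illustrative, not recommendations:

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class FlowControlTuning {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");

        // Per-queue flow control: cached message count and cached size (MB).
        consumer.setPullThresholdForQueue(500);       // default 1000 messages
        consumer.setPullThresholdSizeForQueue(50);    // default 100 MB

        // Maximum offset span of cached messages under concurrent consumption.
        consumer.setConsumeConcurrentlyMaxSpan(1000); // default 2000

        // Pause between pulls; 0 pulls again immediately (see the FOUND branch above).
        consumer.setPullInterval(0);                  // default 0 ms
    }
}
```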
- In cluster mode, read commitOffsetValue from the in-memory offset store
- Call pullAPIWrapper's pullKernelImpl method to pull messages
  - Look up the Broker for the message queue; if it is not cached locally, route data is fetched from the NameServer
  - Build a PullMessageRequestHeader and fill in its parameters
  - Call MQClientAPIImpl's pullMessage method
    - which calls pullMessageAsync
    - which calls RemotingClient's invokeAsync method, communicating with the Broker over Netty with an asynchronous callback
    - when the server responds, pullCallback's onSuccess/onException is invoked according to the result
- At this point the messages have been pulled back and the matching pullCallback branch runs
  - If the pull failed, onException puts the PullRequest back onto the pullRequestQueue with a 3 s delay
  - If the pull succeeded but returned no messages, the PullRequest is put back onto the pullRequestQueue immediately
  - If the pull succeeded and returned messages:
    - processQueue.putMessage stores the pulled MessageExt list in the ProcessQueue
    - consumeMessageService.submitConsumeRequest submits a ConsumeRequest to the ConsumeMessageService
    - a PullRequest is put onto the pullRequestQueue right away, preparing the next pull
- So the pulled messages now sit in the ProcessQueue, and the ConsumeRequest is a Runnable that a thread pool eventually executes
- ConsumeRequest's run method ultimately invokes the user-defined MessageListener's consumeMessage method to process the messages, then acts on the consume result:
```java
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup,
                consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,
                consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup,
                consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
```
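Note how ackIndex drives the failure loops above: with CONSUME_SUCCESS, only messages after index ackIndex are sent back for retry, while RECONSUME_LATER forces ackIndex to -1 and retries the whole batch. A sketch of exploiting this for partial acknowledgement during batch consumption follows; handle() is a hypothetical business method, while setConsumeMessageBatchMaxSize and ConsumeConcurrentlyContext.setAckIndex are real client APIs:

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class PartialAckSketch {
    static void handle(MessageExt msg) { /* hypothetical business logic */ }

    public static void register(DefaultMQPushConsumer consumer) {
        consumer.setConsumeMessageBatchMaxSize(10); // hand up to 10 messages to one consumeMessage call
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (int i = 0; i < msgs.size(); i++) {
                    try {
                        handle(msgs.get(i));
                    } catch (Exception e) {
                        // msgs[0..i-1] succeeded; with CONSUME_SUCCESS, processConsumeResult
                        // sends only msgs[ackIndex + 1 ..] back to the Broker for retry.
                        context.setAckIndex(i - 1);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }
}
```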
- If consumption succeeds, consumeRequest.getProcessQueue().removeMessage removes the consumed MessageExt entries from the ProcessQueue.
- If consumption fails:
  - sendMessageBack is attempted, returning the message to the Broker
  - if sendMessageBack throws, the built-in default Producer sends the message to the Broker instead
  - the built-in producer publishes to the retry topic, %RETRY% + consumerGroup
  - every Consumer in the group automatically subscribes to that retry topic, so in effect the consumer sends a message to itself
  - messages that still cannot be delivered to the Broker are resubmitted to the ConsumeMessageService as a delayed ConsumeRequest and consumed again 5 s later
- Finally, defaultMQPushConsumerImpl.getOffsetStore().updateOffset updates the local offset; at this point the offset only changes in memory:
```java
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    if (mq != null) {
        AtomicLong offsetOld = this.offsetTable.get(mq);
        if (null == offsetOld) {
            offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
        }

        if (null != offsetOld) {
            if (increaseOnly) {
                MixAll.compareAndIncreaseOnly(offsetOld, offset);
            } else {
                offsetOld.set(offset);
            }
        }
    }
}
```
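The increaseOnly path matters because concurrently consumed batches can complete out of order, so a smaller offset may arrive after a larger one has already been recorded. MixAll.compareAndIncreaseOnly is essentially the following CAS loop; this is a restatement for illustration, not the library source:

```java
import java.util.concurrent.atomic.AtomicLong;

public class CompareAndIncreaseOnlySketch {

    // Returns true only if 'value' is strictly greater and this thread wins the CAS,
    // so a late-arriving smaller offset can never move the stored offset backwards.
    static boolean compareAndIncreaseOnly(AtomicLong target, long value) {
        long current = target.get();
        while (value > current) {
            if (target.compareAndSet(current, value)) {
                return true;
            }
            current = target.get(); // lost a race; re-read and re-check
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicLong offset = new AtomicLong(100);
        System.out.println(compareAndIncreaseOnly(offset, 95));  // false, offset stays 100
        System.out.println(compareAndIncreaseOnly(offset, 120)); // true, offset becomes 120
    }
}
```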
## Consumer Offset Management
Consumer offsets are managed by an OffsetStore, which has two implementations:
- LocalFileOffsetStore maintains the offsets on the consumer side and persists them to a local file; it serves broadcast-mode consumers
- RemoteBrokerOffsetStore maintains the offsets on the Broker side and persists the consume progress to the Broker; it serves cluster-mode consumers
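Which implementation is used follows from the message model alone: user code only sets the model, and DefaultMQPushConsumerImpl.start() instantiates the matching store. A sketch of the user-facing side (import paths follow the 4.x client; a custom store can also be injected via DefaultMQPushConsumer.setOffsetStore):

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class OffsetStoreSelection {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");

        // CLUSTERING (the default): start() creates a RemoteBrokerOffsetStore,
        // so progress is persisted on the Broker and shared across the group.
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // BROADCASTING: start() would create a LocalFileOffsetStore instead, and
        // offsetStore.load() would read previously saved progress from a local file.
        // consumer.setMessageModel(MessageModel.BROADCASTING);
    }
}
```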
Offset persistence is triggered by one of the Consumer's scheduled tasks, which runs persistAllConsumerOffset every 5 seconds. That in turn calls persistAll for the MessageQueues currently assigned to this consumer.

In broadcast consumption mode:
- serialize the per-MessageQueue offsets to a JSON string
- save the consume progress to the local file

In cluster consumption mode:
- read each MessageQueue's offset from local memory
- update the consume progress on the Broker
  - look up the Broker for the queue; if it is not cached locally, route data is fetched from the NameServer
  - send an UPDATE_CONSUMER_OFFSET request to that Broker through the RemotingClient
- if the set of MQs assigned to the consumer group has changed, remove the stale queues from the OffsetStore:
```java
public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();

    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        AtomicLong offset = entry.getValue();
        if (offset != null) {
            if (mqs.contains(mq)) {
                try {
                    this.updateConsumeOffsetToBroker(mq, offset.get());
                    log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                        this.groupName,
                        this.mQClientFactory.getClientId(),
                        mq,
                        offset.get());
                } catch (Exception e) {
                    log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                }
            } else {
                unusedMQ.add(mq);
            }
        }
    }

    if (!unusedMQ.isEmpty()) {
        for (MessageQueue mq : unusedMQ) {
            this.offsetTable.remove(mq);
            log.info("remove unused mq, {}, {}", mq, this.groupName);
        }
    }
}
```