RocketMQ的Consumer源码分析

RocketMQ的Consumer源码分析


这一节我们分析RocketMQ的Consumer的启动流程以及消息处理过程。下图是RocketMQ启动流程、Rebalance、以及消息拉取处理的较为完整的流程图。
在这里插入图片描述
##Consumer的启动流程

Consumer的启动主要会经过以下步骤:

  1. 校验Consumer一系列的配置,包括消费组、消费者配置等信息
    • 校验消费组是否非空、是否超长、是否含有非法字符、是否系统占用等
    • 校验一系列配置信息是否设置,如消费模式、消费位置、消息监听器等
    • 校验消费者线程大小、消费者拉取消息的大小、批次大小等
  2. 复制订阅信息到RebalanceImpl,copySubscription()会把当前Topic的订阅信息放到RebalanceImpl的map中
  3. 如果是集群消费模式,使用PID设置实例名称
  4. 创建MQClientInstance,会优先从缓存中获取,如果缓存中有则直接返回,这也意味着相同的clientId会共享MQClientInstance
  5. 为RebalanceImpl设置所需要的属性,如消费组、消费模式、Queue分配策略等
  6. 初始化一个PullAPIWrapper,这个PullAPIWrapper是会为后面从Broker拉取消息服务
  7. 执行offsetStore.load(),广播模式下会选择从本地加载消费进度,集群模式默认什么也不做
  8. 根据顺序/并发消费方式初始化对应的消息消费服务
  9. 启动消息消费服务,这里会开启一个延时任务定期执行
    • 对于并发消费方式,会定期清理过期消息,默认每15分钟执行一次
    • 对于顺序消费方式且为集群消费模式时,会定期将消费的Queue锁定,默认每20秒执行一次
  10. 向MQClientInstance注册Consumer,相同的consumerGroup共享DefaultMQPushConsumer
  11. 启动MQClientInstance,这一步在初始化过程是Consumer启动的关键
    • 如果没有指定NameServer地址,则会尝试从配置中心拉取,我们通常会在启动Producer时指定,没有指定的场景通常用在多集群的业务中,方便统一进行管理
    • 启动MQClientAPIImpl,这里会完成NettyClient的初始化,为网络通信做准备
    • 开启一系列的定时任务
      • 如果没有指定NameServer地址,则定期从配置中心拉取
      • 定期更新Topic的路由信息,这里会随机选择一个NameServer进行通信,获取topic对应的路由信息并更新到本地的内存中
      • 定期清理下线的Broker,并且给所有的Broker发送心跳,默认每30秒执行一次
      • 定期持久化消费进度,默认每5秒执行一次
      • 定期调整消费者线程大小
    • 启动PullMessageService,开始处理PullRequest,PullMessageService是一个循环执行的线程
      • 从pullRequestQueue队列中获取一个PullRequest,如果没有获取到则会阻塞直到从队列中获取到数据为止
      • 根据消费组获取到对应的MQConsumerInner实例,PullRequest中会有对应的consumerGroup信息
      • 最终调用DefaultMQPushConsumerImpl.pullMessage()方法,完成消息的拉取,具体过程在下面详细分析
    • 启动RebalanceService,开始执行rebalance,RebalanceService是一个循环执行的线程,限时等待被唤醒
      • 随后调用mqClientFactory.doRebalance(),在MQClientInstance中,会遍历所有的消费组,执行Rebalance操作
      • 最终调用到RebalanceImpl.doRebalance()方法,具体过程在下面详细分析
    • 初始化内置的Producer
  12. 设置Consumer状态为RUNNING,刚启动时Consumer状态是START_FAILED
  13. 从NameServer获取订阅的Topic路由信息并更新本地缓存
  14. 从路由信息中随机找一个Broker,检查Broker是否支持消费者端对该Topic的订阅方式
  15. 向所有的Broker发送心跳,因为需要和所有的Broker通信
  16. 唤醒RebalanceService服务,立即进行一次Rebalance

Consumer的Rebalance流程

Consumer的rebalance流程是通过RebalanceService线程来执行的,当RebalanceService线程被唤醒后,执行以下逻辑

  1. Consumer的rebalance会调用RebalanceImpl.doRebalance()方法,该方法会对订阅的所有topic依此进行rebalanceByTopic()

    public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }
    
        this.truncateMessageQueueNotMyTopic();
    }
    
  2. 如果当前topic是广播消费模式,则消费者可以消费所有有效的Message Queue

  3. 如果当前topic是集群消费模式,则按照分配策略分配queue

    • 获取该Topic对应的所有mqSet

    • 根据consume group获取订阅了该topic的消费者信息(随机选择一个Broker发送网络请求获取)

    • 按照queue的分配策略(默认为平均分配策略,AllocateMessageQueueAveragely)取得queue的分配结果,目前支持以下分配策略

      AllocateMessageQueueAveragely:平均分配,将该topic下所有Queue按照brokerId和queueId从小到大排序,按照Consumer数量平均分为若干份,每个Consumer分配一份。
      AllocateMessageQueueAveragelyByCircle:环形平均分配,和平均分配策略类似,只是每个Consumer分配到的queueId不是连续的。
      AllocateMessageQueueByConfig:通过配置分配,用户启动时可以指定消费哪些Queue
      AllocateMessageQueueConsistentHash:一致性哈希,使用一致性hash算法来分配Queue,用户需自定义虚拟节点的数量。
      AllocateMessageQueueByMachineRoom:按照机房分配,将queue按照Broker划分为几个machine room,不同的Consumer只消费某几个特定Broker上的消息。
      AllocateMachineRoomNearby:机房就近分配,与按照机房分配类似,只是在AllocateMessageQueueByMachineRoom的基础上,会优先为Consumer分配在同一个machine room的Broker上的queue。
      
  4. 根据分配的mqSet更新当前要处理的mq,持久化消费进度并移除不能再继续处理的mq

    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
        this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
        if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
            && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
            try {
                if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
                    try {
                        return this.unlockDelay(mq, pq);
                    } finally {
                        pq.getConsumeLock().unlock();
                    }
                } else {
                    log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                        mq,
                        pq.getTryUnlockTimes());
    
                    pq.incTryUnlockTimes();
                }
            } catch (Exception e) {
                log.error("removeUnnecessaryMessageQueue Exception", e);
            }
    
            return false;
        }
        return true;
    }
    
  5. 计算分配的mq接下来的消费偏移量并封装PullRequest

  6. 将PullRequest放到pullRequestQueue队列中,触发消息拉取流程

public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}

public void executePullRequestImmediately(final PullRequest pullRequest) {
		try {
				this.pullRequestQueue.put(pullRequest);
		} catch (InterruptedException e) {
				log.error("executePullRequestImmediately pullRequestQueue.put", e);
		}
}

Consumer的消息拉取流程

Consumer的消息拉取流程是由PullMessageService线程执行的,当从pullRequestQueue中获取到PullRequest之后,执行以下逻辑

  1. 当本地堆积的消息过多或无序消息的offset跨度过大时,延时放回pullRequestQueue队列中,在未来的一次pullMessage中处理
    • 本地堆积消息pullThresholdForQueue默认为1000条,超过该值时会延迟50ms再拉取消息
    • 本地堆积消息pullThresholdSizeForQueue默认为100M,超过该值时会延迟50ms再拉取消息
    • 如果是并发消费,当消息offset跨度超过consumeConcurrentlyMaxSpan(默认为2000)时,会延迟50ms再拉取消息
  2. 判断订阅信息是否发生变化,若是则将PullRequest延时放回pullRequestQueue队列中,延迟3s再拉取消息
  3. 创建PullCallback,作为pull消息的回调处理,onSuccess方法处理拉取消息成功,onException处理拉取消息异常
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);

                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }

                    if (pullResult.getNextBeginOffset() < prevRequestOffset
                        || firstMsgOffset < prevRequestOffset) {
                        log.warn(
                            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                            pullResult.getNextBeginOffset(),
                            firstMsgOffset,
                            prevRequestOffset);
                    }

                    break;
                case NO_NEW_MSG:
                case NO_MATCHED_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case OFFSET_ILLEGAL:
                    log.warn("the pull request offset illegal, {} {}",
                        pullRequest.toString(), pullResult.toString());
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    pullRequest.getProcessQueue().setDropped(true);
                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                    pullRequest.getNextOffset(), false);

                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                log.warn("fix the pull request offset, {}", pullRequest);
                            } catch (Throwable e) {
                                log.error("executeTaskLater Exception", e);
                            }
                        }
                    }, 10000);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("execute the pull request exception", e);
        }

        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
};
  1. 如果是集群模式,则从内存中获取commitOffsetValue
  2. 调用pullAPIWrapper的pullKernelImpl方法拉取消息
    • 获取mq对应的broker信息,本地获取不到时会尝试从NameServer拉取路由信息
    • 构建PullMessageRequestHeader并设置一系列参数
    • 调用MQClientAPIImpl的pullMessage方法拉取消息
      • 调用pullMessageAsync
      • 调用RemotingClient的invokeAsync方法,使用Netty与Broker通信,实现异步回调
      • 服务器响应后,根据结果回调pullCallback的onSuccess/onException方法
  3. 到这里,消息已经拉取回来了,接下来会执行pullCallback对应的回调方法
  4. 如果消息拉取失败,则调用onException方法,将PullRequest延时放回pullRequestQueue队列中,延时3秒再次处理
  5. 如果消息拉取成功且没有获取到消息,则立即将PullRequest放回pullRequestQueue队列中
  6. 如果消息拉取成功且获取到了消息
    • 调用processQueue.putMessage,将获取到的MessageExt消息集合放到ProcessQueue中
    • 调用consumeMessageService.submitConsumeRequest,向ConsumeMessageService中提交ConsumeRequest
    • 创建PullRequest放到pullRequestQueue队列中,立即准备下一次的消息拉取
  7. 到这里,我们知道拉取回来的消息已经存储到了ProcessQueue中,ConsumeRequest实际上是一个Runnable的任务,最终会由线程池去执行
  8. 在ConsumeRequest的run方法中,最终会调用用户自定义MessageListener的consumeMessage方法处理消息并根据消费结果执行对应的逻辑
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
  1. 如果消费成功,则调用consumeRequest.getProcessQueue().removeMessage方法从ProcessQueue中移除已经消费的MessageExt

  2. 如果消费失败

    • 尝试sendMessageBack,将消息重新发送给Broker
    • sendMessageBack出现异常时,采用默认的Producer将消息发送给Broker
      • 内置的producer会给%RETRY%topic这个topic发送消息
      • 所有的Consumer都会订阅%RETRY%topic这个topic,相当于自己给自己发送了一条消息
    • 无法投递给Broker的消息,会向ConsumeMessageService中提交延时ConsumeRequest,5s后再次执行消费
  3. 调用defaultMQPushConsumerImpl.getOffsetStore().updateOffset方法更新本地offset,此时只会把offset更新到内存中

    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }
    
            if (null != offsetOld) {
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    offsetOld.set(offset);
                }
            }
        }
    }
    

Consumer的Offset管理

Consumer的偏移量由OffsetStore进行管理,OffsetStore有两个实现:

  • LocalFileOffsetStore,在消费者端维护消费者的偏移量,会持久化到本地文件中,适用于广播消费模式的消费者
  • RemoteBrokerOffsetStore,在Broker端维护消费者的偏移量,会持久化消费进度到Broker,适用于集群消费模式的消费者
  1. 消费进度是由Consumer的定时任务出发,每5秒执行一次persistAllConsumerOffset

  2. 调用persistAll对当前消费者分配的MessageQueue持久化

  3. 如果是广播消费模式

    • 将MessageQueue对应的偏移量转为json字符串
    • 保存消费进度到本地文件
  4. 如果是集群消费模式

    • 从本地内存获取每个MessageQueue对应的偏移量
    • 更新消费进度到Broker
      • 获取mq对应的broker信息,本地获取不到时会尝试从NameServer拉取路由信息
      • 通过RemotingClient发送UPDATE_CONSUMER_OFFSET请求给对应Broker
    • 如果当前消费组订阅的MQ发生了变化,从OffsetStore中移除
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;
    
        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
    
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            AtomicLong offset = entry.getValue();
            if (offset != null) {
                if (mqs.contains(mq)) {
                    try {
                        this.updateConsumeOffsetToBroker(mq, offset.get());
                        log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                            this.groupName,
                            this.mQClientFactory.getClientId(),
                            mq,
                            offset.get());
                    } catch (Exception e) {
                        log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                    }
                } else {
                    unusedMQ.add(mq);
                }
            }
        }
    
        if (!unusedMQ.isEmpty()) {
            for (MessageQueue mq : unusedMQ) {
                this.offsetTable.remove(mq);
                log.info("remove unused mq, {}, {}", mq, this.groupName);
            }
        }
    }
    

版权声明:本文为foolishboy_w原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。