一:前言
上周末写了两篇文章讲到服务端Broker在收到消息后是如何存储消息的:
《RocketMQ源码分析之消息存储》
《RocketMQ源码分析之消息刷盘》
但是除了负责存储消息之外,Broker还要负责创建消费队列。关于消费队列,其实在讲消息发送的时候《RocketMQ源码分析之消息发送》,我就画过一张简单的图。

每个 Topic 在Broker 端都会有多个消费队列,Producer每次都会选择一个ComsumeQueue发送消息,Consumer同样也会每次都选择一个ComsumeQueue拉取消息进行消费。
那么这些ConsumeQueue究竟是什么?有什么作用?又是何时创建的呢?本篇文章我们就一起来分析下吧。
二:ConsumeQueue介绍
每个ConsumeQueue都有一个id,id 的值为0到TopicConfig配置的队列数量。比如某个Topic的消费队列数量为4,那么四个ConsumeQueue的id就分别为0、1、2、3。
ConsumeQueue是不负责存储消息的,只是负责记录它所属Topic的消息在CommitLog中的偏移量,这样当消费者从Broker拉取消息的时候,就可以快速根据偏移量定位到消息。
ConsumeQueue本身同样是利用MappedFileQueue进行记录偏移量信息的,可见MappedFileQueue的设计多么美妙,它没有与消息进行耦合,而是设计成一个通用的存储功能。
ConsumeQueue更新消息偏移量的整体过程大概如下图所示,其中涉及了几个概念。
- ReputMessageService
- CommitLogDispatcherBuildConsumeQueue

上面这个图比较粗糙,画的很随意,具体的源码分析还请大家接着往下看。
三:ReputMessageService服务
之前在讲到消息存储的时候,提到每个Broker在初始化的时候都会初始化一个MessageStore负责存储消息,而MessageStore在初始化的时候,同样会启动一个ReputMessageService,ReputMessageService就是用来更新ConsumeQueue中消息偏移的。
ReputMessageService本身是一个线程,它启动后便会在循环中不断调用doReput()方法,用来通知ConsumeQueue进行更新。
@Override
public void run() {
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
}
doReput()中主要分为以下几步:
1:获取CommitLog中存储的新消息。
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
reputFromOffset记录了本次需要拉取的消息在CommitLog中的偏移。这里将reputFromOffset传递给CommitLog,获取CommitLog在reputFromOffset处存储的消息。
2:如果第一步获取的消息不为空,则表明有新消息被存储到CommitLog中,此时便会通知ConsumeQueue更新消息偏移。
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
......
DefaultMessageStore.this.doDispatch(dispatchRequest);
3:更新reputFromOffset,设置为下次需要拉取的消息在CommitLog中的偏移。
this.reputFromOffset = result.getStartOffset();
......
int size = dispatchRequest.getMsgSize();
......
this.reputFromOffset += size;
上面的重点在第二步中,这里调用 DefaultMessageStore.this.doDispatch(dispatchRequest) 来通知ConsumeQueue。
DefaultMessageStore中存储了一个dispatcherList,其中存放了几个CommitLogDispatcher对象,它们都是用来监听CommitLog中新消息存储的。
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
doDispatch()会遍历CommitLogDispatcher,调用它们的dispatch()方法。其中专门用来通知ConsumeQueue的Dispatcher是CommitLogDispatcherBuildConsumeQueue。
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
当ReputMessageService调用了CommitLogDispatcherBuildConsumeQueue的dispatch()后,CommitLogDispatcherBuildConsumeQueue便会调用 DefaultMessageStore.this.putMessagePositionInfo(request):
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
putMessagePositionInfo()逻辑有两步:
1:调用findConsumeQueue(),根据消息的topic以及消息所属的ConsumeQueueId,找到对应的ConsumeQueue。
findConsumeQueue()会先从consumeQueueTable中查询topic的ConsumeQueueMap,如果未找到,便会为Topic创建一个新的ConcurrentMap<Integer/* queueId */, ConsumeQueue>,存放到表中。
接着在从Topic的ConcurrentMap中,根据QueueId,查询ConsumeQueue,如果未找到,便也会创建一个新的ConsumeQueue,存放到Map中。ConsumeQueue便是此时被创建的。
2:当找到消息对应的ConsumeQueue后,便调用ConsumeQueue的putMessagePositionInfoWrapper()方法,更新ConsumeQueue。
四:ConsumeQueue的更新
上面主要讲了ReputMessageService是如何通知ConsumeQueue的,现在我们就要看看ConsumeQueue收到通知后是如何更新的,更新逻辑就在putMessagePositionInfoWrapper()中。
putMessagePositionInfoWrapper()中调用了putMessagePositionInfo(),并引入了重试机制。
我们来看看putMessagePositionInfo()中的主要逻辑:
1:判断消息是否已经被处理过
if (offset <= this.maxPhysicOffset) {
return true;
}
maxPhysicOffset记录了上一次ConsumeQueue更新的消息在CommitLog中的偏移量,如果本次消息偏移量小于maxPhysicOffset,则表明消息已经被更新过,直接返回。
2:初始存储偏移量所用的内存
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
这里将消息的偏移量、大小、tagsCode等信息,都暂存到了一块ByteBuffer中。
3:获取此次存储消息偏移量所用的MappedFile
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
前面我们讲到,ConsumeQueue本身也是采用MappedFileQueue用来存储偏移量的,这里便是获取MappedFileQueue中的最后一个MappedFile,用来存储消息。
ConsumeQueue 中用来存储消息偏移量的结构大小为 CQ_STORE_UNIT_SIZE,为20个字节,cqOffse t为 ConsumeQueue 中已经记录了多少条消息的偏移量,所以 expectLogicOffset 即为当前需要存储的消息偏移量结构,在ConsumeQueue的MappedFileQueue中的位置。
4:更新maxPhysicOffset,并将暂存在ByteBuffer中的消息偏移信息,追加到MappedFile中。
this.maxPhysicOffset = offset;
return mappedFile.appendMessage(this.byteBufferIndex.array());
此时,消息偏移量就被成功存储到ConsumeQueue中了。
五:总结
通过上面的分析,我们知道了ConsumeQueue的作用及更新流程,总结来说,它就是用来记录消息在CommitLog中偏移量的,便于Consumer快速定位消息。
消息偏移量的更新分为下面几步:
1:ReputMessageService不断从CommitLog中查询是否有新存储的消息;
2:如果有新消息,便通过Dispatcher通知ConsumeQueue;
3:ConsumeQueue收到通知后会将消息偏移量存储到自身的MappedFile中。
后面文章我还会继续剖析 RocketMQ。欢迎大家点个赞,关注下!
文章链接:
汪先生:RocketMQ源码分析之服务发现
汪先生:RocketMQ源码分析之消息发送
汪先生:RocketMQ源码分析之消息存储
汪先生:RocketMQ源码分析之消息刷盘
汪先生:RocketMQ源码分析之ConsumeQueue
听说喜欢点关注的同学都长得帅