前言
rocketMq消费模式分为集群消费clustering和广播消费broadcast, 默认是clustering
消费方式:
- PullConsumer,由用户主动调用pull方法来获取消息,没有则返回
- PushConsumer,在启动后,Consumer客户端会主动循环发送Pull请求到broker,如果没有消息,broker会把请求放入等待队列,新消息到达后返回response。
消费模式:
- broadcast 消息会发送给group内所有consumer。
- clustering 每条消息只会发送给group内的一个consumer,集群模式的支持消费失败重发,消息队列数量与消费者关系:1个消费者可以消费多个队列,但1个消息队列只会被一个消费者消费;如果消费者数量大于消息队列数量,则有的消费者会消费不到消息
一、消费类图


二、集群消费
2.1 DefaultMQPushConsumer
先看下四个构造函数
/**
* Default constructor.
*/
public DefaultMQPushConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.构造函数指定使用者组、RPC钩子和消息队列分配算法。
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
/**
* Constructor specifying RPC hook.
*
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQPushConsumer(RPCHook rpcHook) {
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
*/
public DefaultMQPushConsumer(final String consumerGroup) {
// 这里采用平均散列队列算法
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}
默认使用AllocateMessageQueueAveragely分配queue策略支持六种策略:
- AllocateMessageQueueConsistentHash 一致性哈希队列算法
- AllocateMessageQueueByMachineRoom 机房哈希队列算法
- AllocateMessageQueueByConfig 按配置分配消息队列
- AllocateMessageQueueAveragelyByCircle 循环平均哈希队列算法
- AllocateMessageQueueAveragely 平均散列队列算法
- AllocateMachineRoomNearby 机房近端优先级的分配策略代理
DefaultMQPushConsumer的start方法
@Override
public void start() throws MQClientException {
// 服务启动=》
this.defaultMQPullConsumerImpl.start();
}
DefaultMQPullConsumerImpl的start方法
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
//1、基本的参数检查,group name不能是DEFAULT_CONSUMER
this.checkConfig();
//2、将DefaultMQPushConsumer的订阅信息copy到RebalanceService中
this.copySubscription();
//3、如果是cluster模式,修改InstanceName参数值为PID
if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPullConsumer.changeInstanceNameToPID();
}
// //4、新建一个MQClientInstance,客户端管理类,所有的i/o类操作由它管理
// //缓存客户端和topic信息,各种service
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
//Queue分配策略
//对于同一个group内的consumer,RebalanceImpl负责分配具体每个consumer应该消费哪些queue上的消息,以达到负载均衡的目的。
// Rebalance支持多种分配策略,比如平均分配、一致性Hash等(具体参考AllocateMessageQueueStrategy实现类)。默认采用平均分配策略(AVG)。
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
//PullRequest封装实现类,封装了和broker的通信接口
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
// 注册过滤消息钩子方法 消息被客户端过滤时会回调hook
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPullConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
} else {
switch (this.defaultMQPullConsumer.getMessageModel()) {
// 广播消息本地持久化offse
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
// 集群模式持久化到broker
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
}
// offset加载=》如果是本地持久化会从文件中load
this.offsetStore.load();
// 注册消费者=》
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 启动MQClientInstance,会启动PullMessageService和RebalanceService
mQClientFactory.start();
log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
MQClientInstance的start
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST://仅创建不启动
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server 如果启动的时候命令行没有指定name server的地址,就去获取
if (null == this.clientConfig.getNamesrvAddr()) {
// 获取namesrv地址
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel 启动请求响应的channel =》
this.mQClientAPIImpl.start();
// Start various schedule tasks 启动调度任务=》
this.startScheduledTask();
// Start pull service 启动pull消息服务=》
this.pullMessageService.start();
// Start rebalance service 启动负载均衡服务=》
this.rebalanceService.start();
// Start push service 启动push服务=》
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
startScheduledTask() 开启几个定时任务
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 更新namesrv地址
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 从namesrv更新broker路由集合=》
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 清楚下线的broker=》
MQClientInstance.this.cleanOfflineBroker();
// 同步向所有的broker发送心跳检测=》
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 持久化消费者的offset=》
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 线程数调整
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
pullMessageService.start() 拉取消息线程
public class PullMessageService extends ServiceThread {
private final InternalLogger log = ClientLogger.getLog();
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
private final MQClientInstance mQClientFactory;
// 单例调度线程池
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "PullMessageServiceScheduledThread");
}
});
public PullMessageService(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
}
//
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
// 立即执行拉取消息请求=》
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
//
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
// 往阻塞队列中存放pull请求
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
//
public void executeTaskLater(final Runnable r, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}
//
private void pullMessage(final PullRequest pullRequest) {
// 根据消费组选择消费者=》
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
// 拉去消息=》
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
// 服务正常
while (!this.isStopped()) {
try {
// 从队列中获取一个pull请求
PullRequest pullRequest = this.pullRequestQueue.take();
// 拉取消息=》
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
//
@Override
public void shutdown(boolean interrupt) {
super.shutdown(interrupt);
ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
}
@Override
public String getServiceName() {
return PullMessageService.class.getSimpleName();
}
}
DefaultMQPushConsumerImpl pullMessage
public void pullMessage(final PullRequest pullRequest) {
// 从pull请求中获取处理队列
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// 是否删除
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
// 处理队列设置最后拉去的时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
// 确认服务是否ok=》
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
// 延迟3秒执行pull请求=》
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}
// 服务暂停
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
// 延迟1s拉取消息=》
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
// 缓存消息的数量
long cachedMessageCount = processQueue.getMsgCount().get();
// 缓存消息的大小M
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 缓存的消息大于pull消息的阈值1000
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
// 50ms后拉取=》
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
// 缓存消息队列的大小大于阈值100m
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
// 延迟50ms后拉取消息=》
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
// 顺序消费
if (!this.consumeOrderly) {
// 处理队列最大数量大于消费并发最大数2000
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
// 50ms后拉取消息=》
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
// 不是顺序消费
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
// 计算从哪里拉取消息=》
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
// 3s后拉取消息=》
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
// 获取topic订阅数据
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
// 3s后拉取消息=》
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
// 拉去信息回调
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 处理拉取消息请求=》
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
// 找到消息修改拉取的offset
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
// 记录topic、消费组拉取消息次数
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
// 没找到消息立即拉取消息=》
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
// 获取查询offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
// 记录topic、消费组拉取消息次数
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 提交消费请求=》
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 拉取消息频次大于0
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
// 延迟拉取消息=》
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
// 立即拉取消息=》
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
// 修改tag offset=》
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
// 立即执行消息拉取=》
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
// 修改 tag offset=》
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
// 立即消息拉取=》
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
// offset无效,删除处理队列
pullRequest.getProcessQueue().setDropped(true);
// 延迟执行任务
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
// 更新消息队列的下个offset
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
// 持久化消息队列=》
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
// 删除消息队列=》
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
// 不是重试topic
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
// 3s后执行拉取消息=》
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
// 集群消费
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
// 从内存中读取offset=》
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
// 获取topic的订阅数据
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
// 拉取消息=》
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
// 3s后执行拉取消息=》
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
public void doRebalance() {
// 获取所有的消费者信息
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
// =》
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
RebalanceService start 负载消费队列线程
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
// 负载均衡处理 =》
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return RebalanceService.class.getSimpleName();
}
}
public void doRebalance() {
// 获取所有的消费者信息
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
// =》
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
public void doRebalance(final boolean isOrder) {
// 获取topic的订阅信息=》
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 根据topic进行负载均衡处理=》
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
// 根据topic删除消息队列=》
this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
// 获取topic的消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
// 更新负载均衡处理队列表 =》
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
// 消息队列发生改变=》
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
// 从topic订阅信息中获取消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 集群消费的方式,按topic、消费组获取消费者=》
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 消费队列分配策略算法=》
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 更新处理队列=》
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
// 消息队列更新=》
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
- 可以看出集群消费者会把topic的所有queue获取到, 和这个消费组里面所有的消费者获取到 int mod = queue.size % consumer.size(); (int mod = mqAll.size() % cidAll.size(); 做负载均衡,一个消费者可以获取到多个queue
三、广播消费
总结
版权声明:本文为qq_32655207原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。