RocketMq源码解读-producer启动
producer启动
1.ExtProducerResetConfiguration
注册额外的producer

注册@ExtRocketMQTemplateConfiguration修饰的bean(必须继承RocketMQTemplate)
2.RocketMQAutoConfiguration



1.读取配置创建producer bean
2.创建rocketMQTemplate 注入 producer
3.核心方法 createDefaultMQProducer
二者都会通过 createDefaultMQProducer 创建producer实例
1.producer实现类均为TransactionMQProducer
2.如果开启了isEnableAcl则注册AclClientRPCHook,在doBeforeRequest中为请求附加AccessKey和签名信息,供broker端进行acl校验
3.如果开启了isEnableMsgTrace则创建AsyncTraceDispatcher,内部创建了一个线程池
注册sendMessageHook,在发送消息前后构建traceBean,加入到TraceDispatcher中的traceContextQueue中等待被消费
4.填充其他信息
RocketMQAutoConfiguration
ExtProducerResetConfiguration
区别:
1.RocketMQAutoConfiguration中可以设置accessChannel
LOCAL表示本地部署的服务
CLOUD表示远程服务(阿里云)
2.ExtProducerResetConfiguration可通过注解覆盖配置文件中的信息
至此,producer实例创建完毕。
5.producer.start()
5.1 实例启动
/**
 * Starts this producer: validates the configuration, gets or creates the MQ client
 * instance through {@code MQClientManager}, registers the producer with it and,
 * when requested, starts that client instance.
 *
 * <p>The start-up sequence only runs when the current state is {@code CREATE_JUST}.
 * After the switch, a heartbeat is sent to all brokers and a periodic scan for
 * expired requests is scheduled.
 *
 * @param startFactory when {@code true}, {@code mQClientFactory.start()} is invoked as well
 * @throws MQClientException if the producer group is already registered on the client
 *         instance, or if the state is RUNNING, START_FAILED or SHUTDOWN_ALREADY
 */
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
// 1. Initial state of a newly created producer; the start-up sequence only runs from here.
case CREATE_JUST:
// 2. Switch to START_FAILED immediately: if any step below throws, the state stays
//    START_FAILED and a later start() call is rejected by the START_FAILED case below.
this.serviceState = ServiceState.START_FAILED;
// 3. Validate the configuration (per the accompanying notes: checks that the
//    producer group name is legal).
this.checkConfig();
// 4. Unless this is the client-internal producer group, change the instance name
//    (presumably to the process id, judging by the method name - confirm).
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 5. Get or create the MQ client instance for this producer.
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 6. Register this producer under its group name; a false result is reported as a
//    duplicate group name.
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
// Roll the state back to CREATE_JUST before reporting the failure.
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 7. Seed the topic publish table with an empty entry for the create-topic key
//    (default topic TBW102 per the accompanying notes, used with brokers that
//    auto-create topics).
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 8. Start the client instance when the caller asked for it.
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
// Any of these states means start() was already attempted or the producer was shut down.
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// Reached after a successful CREATE_JUST start-up and from the default branch:
// send a heartbeat to all brokers right away.
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// Periodically scan RequestFutureTable for expired requests: first run after 3000 ms,
// then every 1000 ms (assuming this.timer is a java.util.Timer - confirm).
// Throwable is caught so a failing scan is only logged.
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
- 初始状态为CREATE_JUST
- 进入后状态立刻修改为START_FAILED:若后续步骤抛出异常,状态会停留在START_FAILED,此后再次调用start会直接抛出异常,从而防止重复执行
- 检查生产组名是否合法
- 设置实例名
- 创建mq客户端
- 将producer实例注册进客户端的producerTable中
/**
 * Registers a producer under its group name in this client's producer table.
 *
 * <p>The entry is only added when no producer is registered for the group yet
 * ({@code putIfAbsent}); an existing registration is left untouched.
 *
 * @param group    producer group name used as the table key
 * @param producer producer implementation to register
 * @return {@code true} if the producer was registered; {@code false} if either argument
 *         is {@code null} or the group already has a registered producer
 */
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    final boolean argumentsPresent = group != null && producer != null;
    if (!argumentsPresent) {
        return false;
    }

    // putIfAbsent returns the previous mapping, so null means this call inserted the entry.
    final MQProducerInner existing = this.producerTable.putIfAbsent(group, producer);
    if (existing == null) {
        return true;
    }

    log.warn("the producer group[{}] exist already.", group);
    return false;
}
- 生产者启动时创建主题发布信息表,默认的 topic 为 TBW102(用于支持自动创建topic的broker)
- 启动服务
/**
 * Starts this client instance (the "factory") while holding the instance lock.
 *
 * <p>Start-up only runs from the {@code CREATE_JUST} state. A previous failed attempt
 * ({@code START_FAILED}) is rejected with an exception; any other state is a no-op.
 *
 * @throws MQClientException if an earlier start attempt left the state at START_FAILED
 */
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// Mark as START_FAILED first; if a step below throws, the state stays START_FAILED
// and later calls hit the START_FAILED case.
this.serviceState = ServiceState.START_FAILED;
// If no name server address is configured, look it up
// (per the accompanying notes, via wsAddr).
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
// (per the accompanying notes, a Netty client created through NettyRemotingClient).
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start this client's own producer implementation (presumably the internal producer - confirm);
// false is passed as startFactory, so the producer does not call back into this start().
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
- this.mQClientAPIImpl.fetchNameServerAddr();
获取nameserver地址,如果没有配置,则尝试通过wsAddr获取
- this.mQClientAPIImpl.start();
通过NettyRemotingClient创建netty客户端
- this.startScheduledTask();
1)如果没有配置确切的nameServer地址,则每两分钟尝试获取一次
2)默认每30秒向nameserver更新路由表,同时也会更新broker地址表
3)清理路由表中不存在的broker,向broker发送心跳,上传过滤类(consumer) 周期默认30秒
4)持久化消费offset(consumer) 周期默认5秒
5)调整线程池(consumer) 周期默认1分钟
/**
 * Schedules the client's periodic background tasks on {@code scheduledExecutorService}.
 *
 * <p>Each task catches {@code Exception} and only logs it. Tasks 3-5 involve
 * consumer-side work as well (per the accompanying notes).
 */
private void startScheduledTask() {
// 1. Only when no name server address is configured: re-fetch the address
//    every 2 minutes, starting 10 seconds after scheduling.
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 2. Refresh topic route info from the name server, starting after 10 ms and repeating
//    every getPollNameServerInterval() ms (default 30 s per the accompanying notes;
//    the broker address table is updated along with it).
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// 3. Clean up offline brokers, then send a heartbeat to all brokers; starts after 1 s and
//    repeats every getHeartbeatBrokerInterval() ms (default 30 s per the accompanying notes).
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 4. Persist all consumer offsets, starting after 10 s and repeating every
//    getPersistConsumerOffsetInterval() ms (default 5 s per the accompanying notes).
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 5. Adjust the thread pool once per minute, starting after 1 minute
//    (consumer-side per the accompanying notes).
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
5.2 traceDispatcher启动
/**
 * Starts the trace dispatcher: starts the trace producer (once) and launches the
 * daemon worker thread that runs {@code AsyncRunnable}.
 *
 * @param nameSrvAddr   name server address assigned to the trace producer; also used
 *                      as a suffix of its instance name
 * @param accessChannel access channel stored on this dispatcher
 * @throws MQClientException declared by this method; presumably propagated from
 *         {@code traceProducer.start()} - confirm
 */
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
// compareAndSet(false, true) lets only the first caller configure and start the trace producer.
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
// NOTE(review): everything below is outside the isStarted guard, so each call to start()
// replaces accessChannel, creates a new worker thread and registers the shutdown hook again
// - confirm that start() is only ever called once per dispatcher.
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
// Daemon thread, so the worker does not keep the JVM alive.
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
版权声明:本文为ymybxx原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。