RocketMQ Source Code Walkthrough: Producer Startup

Producer startup

1. ExtProducerResetConfiguration

Registers additional producers.

ExtProducerResetConfiguration registers every bean annotated with @ExtRocketMQTemplateConfiguration (the bean class must extend RocketMQTemplate).

2. RocketMQAutoConfiguration


1. Read the configuration and create the producer bean.
2. Create the rocketMQTemplate and inject the producer into it.

3. Core method: createDefaultMQProducer

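Both configurations create the producer instance through createDefaultMQProducer.
1. The producer implementation class is always TransactionMQProducer.
2. If isEnableAcl is true, an AclClientRPCHook is registered; its doBeforeRequest attaches the ACL signature to each outgoing request for verification.
3. If isEnableMsgTrace is true, an AsyncTraceDispatcher is created, which internally creates a thread pool.
   A sendMessageHook is also registered. Before and after each send it builds a traceBean and adds it to the TraceDispatcher's traceContextQueue, where it waits to be consumed.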
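
The hook-and-queue arrangement in item 3 can be pictured with a minimal JDK-only sketch. This is not RocketMQ's code: `TraceQueueSketch`, `SendHook`, and `TraceRecord` are hypothetical names, and the queue capacity of 1024 is an arbitrary assumption. It only illustrates the idea of recording an entry before and after a send and handing it to a bounded queue that a background worker would drain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified model of a send hook that feeds a trace queue (names are hypothetical).
final class TraceQueueSketch {

    /** One trace entry: which topic was sent and at which stage it was recorded. */
    static final class TraceRecord {
        final String topic;
        final String stage;

        TraceRecord(String topic, String stage) {
            this.topic = topic;
            this.stage = stage;
        }
    }

    /** Callbacks invoked around a send, in the spirit of a send-message hook. */
    interface SendHook {
        void before(String topic);
        void after(String topic);
    }

    // Bounded queue: offer() drops the entry instead of blocking the sender when full.
    private final BlockingQueue<TraceRecord> queue = new ArrayBlockingQueue<>(1024);

    final SendHook hook = new SendHook() {
        @Override
        public void before(String topic) {
            queue.offer(new TraceRecord(topic, "before"));
        }

        @Override
        public void after(String topic) {
            queue.offer(new TraceRecord(topic, "after"));
        }
    };

    /** Stand-in for the real send: runs the hook around a no-op send. */
    void send(String topic) {
        hook.before(topic);
        // the actual network send would happen here
        hook.after(topic);
    }

    /** What a background worker would do: drain whatever has accumulated. */
    List<TraceRecord> drain() {
        List<TraceRecord> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }
}
```

Using a non-blocking `offer` on a bounded queue is one way to keep tracing from slowing down the send path; whether to drop or block when the queue is full is a design choice in any such scheme.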

4. Filling in the remaining settings

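RocketMQAutoConfiguration and ExtProducerResetConfiguration each fill in the remaining producer settings.

Differences:
1. RocketMQAutoConfiguration can set accessChannel:
   LOCAL means a locally deployed service
   CLOUD means a remote service (Alibaba Cloud)

2. ExtProducerResetConfiguration can override values from the configuration file through the annotation.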
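
Point 2 amounts to a fallback rule: a value given on the annotation wins, and the configuration file supplies the value otherwise. The sketch below illustrates that rule with plain JDK reflection. The annotation `ExtConfig`, its `group` attribute, and the class names are hypothetical stand-ins, not the rocketmq-spring API, and treating an empty string as "not set" is an assumption of the sketch.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical model of "annotation value overrides the configuration file".
final class OverrideSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ExtConfig {
        // Empty string means "not set on the annotation".
        String group() default "";
    }

    @ExtConfig(group = "ext-group")
    static final class WithOverride { }

    @ExtConfig
    static final class WithoutOverride { }

    /** Returns the annotation's group if present and non-empty, else the configured one. */
    static String resolveGroup(Class<?> type, String configuredGroup) {
        ExtConfig annotation = type.getAnnotation(ExtConfig.class);
        if (annotation == null || annotation.group().isEmpty()) {
            return configuredGroup;
        }
        return annotation.group();
    }
}
```

For example, `OverrideSketch.resolveGroup(OverrideSketch.WithOverride.class, "file-group")` yields the annotation's value, while the same call with `WithoutOverride.class` falls back to the configured one.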

At this point the producer instance has been created.

5. producer.start()

5.1 Instance startup

    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            // 1
            case CREATE_JUST:
                // 2
                this.serviceState = ServiceState.START_FAILED;

                // 3
                this.checkConfig();

                // 4
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                // 5
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                // 6
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                // 7
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                // 8
                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
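  1. The initial state is CREATE_JUST.
  2. On entry the state is immediately set to START_FAILED, so that a startup that throws leaves the instance marked as failed and a later start() call is rejected instead of re-running.
  3. Validate the configuration, in particular that the producer group name is legal.
  4. Set the instance name: unless the group is the client's internal producer group, changeInstanceNameToPID() is called.
  5. Get or create the MQ client instance (mQClientFactory).
  6. Register the producer in the client's producerTable, keyed by producer group: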

    public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
        if (null == group || null == producer) {
            return false;
        }

        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
        if (prev != null) {
            log.warn("the producer group[{}] exist already.", group);
            return false;
        }

        return true;
    }
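
Steps 2 and 6 work together: the state is set to START_FAILED before anything that can throw, and the group is registered with putIfAbsent so that a second producer with the same group is rejected. Below is a minimal JDK-only model of that pattern. `StartGuardSketch`, its `State` enum, and the `REGISTRY` map are hypothetical stand-ins for DefaultMQProducerImpl, ServiceState, and producerTable; the real startup does considerably more.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model of the start() state guard and group registration (hypothetical names).
final class StartGuardSketch {

    enum State { CREATE_JUST, RUNNING, START_FAILED }

    // Plays the role of producerTable: one producer per group.
    private static final ConcurrentMap<String, StartGuardSketch> REGISTRY = new ConcurrentHashMap<>();

    private final String group;
    private State state = State.CREATE_JUST;

    StartGuardSketch(String group) {
        this.group = group;
    }

    State state() {
        return state;
    }

    void start() {
        if (state != State.CREATE_JUST) {
            throw new IllegalStateException("state not OK: " + state);
        }
        // Pessimistic default: if anything below throws, the state stays START_FAILED.
        state = State.START_FAILED;

        // putIfAbsent returns the previous value, so non-null means the group is taken.
        StartGuardSketch previous = REGISTRY.putIfAbsent(group, this);
        if (previous != null) {
            // Mirrors the original: a duplicate group resets the state before failing.
            state = State.CREATE_JUST;
            throw new IllegalStateException("group already registered: " + group);
        }
        state = State.RUNNING;
    }
}
```

Because putIfAbsent is atomic on a ConcurrentMap, two producers racing to register the same group cannot both succeed, which is why no extra lock is needed around the check.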

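  7. Seed the producer's topic table (topicPublishInfoTable). The default topic key is TBW102, which is used with brokers that allow topics to be created automatically.
  8. Start the client instance (mQClientFactory.start()):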
public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
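  • this.mQClientAPIImpl.fetchNameServerAddr();
    Fetches the name server address. It is called only when no address has been configured, in which case the client tries to obtain one through wsAddr.
  • this.mQClientAPIImpl.start();
    Creates and starts the Netty client through NettyRemotingClient.
  • this.startScheduledTask();
    1) If no explicit name server address is configured, try to fetch one every two minutes.
    2) Refresh the route table from the name server, every 30 seconds by default; this also updates the broker address table.
    3) Remove brokers that no longer appear in the route table, send heartbeats to the brokers, and upload filter classes (consumer); the default period is 30 seconds.
    4) Persist consumer offsets (consumer); the default period is 5 seconds.
    5) Adjust the thread pool (consumer); the period is 1 minute.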
	
private void startScheduledTask() {
		//1
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

		//2 
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

		//3
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        // 4
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        // 5
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }
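
Each task above wraps its body in try/catch. With ScheduledExecutorService.scheduleAtFixedRate this matters: according to the JDK documentation, if an execution of the task throws, subsequent executions are suppressed. The sketch below shows the same pattern in isolation. The class name, the 10 ms period, and the always-failing task are illustrative assumptions, not values taken from RocketMQ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Shows why periodic tasks catch their own exceptions (illustrative names and timings).
final class FixedRateSketch {

    /**
     * Schedules a task that fails on every run but catches the failure itself.
     * Returns true if it still ran at least {@code runs} times within the timeout.
     */
    static boolean survivesFailures(int runs, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    latch.countDown();
                    throw new IllegalStateException("simulated failure");
                } catch (Exception e) {
                    // Swallowed (the original logs it) so the next run is still scheduled.
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("still running after failures: " + survivesFailures(3, 2000));
    }
}
```

Without the inner try/catch, the first failure would end the schedule silently, so a single bad name server response could stop route updates or heartbeats for the lifetime of the client.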

5.2 traceDispatcher startup

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
        if (isStarted.compareAndSet(false, true)) {
            traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();
        }
        this.accessChannel = accessChannel;
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
        this.worker.setDaemon(true);
        this.worker.start();
        this.registerShutDownHook();
    }
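
In the listing above, compareAndSet makes the trace producer start at most once even if start is invoked repeatedly, and the daemon flag keeps the worker thread from blocking JVM exit. A minimal JDK-only model of that guard follows. `OneTimeStartSketch` and its counter are hypothetical; the counter stands in for traceProducer.start().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Models a start() whose expensive part runs at most once (hypothetical names).
final class OneTimeStartSketch {

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger producerStarts = new AtomicInteger();
    private Thread worker;

    void start() {
        // Only the caller that flips false -> true performs the one-time initialisation.
        if (started.compareAndSet(false, true)) {
            producerStarts.incrementAndGet(); // stands in for traceProducer.start()
        }
        // As in the original, the worker is created outside the guard.
        worker = new Thread(() -> { /* a real worker would drain the trace queue */ }, "sketch-worker");
        worker.setDaemon(true); // daemon threads do not keep the JVM alive
        worker.start();
    }

    int producerStarts() {
        return producerStarts.get();
    }

    boolean workerIsDaemon() {
        return worker != null && worker.isDaemon();
    }
}
```

Calling `start()` twice on the same instance leaves `producerStarts()` at 1, while a new worker thread is created on each call, matching the structure of the listing.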

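Copyright notice: this is an original article by ymybxx, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.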