Nacos源码解析(三)-NacosConfigService创建分析

第一部分 Nacos Config自动装配

@Configuration(proxyBeanMethods = false)

@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)

public class NacosConfigAutoConfiguration {


   @Bean

   public NacosConfigProperties nacosConfigProperties(ApplicationContext context) {

      if (context.getParent() != null

            && BeanFactoryUtils.beanNamesForTypeIncludingAncestors(

                  context.getParent(), NacosConfigProperties.class).length > 0) {

         return BeanFactoryUtils.beanOfTypeIncludingAncestors(context.getParent(),

               NacosConfigProperties.class);

      }

      return new NacosConfigProperties();

   }


   @Bean

   public NacosRefreshProperties nacosRefreshProperties() {

      return new NacosRefreshProperties();

   }


   @Bean

   public NacosRefreshHistory nacosRefreshHistory() {

      return new NacosRefreshHistory();

   }


   @Bean

   public NacosConfigManager nacosConfigManager(

         NacosConfigProperties nacosConfigProperties) {

      return new NacosConfigManager(nacosConfigProperties);

   }


   @Bean

   public NacosContextRefresher nacosContextRefresher(

         NacosConfigManager nacosConfigManager,

         NacosRefreshHistory nacosRefreshHistory) {

      // Consider that it is not necessary to be compatible with the previous

      // configuration

      // and use the new configuration if necessary.

      return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);

   }

}

NacosConfigService创建了五个Bean

NacosConfigProperties  配置属性类

NacosRefreshProperties 刷新属性类

NacosRefreshHistory  刷新历史类

NacosConfigManager 配置管理器类

NacosContextRefresher Nacos上下文刷新器

主要看一下NacosConfigManager

public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {

   this.nacosConfigProperties = nacosConfigProperties;

   // Compatible with older code in NacosConfigProperties,It will be deleted in the

   // future.

   createConfigService(nacosConfigProperties);

}



/**

* Compatible with old design,It will be perfected in the future.

*/

static ConfigService createConfigService(

      NacosConfigProperties nacosConfigProperties) {

   if (Objects.isNull(service)) {

      synchronized (NacosConfigManager.class) {

         try {

            if (Objects.isNull(service)) {

               service = NacosFactory.createConfigService(

                     nacosConfigProperties.assembleConfigServiceProperties());

            }

         }

         catch (NacosException e) {

            log.error(e.getMessage());

            throw new NacosConnectionFailureException(

                  nacosConfigProperties.getServerAddr(), e.getMessage(), e);

         }

      }

   }

   return service;

}

主要是判断是否存在NacosConfigService如果不存在,则通过NacosFactory创建

参照NacosNamingService,NacosConfigService也是通过反射创建。

接下来看看NacosConfigService的创建流程、关键类及其作用

第二部分 NacosConfigService创建

NacosConfigService构造方法

public NacosConfigService(Properties properties) throws NacosException {

    //验证参数

    ValidatorUtils.checkInitParam(properties);

    //初始化namespace

    initNamespace(properties);

    //配置过滤链管理器

    this.configFilterChainManager = new ConfigFilterChainManager(properties);

    //创建服务列表管理器并启动 参考NacosNamingService

    ServerListManager serverListManager = new ServerListManager(properties);

    serverListManager.start();

    

        //创建客户端worker

    this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties);

    // will be deleted in 2.0 later versions

    //服务http代理

    agent = new ServerHttpAgent(serverListManager);

}

主要创建了几个类:

ConfigFilterChainManager

ServerListManager

ClientWork

ServerHttpAgent

接下来分析各个类的作用:

1.ConfigFilterChainManager  构造方法

public ConfigFilterChainManager(Properties properties) {

    ServiceLoader<IConfigFilter> configFilters = ServiceLoader.load(IConfigFilter.class);

    for (IConfigFilter configFilter : configFilters) {

        configFilter.init(properties);

        addFilter(configFilter);

    }

}

首先,用ServiceLoader加载了IConfigFilter.class,找到META-INF.service目录找到IConfigFilter全路径名称的文件,文件指定了类:

com.alibaba.nacos.client.config.filter.impl.ConfigEncryptionFilter

然后遍历所加载的configFilter 进行初始化init,并添加filter

ConfigEncryptionFilter 的init方法为空,而addFilter方法主要是将filter添加到filters集合中

,ConfigFilterChainManager是通过VirtualFilterChain的doFilter方法,循环filters调用doFilter方法

具体看一下ConfigEncryptionFilter的doFilter方法

public void doFilter(IConfigRequest request, IConfigResponse response, IConfigFilterChain filterChain)

        throws NacosException {

    if (Objects.nonNull(request) && request instanceof ConfigRequest && Objects.isNull(response)) {

        //加密过程

        // Publish configuration, encrypt

        ConfigRequest configRequest = (ConfigRequest) request;

        String dataId = configRequest.getDataId();

        String content = configRequest.getContent();

        

        Pair<String, String> pair = EncryptionHandler.encryptHandler(dataId, content);

        String secretKey = pair.getFirst();

        String encryptContent = pair.getSecond();

        

        ((ConfigRequest) request).setContent(encryptContent);

        ((ConfigRequest) request).setEncryptedDataKey(secretKey);

    }

    if (Objects.nonNull(response) && response instanceof ConfigResponse && Objects.isNull(request)) {

        //解密过程

        // Get configuration, decrypt

        ConfigResponse configResponse = (ConfigResponse) response;

        

        String dataId = configResponse.getDataId();

        String encryptedDataKey = configResponse.getEncryptedDataKey();

        String content = configResponse.getContent();

        

        Pair<String, String> pair = EncryptionHandler.decryptHandler(dataId, encryptedDataKey, content);

        String decryptContent = pair.getSecond();

        ((ConfigResponse) response).setContent(decryptContent);

    }

    filterChain.doFilter(request, response);

}

EncryptionHandler中加密解密中会先做验证,checkCipher(dataId),判断dataId即配置名称是否以 cipher- 开头,如果不是以cipher-开头,直接返回明文

EncryptionHandler最终是通过EncryptionPluginService来完成加解密的、所以需要自己实现EncryptionPluginService

2.ServerListManager在NacosNamingService中已经分析过,主要是定时任务从nacos server获取服务的列表并更新

3.ClientWork

public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,

        final Properties properties) throws NacosException {

    this.configFilterChainManager = configFilterChainManager;

    
    init(properties);

    
    agent = new ConfigRpcTransportClient(properties, serverListManager);

    int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);

    ScheduledExecutorService executorService = Executors

            .newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> {

                Thread t = new Thread(r);

                t.setName("com.alibaba.nacos.client.Worker");

                t.setDaemon(true);

                return t;

            });

    agent.setExecutor(executorService);

    agent.start();

}

ClientWork构造方法主要创建ConfigRpcTransportClient并设置线程池并启动

ConfigRpgTransportClient中start方法

public void start() throws NacosException {

    securityProxy.login(this.properties);

    this.executor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0,

            this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);

    startInternal();

}

进入startInternal方法

public void startInternal() {

    executor.schedule(() -> {

        while (!executor.isShutdown() && !executor.isTerminated()) {

            try {

                listenExecutebell.poll(5L, TimeUnit.SECONDS);

                if (executor.isShutdown() || executor.isTerminated()) {

                    continue;

                }

                executeConfigListen();

            } catch (Exception e) {

                LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);

            }

        }

    }, 0L, TimeUnit.MILLISECONDS);
    
}

主要是执行executeConfigListen方法

public void executeConfigListen() {

    Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);

    Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);

    long now = System.currentTimeMillis();

    boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;

    //循环1开始:

    for (CacheData cache : cacheMap.get().values()) {

        synchronized (cache) {

            //1.验证本地监听器是否一致

            //check local listeners consistent.

            if (cache.isSyncWithServer()) {

                //验证监听器MD5是否一致,如果则不一致则需要调用CacheData的safeNotifyListener方法,用最新的MD5加密并接收ConfigChange

                //TODO saftNotifyListener稍后待详细分析

                cache.checkListenerMd5();

                if (!needAllSync) {

                    continue;

                }

            }
            

            if (!CollectionUtils.isEmpty(cache.getListeners())) {

                //get listen  config

                if (!cache.isUseLocalConfigInfo()) {

                    List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));

                    if (cacheDatas == null) {

                        cacheDatas = new LinkedList<>();

                        listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);

                    }

                    cacheDatas.add(cache);
                    

                }

            } else if (CollectionUtils.isEmpty(cache.getListeners())) {
                

                if (!cache.isUseLocalConfigInfo()) {

                    List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));

                    if (cacheDatas == null) {

                        cacheDatas = new LinkedList<>();

                        removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);

                    }

                    cacheDatas.add(cache);
                    

                }

            }

        }

        

    }

    //循环1结束


    //循环2开始:

    boolean hasChangedKeys = false;
    

    if (!listenCachesMap.isEmpty()) {

        for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {

            String taskId = entry.getKey();

            Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);

            

            List<CacheData> listenCaches = entry.getValue();

            for (CacheData cacheData : listenCaches) {

                timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant),

                        cacheData.getLastModifiedTs().longValue());

            }
           

            ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);

            configChangeListenRequest.setListen(true);

            try {

                RpcClient rpcClient = ensureRpcClient(taskId);

                ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(

                        rpcClient, configChangeListenRequest);

                if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
                    

                    Set<String> changeKeys = new HashSet<String>();

                    //handle changed keys,notify listener

                    if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {

                        hasChangedKeys = true;

                        for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse

                                .getChangedConfigs()) {

                            String changeKey = GroupKey

                                    .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),

                                            changeConfig.getTenant());

                            changeKeys.add(changeKey);

                            boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();

                            refreshContentAndCheck(changeKey, !isInitializing);

                        }
                        

                    }
                    

                    //handler content configs

                    for (CacheData cacheData : listenCaches) {

                        String groupKey = GroupKey

                                .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());

                        if (!changeKeys.contains(groupKey)) {

                            //sync:cache data md5 = server md5 && cache data md5 = all listeners md5.

                            synchronized (cacheData) {

                                if (!cacheData.getListeners().isEmpty()) {
                                    

                                    Long previousTimesStamp = timestampMap.get(groupKey);

                                    if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp,

                                            System.currentTimeMillis())) {

                                        continue;

                                    }

                                    cacheData.setSyncWithServer(true);

                                }

                            }

                        }
                        

                        cacheData.setInitializing(false);

                    }
                    

                }

            } catch (Exception e) {
                

                LOGGER.error("Async listen config change error ", e);

                try {

                    Thread.sleep(50L);

                } catch (InterruptedException interruptedException) {

                    //ignore

                }

            }

        }

    }

    //循环2结束:
    
    //循环3开始:

    if (!removeListenCachesMap.isEmpty()) {

        for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {

            String taskId = entry.getKey();

            List<CacheData> removeListenCaches = entry.getValue();

            ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);

            configChangeListenRequest.setListen(false);

            try {

                RpcClient rpcClient = ensureRpcClient(taskId);

                boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);

                if (removeSuccess) {

                    for (CacheData cacheData : removeListenCaches) {

                        synchronized (cacheData) {

                            if (cacheData.getListeners().isEmpty()) {

                                ClientWorker.this

                                        .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);

                            }

                        }

                    }

                }
                

            } catch (Exception e) {

                LOGGER.error("async remove listen config change error ", e);

            }

            try {

                Thread.sleep(50L);

            } catch (InterruptedException interruptedException) {

                //ignore

            }

        }

    }

    //循环3结束

    if (needAllSync) {

        lastAllSyncTime = now;

    }

    //If has changed keys,notify re sync md5.

    if (hasChangedKeys) {

        notifyListenConfig();

    }

}

executeConfigListen方法较长,方法开始初始化了几个CacheMap:

listenCachesMap     

removeListenCacheMap

listenCachesMap主要是监听器列表映射map

removeListenCachesMap 主要是移除监听器的列表映射

另外还有一个成员变量cacheMap 声明类型是AtomicReference<Map<String, CacheData>>

由于内容比较多,可以按代码块来分析,根据代码,主要    有三次遍历集合

1.for (CacheData cache : cacheMap.get().values()) {…}

2.for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {…}

3.for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()){…}

循环1:

1.循环cacheMap,并检查cacheMap中CacheData是否同步server,md是否一致,如果md不一致,则需要用最新的md5来接收配置变更

2.循环listenCachesMap,如果有listener且未加入listenCachesMap中,则按照taskId,listener列表按照映射放入listenCachesMap中

3.循环removeListenCachesMap,如果有listener且未加入listenCachesMap中,则按照taskId,listener列表按照映射放入removeListenCachesMap中

循环2:

1.循环并生成KeyTenant与上次修改的时间戳分别以key,value形式放入timestampMap中

2.生成请求监听上下文,获得rpcClient,并发送请求

3.如果配置变更,则生成changeKey,如果未初始化,则执行refreshContentAndCheck,refreshContentAndCheck最终会执行到checkListenerMd5方法,第一个循环中遍历cacheMap是同样会执行该方法。稍后会对该方法详细分析。

4.遍历listenCaches,生成groupKey,修改最后更新时间戳、设置是否初始化字段

循环3:

1.循环3类似循环2,但是对移除监听的处理,需要构建移除监听请求的上下文,获取rpcClient,请发送请求

2.如果响应成功,则ClientWork会执行removeCache方法,将cacheMap根据groupKey移除,更新MetricsMonitor中监听器数量

三次循环完成后,会更新最后更新事件字段,如果配置有更新,则执行notifyListenConfig根据注释,如果监听器有更新,需要重新同步md5

接下来,看一下CacheData的checkListenerMd5方法

//注:遍历listeners,如果上次调用md5与成员变量md5不同,则执行safeNotifyListener

void checkListenerMd5() {

    for (ManagerListenerWrap wrap : listeners) {

        if (!md5.equals(wrap.lastCallMd5)) {

            safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);

        }

    }

}

safeNotifyListener方法:

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,

        final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {

    final Listener listener = listenerWrap.listener;

    if (listenerWrap.inNotifying) {

        LOGGER.warn(

                "[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.",

                name, dataId, group, md5, listener);

        return;

    }

    Runnable job = () -> {

        long start = System.currentTimeMillis();

        ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();

        ClassLoader appClassLoader = listener.getClass().getClassLoader();

        try {

            if (listener instanceof AbstractSharedListener) {

                AbstractSharedListener adapter = (AbstractSharedListener) listener;

                adapter.fillContext(dataId, group);

                LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);

            }

            // Before executing the callback, set the thread classloader to the classloader of

            // the specific webapp to avoid exceptions or misuses when calling the spi interface in

            // the callback method (this problem occurs only in multi-application deployment).

            Thread.currentThread().setContextClassLoader(appClassLoader);

            

            ConfigResponse cr = new ConfigResponse();

            cr.setDataId(dataId);

            cr.setGroup(group);

            cr.setContent(content);

            cr.setEncryptedDataKey(encryptedDataKey);

            configFilterChainManager.doFilter(null, cr);

            String contentTmp = cr.getContent();

            listenerWrap.inNotifying = true;

            listener.receiveConfigInfo(contentTmp);

            // compare lastContent and content

            if (listener instanceof AbstractConfigChangeListener) {

                Map data = ConfigChangeHandler.getInstance()

                        .parseChangeData(listenerWrap.lastContent, content, type);

                ConfigChangeEvent event = new ConfigChangeEvent(data);

                ((AbstractConfigChangeListener) listener).receiveConfigChange(event);

                listenerWrap.lastContent = content;

            }
            

            listenerWrap.lastCallMd5 = md5;

            LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name, dataId,

                    group, md5, listener, (System.currentTimeMillis() - start));

        } catch (NacosException ex) {

            LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,

                    dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());

        } catch (Throwable t) {

            LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,

                    md5, listener, t.getCause());

        } finally {

            listenerWrap.inNotifying = false;

            Thread.currentThread().setContextClassLoader(myClassLoader);

        }

    };
    

    final long startNotify = System.currentTimeMillis();

    try {

        if (null != listener.getExecutor()) {

            listener.getExecutor().execute(job);

        } else {

            try {

                INTERNAL_NOTIFIER.submit(job);

            } catch (RejectedExecutionException rejectedExecutionException) {

                LOGGER.warn(

                        "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, no available internal notifier,will sync notifier ",

                        name, dataId, group, md5, listener);

                job.run();

            } catch (Throwable throwable) {

                LOGGER.error(

                        "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, submit internal async task fail,throwable= ",

                        name, dataId, group, md5, listener, throwable);

                job.run();

            }

        }

    } catch (Throwable t) {

        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,

                group, md5, listener, t.getCause());

    }

    final long finishNotify = System.currentTimeMillis();

    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",

            name, (finishNotify - startNotify), dataId, group, md5, listener);

}

该方法主要创建了一个job,并执行job

job类加载部分可以先省略掉,可以看到有两个Listener

1.AbstractSharedListener

2.AbstractConfigChangeListener

AbstractSharedListener会设置两个字段,dataId和group,该抽象类有一个抽象方法 innerReceive,如果设置共享监听器,则需要继承该监听器,并实现innerReceive方法

AbstractConfigChangeListener 包含receiveConfigChange抽象方法,如果要监听配置变更,需要继承该监听器并实现receiveConfigChange方法

由此,可以看出,nacos配置变更是可以通过继承配置变更监听器来获取相关变更信息的,动态获取最新的配置,无需重启服务,更加灵活。


版权声明:本文为yizefeng7241原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。