Nacos: How the Configuration Center Works

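        One question needs answering first: how does the Nacos client obtain the latest data from the Nacos server in real time?

        Data exchange between a client and a server comes down to one of two models:

  • the server pushes data to the client;
  • the client pulls data from the server.

        So is it push or pull? The Nacos client receives the latest data through a Listener, which makes it look like a server push, but rather than assume, let's confirm it from the source code.

        Starting from the native SDK, the two most important lines of code are: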

ConfigService configService=NacosFactory.createConfigService(properties);
String content=configService.getConfig(dataId,groupId,3000);
        1) Creating the ConfigService
Code snippet from com.alibaba.nacos.api.NacosFactory
public static ConfigService createConfigService(Properties properties) throws NacosException {
    return ConfigFactory.createConfigService(properties);
}
Code snippet from com.alibaba.nacos.api.config.ConfigFactory
public static ConfigService createConfigService(Properties properties) throws NacosException {
    try {
        // Create a NacosConfigService instance via reflection
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

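        As shown, the ConfigService is actually created by reflectively invoking the constructor of NacosConfigService, specifically the one that takes a single Properties argument.

        Note that no singleton or caching is involved here: every call creates a new ConfigService instance.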
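
        The reflective creation described above can be sketched with the JDK alone. This is a minimal illustration, not the Nacos code: DemoService, DemoServiceImpl and ReflectiveFactoryDemo are hypothetical names, and the "serverAddr" property key and its default are used only as sample data. It shows that a factory built this way returns a fresh instance on every call.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

/** Hypothetical stand-in for ConfigService; not part of Nacos. */
interface DemoService {
    String serverAddr();
}

/** Hypothetical implementation exposing a public (Properties) constructor. */
class DemoServiceImpl implements DemoService {
    private final String serverAddr;

    public DemoServiceImpl(Properties properties) {
        // The key and default value are sample data for this sketch only.
        this.serverAddr = properties.getProperty("serverAddr", "localhost:8848");
    }

    @Override
    public String serverAddr() {
        return serverAddr;
    }
}

final class ReflectiveFactoryDemo {
    /** Loads the implementation by name and invokes its (Properties) constructor. */
    static DemoService create(Properties properties) {
        try {
            Class<?> implClass = Class.forName(DemoServiceImpl.class.getName());
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return (DemoService) constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            // Wrapped in an unchecked exception to keep the sketch short.
            throw new IllegalStateException("cannot create service", e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("serverAddr", "127.0.0.1:8848");
        DemoService first = create(props);
        DemoService second = create(props);
        // No caching: the two references point to different objects.
        System.out.println("same instance? " + (first == second));
    }
}
```

        Because nothing is cached, a caller that wants a single shared instance would have to keep the returned reference itself.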

        2) Instantiating the ConfigService
Code snippet from com.alibaba.nacos.client.config.NacosConfigService
/**
 * http agent
 */
private HttpAgent agent;
/**
 * long polling
 */
private ClientWorker worker;
private String namespace;
private String encode;
private ConfigFilterChainManager configFilterChainManager = new ConfigFilterChainManager();

public NacosConfigService(Properties properties) throws NacosException {
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        encode = Constants.ENCODE;
    } else {
        encode = encodeTmp.trim();
    }
    // Initialize the namespace
    initNamespace(properties);
    agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    agent.start();
    worker = new ClientWorker(agent, configFilterChainManager, properties);
}

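        Instantiation mainly initializes two objects, ServerHttpAgent and ClientWorker. Using the decorator pattern, ServerHttpAgent is wrapped in a MetricsHttpAgent, which internally delegates to the ServerHttpAgent methods and adds some metrics collection on top. The agent is actually put to work inside ClientWorker.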
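
        To make the wrapping concrete, here is a minimal decorator sketch. SimpleAgent, PlainAgent and CountingAgent are hypothetical names invented for illustration; the real HttpAgent interface has more methods, and the statistics MetricsHttpAgent records are not reproduced here.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified agent interface. */
interface SimpleAgent {
    String get(String path);
}

/** Plays the role of the wrapped agent: does the actual work (faked here). */
class PlainAgent implements SimpleAgent {
    @Override
    public String get(String path) {
        return "response for " + path;
    }
}

/** Plays the role of the metrics wrapper: delegates every call and records statistics. */
class CountingAgent implements SimpleAgent {
    private final SimpleAgent delegate;
    private final AtomicLong calls = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();

    CountingAgent(SimpleAgent delegate) {
        this.delegate = delegate;
    }

    @Override
    public String get(String path) {
        long start = System.nanoTime();
        try {
            // The result comes from the delegate unchanged.
            return delegate.get(path);
        } finally {
            // Statistics are recorded whether the call succeeds or throws.
            calls.incrementAndGet();
            totalNanos.addAndGet(System.nanoTime() - start);
        }
    }

    long callCount() {
        return calls.get();
    }

    long totalNanos() {
        return totalNanos.get();
    }
}

final class DecoratorDemo {
    public static void main(String[] args) {
        CountingAgent agent = new CountingAgent(new PlainAgent());
        System.out.println(agent.get("/demo"));
        System.out.println("calls: " + agent.callCount());
    }
}
```

        Callers only see the SimpleAgent interface, so the wrapper can be added or removed without touching them.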

Code snippet from com.alibaba.nacos.client.config.impl.ClientWorker
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;
    // Initialize some parameters
    init(properties);
    // Thread pool for the scheduled check task
    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
    // Thread pool that runs the long-polling tasks
    executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
    // Schedule a recurring task: first run after 1 ms, then check the config info with a 10 ms delay between runs
    executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

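        As the ClientWorker initialization above shows, besides holding on to the HttpAgent, it also creates two thread pools:

  • the first pool has a single thread that runs checkConfigInfo(), first 1 ms after startup and then with a 10 ms delay between runs;
  • the second pool runs the long-polling tasks that talk to the config server.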
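
        The scheduling behavior of the first pool can be tried out in isolation. The sketch below assumes nothing about Nacos: SchedulerDemo and the thread name "demo.Worker" are made up, and a CountDownLatch stands in for the real check. It uses the same JDK calls as the constructor above: a thread factory producing named daemon threads, and scheduleWithFixedDelay with a 1 ms initial delay and a 10 ms delay between runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class SchedulerDemo {
    /** Builds a factory that produces named daemon threads. */
    static ThreadFactory daemonFactory(String name) {
        return runnable -> {
            Thread t = new Thread(runnable);
            t.setName(name);
            // Daemon threads do not keep the JVM alive on their own.
            t.setDaemon(true);
            return t;
        };
    }

    /** Runs a periodic task until it has fired `rounds` times or the timeout expires. */
    static boolean runChecks(int rounds, long timeoutMillis) {
        ScheduledExecutorService checker =
            Executors.newScheduledThreadPool(1, daemonFactory("demo.Worker"));
        CountDownLatch latch = new CountDownLatch(rounds);
        // Initial delay 1 ms; each later run starts 10 ms after the previous run finishes.
        checker.scheduleWithFixedDelay(latch::countDown, 1L, 10L, TimeUnit.MILLISECONDS);
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            checker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("three rounds completed: " + runChecks(3, 5000));
    }
}
```

        With scheduleWithFixedDelay the 10 ms is measured from the end of one run to the start of the next, so a slow check never overlaps with the following one.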

        Once the service has started, the first thread pool executes checkConfigInfo(). Let's follow it and see what it does.

Code snippet from com.alibaba.nacos.client.config.impl.ClientWorker
public void checkConfigInfo() {
    // Split into tasks
    int listenerSize = cacheMap.get().size();
    // Round up to get the number of batches
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
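            // Need to check whether the task is already running; this needs more thought. The task list is currently unordered, so changes may cause problems.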
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

        checkConfigInfo splits the cached configs into batches and submits one LongPollingRunnable per new batch to the executorService thread pool; each task has its own taskId.
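
        The batch arithmetic is worth a closer look, because the round-up only works if the division is done in floating point. The sketch below assumes the per-task size is a double; the figure 3000 is purely illustrative, and TaskCountDemo and its method names are hypothetical.

```java
final class TaskCountDemo {
    /** Number of long-polling tasks needed for the given number of cached configs. */
    static int taskCount(int listenerSize, double perTaskConfigSize) {
        // Floating-point division, so a partially filled batch still rounds up to one task.
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    /** Shown for contrast: integer division truncates before ceil gets a chance to round up. */
    static int truncatedTaskCount(int listenerSize, int perTaskConfigSize) {
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    public static void main(String[] args) {
        System.out.println(taskCount(3001, 3000.0));        // one full batch plus one partial batch
        System.out.println(truncatedTaskCount(3001, 3000)); // the partial batch is lost
    }
}
```

        Because checkConfigInfo only starts tasks when the computed count exceeds currentLongingTaskCount, new tasks are added as listeners are registered, but the code shown never reduces the count.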

Code snippet from com.alibaba.nacos.client.config.impl.ClientWorker.LongPollingRunnable
public void run() {
    List<CacheData> cacheDatas = new ArrayList<CacheData>();
    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // Check the failover config
        for (CacheData cacheData : cacheMap.get().values()) {
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    // Check the local config
                    checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }
        // Ask the server which configs have changed
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                // Fetch the latest config from the server
                String content = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                // Store the content in the cache and update its md5; at this point the CacheData md5 differs from the listener's md5
                cache.setContent(content);
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
                    agent.getName(), dataId, group, tenant, cache.getMd5(),
                    ContentUtils.truncateContent(content));
            } catch (NacosException ioe) {
                String message = String.format(
                    "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                    agent.getName(), dataId, group, tenant);
                LOGGER.error(message, ioe);
            }
        }
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isInitializing() || inInitializingCacheList
                .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                // Check the listener's md5; if it differs from the md5 cached in CacheData, the server config has changed, so notify the listener
                cacheData.checkListenerMd5();
                cacheData.setInitializing(false);
            }
        }
        inInitializingCacheList.clear();
        executorService.execute(this);
    } catch (Throwable e) {
        // If the polling task fails, penalize it by delaying its next execution
        LOGGER.error("longPolling error : ", e);
        executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}

        LongPollingRunnable consists of two main parts:

  • checking the local config info
private void checkLocalConfig(CacheData cacheData) {
    final String dataId = cacheData.dataId;
    final String group = cacheData.group;
    final String tenant = cacheData.tenant;
    File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);

    // absent -> present
    if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        String md5 = MD5.getInstance().getMD5String(content);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);

        LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
            agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
        return;
    }

    // present -> absent. Business listeners are not notified here; they are notified once the config has been fetched from the server.
    if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
        cacheData.setUseLocalConfigInfo(false);
        LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
            dataId, group, tenant);
        return;
    }

    // changed
    if (cacheData.isUseLocalConfigInfo() && path.exists()
        && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        String md5 = MD5.getInstance().getMD5String(content);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
            agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
    }
}

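        The task first selects the CacheData entries that belong to its taskId and then checks each of them, covering both the local config check and the listener md5 check. The local check is mainly for fault tolerance: when the server is down, the Nacos client can still read the config from the local file system. checkLocalConfig itself looks at the failover file located by LocalConfigInfoProcessor.getFailoverFile, while the config snapshot is stored under: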

~/nacos/config/fixed-{address}_8848_nacos/snapshot/DEFAULT_GROUP/{dataId}
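
        The three branches of checkLocalConfig form a small state machine driven by whether the failover file exists and by its last-modified time. Below is a minimal sketch of that logic with the logging and content loading left out; FailoverDemo and the Transition enum are hypothetical names introduced for illustration.

```java
import java.io.File;

final class FailoverDemo {
    enum Transition { CREATED, DELETED, CHANGED, NONE }

    private boolean useLocal;   // mirrors isUseLocalConfigInfo()
    private long localVersion;  // mirrors getLocalConfigInfoVersion()

    /** Convenience overload that reads the state from a real file. */
    Transition check(File failoverFile) {
        return check(failoverFile.exists(), failoverFile.lastModified());
    }

    /** Core logic, separated from the file system so it can be exercised directly. */
    Transition check(boolean exists, long lastModified) {
        if (!useLocal && exists) {
            // absent -> present: switch to the local file and remember its version
            useLocal = true;
            localVersion = lastModified;
            return Transition.CREATED;
        }
        if (useLocal && !exists) {
            // present -> absent: fall back to the server copy
            useLocal = false;
            return Transition.DELETED;
        }
        if (useLocal && localVersion != lastModified) {
            // still present but modified since the last check
            localVersion = lastModified;
            return Transition.CHANGED;
        }
        return Transition.NONE;
    }

    boolean isUseLocal() {
        return useLocal;
    }

    public static void main(String[] args) {
        FailoverDemo demo = new FailoverDemo();
        System.out.println(demo.check(true, 100L));
        System.out.println(demo.check(true, 200L));
        System.out.println(demo.check(false, 0L));
    }
}
```

        While useLocal is true, checkUpdateDataIds skips the entry, so a failover file effectively overrides whatever the server holds until the file is removed.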
  • fetching the config info from the server and updating the local copy
Code snippet from com.alibaba.nacos.client.config.impl.ClientWorker
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) {
        if (!cacheData.isUseLocalConfigInfo()) {
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {
                // This cacheData appears in cacheMap for the first time and this is its first update check
                inInitializingCacheList
                    .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}

public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
        throws NacosException {
    if (StringUtils.isBlank(group)) {
        group = Constants.DEFAULT_GROUP;
    }

    HttpResult result = null;
    try {
        List<String> params = null;
        if (StringUtils.isBlank(tenant)) {
            params = Arrays.asList("dataId", dataId, "group", group);
        } else {
            params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
        }
        result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
    } catch (IOException e) {
        String message = String.format(
            "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),
            dataId, group, tenant);
        LOGGER.error(message, e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
    switch (result.code) {
        case HttpURLConnection.HTTP_OK:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
            return result.content;
        case HttpURLConnection.HTTP_NOT_FOUND:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
            return null;
        case HttpURLConnection.HTTP_CONFLICT: {
            LOGGER.error(
                "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
                    + "tenant={}", agent.getName(), dataId, group, tenant);
            throw new NacosException(NacosException.CONFLICT,
                "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }
        case HttpURLConnection.HTTP_FORBIDDEN: {
            LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId,
                group, tenant);
            throw new NacosException(result.code, result.content);
        }
        default: {
            LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId,
                group, tenant, result.code);
            throw new NacosException(result.code,
                "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }
    }
}

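        checkUpdateDataIds() first asks the server for the list of dataIds whose values have changed. getServerConfig then pulls the latest config for each of those dataIds from the server and stores it in CacheData. Finally, CacheData's checkListenerMd5 method compares md5 values and notifies every listener whose md5 is out of date. In other words, although the Listener makes it look like a push, the client is the one pulling the data.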
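
        The two-step exchange (report md5 values, then pull only what changed) can be simulated without a network. In the sketch below PullDemo and FakeServer are hypothetical: the fake server is an in-memory map that answers immediately, whereas a real long-polling request may be held open by the server, and the real wire format built by checkUpdateDataIds is not reproduced.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PullDemo {
    /** Hex-encoded MD5 of the content; null is treated as the empty string. */
    static String md5(String content) {
        try {
            byte[] bytes = (content == null ? "" : content).getBytes(StandardCharsets.UTF_8);
            byte[] hash = MessageDigest.getInstance("MD5").digest(bytes);
            return String.format("%032x", new BigInteger(1, hash));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Hypothetical in-memory server: key -> current content. */
    static final class FakeServer {
        final Map<String, String> store = new HashMap<>();

        /** Step 1: the client submits key -> md5; the answer lists keys whose md5 differs. */
        List<String> changedKeys(Map<String, String> clientMd5s) {
            List<String> changed = new ArrayList<>();
            for (Map.Entry<String, String> e : clientMd5s.entrySet()) {
                if (!md5(store.get(e.getKey())).equals(e.getValue())) {
                    changed.add(e.getKey());
                }
            }
            return changed;
        }

        /** Step 2: the client pulls the full content for one key. */
        String fetch(String key) {
            return store.get(key);
        }
    }

    /** One polling round: refreshes the local cache and returns the keys that were updated. */
    static List<String> pollOnce(FakeServer server, Map<String, String> localCache) {
        Map<String, String> md5s = new HashMap<>();
        for (Map.Entry<String, String> e : localCache.entrySet()) {
            md5s.put(e.getKey(), md5(e.getValue()));
        }
        List<String> changed = server.changedKeys(md5s);
        for (String key : changed) {
            localCache.put(key, server.fetch(key));
        }
        return changed;
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        server.store.put("app.yaml", "v2");
        Map<String, String> cache = new HashMap<>();
        cache.put("app.yaml", "v1");
        System.out.println("updated keys: " + pollOnce(server, cache));
    }
}
```

        Sending only md5 values keeps the check request small; full content crosses the wire only for entries that actually changed.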

        

Code snippet from com.alibaba.nacos.client.config.impl.CacheData
void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, md5, wrap);
        }
    }
}

private void safeNotifyListener(final String dataId, final String group, final String content,
                                    final String md5, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;
    Runnable job = new Runnable() {
        @Override
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
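                // Before running the callback, set the thread's context classloader to the specific webapp's classloader, so that SPI calls made inside the callback do not fail or resolve to the wrong implementation (only an issue in multi-application deployments).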
                Thread.currentThread().setContextClassLoader(appClassLoader);
                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                listener.receiveConfigInfo(contentTmp);
                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                    listener);
            } catch (NacosException de) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
                    dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
                    md5, listener, t.getCause());
            } finally {
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };
    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,
            md5, listener, t.getCause());
    }
    final long finishNotify = System.currentTimeMillis();
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
        name, (finishNotify - startNotify), dataId, group, md5, listener);
}
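
        The notification rule in checkListenerMd5 boils down to remembering, per listener, the md5 of the content it last received. The sketch below isolates that idea; NotifyDemo, Entry and ListenerWrap are hypothetical simplifications, and a plain Consumer<String> stands in for the Listener interface (no executor, filter chain or classloader handling).

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class NotifyDemo {
    /** Hex-encoded MD5 of the content. */
    static String md5(String content) {
        try {
            byte[] hash = MessageDigest.getInstance("MD5")
                .digest(content.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, hash));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Pairs a listener with the md5 of the content it was last called with. */
    static final class ListenerWrap {
        final Consumer<String> listener;
        String lastCallMd5 = "";

        ListenerWrap(Consumer<String> listener) {
            this.listener = listener;
        }
    }

    /** Hypothetical, stripped-down cache entry. */
    static final class Entry {
        private final List<ListenerWrap> listeners = new ArrayList<>();
        private String content;
        private String md5 = "";

        void addListener(Consumer<String> listener) {
            listeners.add(new ListenerWrap(listener));
        }

        void setContent(String newContent) {
            content = newContent;
            md5 = md5(newContent);
        }

        /** Calls each listener whose recorded md5 differs from the current one. */
        void checkListeners() {
            for (ListenerWrap wrap : listeners) {
                if (!md5.equals(wrap.lastCallMd5)) {
                    try {
                        wrap.listener.accept(content);
                        // Recorded only after a successful callback.
                        wrap.lastCallMd5 = md5;
                    } catch (RuntimeException e) {
                        // lastCallMd5 stays unchanged, so the next check retries this listener.
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        Entry entry = new Entry();
        entry.addListener(c -> System.out.println("received: " + c));
        entry.setContent("a=1");
        entry.checkListeners();
        entry.checkListeners(); // no output: the md5 has not changed
    }
}
```

        Tracking the md5 per listener rather than per entry means a listener registered later, or one whose callback failed, is brought up to date on the next check without re-notifying the others.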

        




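Copyright notice: this is an original article by rambogototravel, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.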