How Nacos Service Registration Works

1. A deep dive into the Nacos service registration and discovery source code

2. How the Nacos registry prevents concurrent read/write conflicts across nodes

3. How Nacos sustains high concurrency: asynchronous tasks and in-memory queues


1. Nacos service registration

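Nacos client:

Start from the Spring Boot auto-configuration class.

Open this class.

Step into the constructor (the new call).

The Nacos client ultimately calls the server's registration endpoint to register the service. Next, look at the server-side logic.

Nacos server: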

 

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        // Get the namespaceId (falls back to the default namespace)
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        // Get the required serviceName and validate its format
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        // Build the Instance from the request
        final Instance instance = HttpRequestInstanceBuilder.newBuilder()
                .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
        
        // Register the instance
        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

Step into the registerInstance method.

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        
        // If the service already exists, do nothing; otherwise create an empty Service (on first registration it does not exist yet)
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        
        // Fetch the service that was just created (or already existed)
        Service service = getService(namespaceId, serviceName);
        
        // Check the service once more and throw an exception if it is still null
        checkServiceIsNull(service, namespaceId, serviceName);
        
        // Register the instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

Step into createEmptyService, which delegates to createServiceIfAbsent.

    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            // Set the service name
            service.setName(serviceName);
            // Set the namespace
            service.setNamespaceId(namespaceId);
            // Set the group name
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

Step into the getService method.

    public Service getService(String namespaceId, String serviceName) {

    /**
     * Map(namespace, Map(group::serviceName, Service)).
     * The most important map in Nacos: the service registry itself.
     */
    //private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

        if (serviceMap.get(namespaceId) == null) {
            return null;
        }
        return chooseServiceMap(namespaceId).get(serviceName);
    }
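To make the shape of that map concrete, here is a minimal sketch of a two-level registry lookup. It is not the Nacos ServiceManager: the class name RegistrySketch, the putIfAbsent helper, the String value type and the sample names are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the registry; not the Nacos ServiceManager.
class RegistrySketch {

    // Map(namespace, Map(serviceName, service)); the service is a plain String here.
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    // Returns null when either the namespace or the service is unknown.
    String getService(String namespaceId, String serviceName) {
        Map<String, String> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    // Creates the inner map on first use, then stores the service only if it is absent.
    // Returns the previously stored service, or null if this call stored the new one.
    String putIfAbsent(String namespaceId, String serviceName, String service) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .putIfAbsent(serviceName, service);
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        // Sample names are made up for the demo.
        System.out.println(registry.getService("public", "DEFAULT_GROUP@@order-service")); // prints null
        registry.putIfAbsent("public", "DEFAULT_GROUP@@order-service", "order-service");
        System.out.println(registry.getService("public", "DEFAULT_GROUP@@order-service")); // prints order-service
    }
}
```

The null check on the outer map mirrors the early return in getService: an unknown namespace and an unknown service both come back as null.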

Back in registerInstance, step into the addInstance method.

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {

        // Build the key from the ephemeral flag; ephemeral defaults to true, in which case the key contains "ephemeral"
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        // Fetch the service created earlier
        Service service = getService(namespaceId, serviceName);
        
        synchronized (service) {

            // Wrap the ips into the instance list for this service
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            
            // Hand the instance list to the consistency service, which eventually puts it into the service
            consistencyService.put(key, instances);
        }
    }

Step into consistencyService.put (class DelegateConsistencyServiceImpl).

    @Override
    public void put(String key, Record value) throws NacosException {
        mapConsistencyService(key).put(key, value);
    }

Step into the mapConsistencyService(key) method.

    private ConsistencyService mapConsistencyService(String key) {
        // Pick the implementation from the key built earlier; an ephemeral key (the default) selects the first one
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }
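The routing decision depends only on the text of the key, which is easy to see in a small sketch. The prefix constants and the "##" separator below are illustrative assumptions, not values copied from KeyBuilder, and route returns a name instead of a real ConsistencyService.

```java
// Hypothetical sketch of key-based routing; constants are illustrative, not taken from KeyBuilder.
class KeyRoutingSketch {

    static final String PREFIX = "iplist.";
    static final String EPHEMERAL_MARKER = "ephemeral.";

    // The ephemeral flag is encoded into the key itself.
    static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        return PREFIX + (ephemeral ? EPHEMERAL_MARKER : "") + namespaceId + "##" + serviceName;
    }

    // A key is ephemeral when the marker directly follows the prefix.
    static boolean matchEphemeralKey(String key) {
        return key.startsWith(PREFIX + EPHEMERAL_MARKER);
    }

    // Returns the name of the implementation that would handle the key.
    static String route(String key) {
        return matchEphemeralKey(key) ? "ephemeralConsistencyService" : "persistentConsistencyService";
    }

    public static void main(String[] args) {
        String ephemeralKey = buildInstanceListKey("public", "order-service", true);
        String persistentKey = buildInstanceListKey("public", "order-service", false);
        System.out.println(ephemeralKey + " -> " + route(ephemeralKey));
        System.out.println(persistentKey + " -> " + route(persistentKey));
    }
}
```

Because the flag travels inside the key, every later step can tell ephemeral from persistent data without carrying an extra boolean around.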

Step into mapConsistencyService(key).put(key, value). For an ephemeral instance this ends up in the put method of DistroConsistencyServiceImpl.

    @Override
    public void put(String key, Record value) throws NacosException {
        // Main step: this is what eventually puts the instance into the service
        onPut(key, value);
        // If upgrade to 2.0.X, do not sync for v1.
        if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
            return;
        }
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                DistroConfig.getInstance().getSyncDelayMillis());
    }

Step into the onPut() method.

    public void onPut(String key, Record value) {
        
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }
        
        if (!listeners.containsKey(key)) {
            return;
        }
        
        notifier.addTask(key, DataOperation.CHANGE);
    }

Step into the addTask() method.

        public void addTask(String datumKey, DataOperation action) {
            
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.offer(Pair.with(datumKey, action));
        }
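The services map in addTask coalesces repeated CHANGE tasks: while a change for a key is still waiting in the queue, further changes for the same key are dropped. A minimal sketch of that idea follows. It differs from the code above in two labeled ways: it uses putIfAbsent so that check and mark are a single atomic step, and it clears the marker when the queue is full. All names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of task coalescing; not the Nacos Notifier.
class CoalescingQueueSketch {

    // Keys that already have a change task waiting in the queue.
    private final ConcurrentHashMap<String, String> pending = new ConcurrentHashMap<>();

    private final ArrayBlockingQueue<String> tasks = new ArrayBlockingQueue<>(16);

    // Returns true if a task was queued, false if it was coalesced or the queue was full.
    boolean addChange(String key) {
        // Assumption of this sketch: putIfAbsent replaces the containsKey + put pair.
        if (pending.putIfAbsent(key, "") != null) {
            return false;
        }
        boolean queued = tasks.offer(key);
        if (!queued) {
            // Assumption of this sketch: forget the marker so a later change can retry.
            pending.remove(key);
        }
        return queued;
    }

    // Takes the next key without blocking and clears its marker; returns null when empty.
    String poll() {
        String key = tasks.poll();
        if (key != null) {
            pending.remove(key);
        }
        return key;
    }

    int size() {
        return tasks.size();
    }

    public static void main(String[] args) {
        CoalescingQueueSketch queue = new CoalescingQueueSketch();
        System.out.println(queue.addChange("service-a")); // prints true
        System.out.println(queue.addChange("service-a")); // prints false
        System.out.println(queue.size());                 // prints 1
    }
}
```

Dropping the duplicate is safe here because the consumer reads the latest data for the key when it handles the task, so one queued task covers every change made before it runs.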
        

Once the task has been offered to the queue, the registration request itself is complete.

You may be wondering where the instance actually gets registered.

It is in fact registered asynchronously, through a blocking queue plus a thread pool.

The init method of DistroConsistencyServiceImpl starts the consumer thread:

    @PostConstruct
    public void init() {
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }

Here is the code:

    public class Notifier implements Runnable {
        
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
        
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
        
        /**
         * Add new notify task to queue.
         *
         * @param datumKey data key
         * @param action   action for data
         */
        public void addTask(String datumKey, DataOperation action) {
            
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            // Enqueue onto the blocking queue
            tasks.offer(Pair.with(datumKey, action));
        }
        
        public int getTaskSize() {
            return tasks.size();
        }
        
        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            
            // Loop forever
            for (; ; ) {
                try {
                    // Take from the blocking queue (blocks while the queue is empty)
                    Pair<String, DataOperation> pair = tasks.take();
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
        
        // Main method
        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                
                services.remove(datumKey);
                
                int count = 0;
                
                if (!listeners.containsKey(datumKey)) {
                    return;
                }
                
                for (RecordListener listener : listeners.get(datumKey)) {
                    
                    count++;
                    
                    try {
                        if (action == DataOperation.CHANGE) {
                            // Registration takes this branch
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
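Stripped of the Nacos specifics, the Notifier is a single consumer thread draining a blocking queue. The sketch below shows only that pattern. The names are hypothetical, the handler just records the key, and unlike the real loop (which catches Throwable and keeps running) this one exits on interrupt so that the demo can shut down cleanly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the producer/consumer pattern; not the Nacos Notifier.
class NotifierSketch implements Runnable {

    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private final CountDownLatch done;

    NotifierSketch(int expectedTasks) {
        this.done = new CountDownLatch(expectedTasks);
    }

    // Producer side: called by request threads, returns immediately.
    void addTask(String key) {
        tasks.offer(key);
    }

    // Consumer side: one thread handles the tasks in FIFO order.
    @Override
    public void run() {
        try {
            while (true) {
                String key = tasks.take(); // blocks while the queue is empty
                handled.add(key);          // stand-in for listener.onChange(...)
                done.countDown();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit the loop on shutdown
        }
    }

    // Runs two tasks through the consumer and returns the keys in handling order.
    static List<String> demo() {
        NotifierSketch notifier = new NotifierSketch(2);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(notifier);
        notifier.addTask("service-a");
        notifier.addTask("service-b");
        try {
            notifier.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow(); // interrupts the consumer blocked in take()
        }
        return new ArrayList<>(notifier.handled);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the design is that the request thread only pays for an offer call, while the slower work of updating the registry happens on the consumer thread.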

Step into listener.onChange (class Service).

    @Override
    public void onChange(String key, Instances value) throws Exception {
        
        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        
        for (Instance instance : value.getInstanceList()) {
            
            if (instance == null) {
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }
            
            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }
            
            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        // Execution continues here
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        
        recalculateChecksum();
    }
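The weight handling in onChange is a clamp with one gap: a weight of exactly 0 (and any negative weight) is left untouched, and only positive weights below 0.01 are raised. A tiny sketch of the same rule as a pure function, with hypothetical names:

```java
// Hypothetical sketch of the weight rule in onChange; not Nacos code.
class WeightClampSketch {

    static final double MAX_WEIGHT = 10000.0D;
    static final double MIN_POSITIVE_WEIGHT = 0.01D;

    // Caps large weights, raises tiny positive weights, leaves everything else unchanged.
    static double clamp(double weight) {
        if (weight > MAX_WEIGHT) {
            return MAX_WEIGHT;
        }
        if (weight > 0.0D && weight < MIN_POSITIVE_WEIGHT) {
            return MIN_POSITIVE_WEIGHT;
        }
        return weight;
    }

    public static void main(String[] args) {
        System.out.println(clamp(20000.0D)); // prints 10000.0
        System.out.println(clamp(0.001D));   // prints 0.01
        System.out.println(clamp(0.0D));     // prints 0.0
    }
}
```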

Step into the updateIPs method.

    public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }
        
        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }
                
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }
                
                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG
                            .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                    instance.getClusterName(), instance.toJson());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    getClusterMap().put(instance.getClusterName(), cluster);
                }
                
                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }
                
                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }
        
        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //make every ip mine
            List<Instance> entryIPs = entry.getValue();
            // The key call is the one below
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }
        
        setLastModifiedMillis(System.currentTimeMillis());
        getPushService().serviceChanged(this);
        ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral);
        StringBuilder stringBuilder = new StringBuilder();
        
        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(',');
        }
        
        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                stringBuilder.toString());
        
    }
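The first half of updateIPs is a group-by: every known cluster starts with an empty list, then each instance is filed under its cluster name. Starting from empty lists matters, because a cluster that receives no instances in this update still gets its old instances cleared. A minimal sketch of that grouping, with a hypothetical Instance class and "DEFAULT" assumed as the default cluster name:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the grouping step in updateIPs; not Nacos code.
class ClusterGroupingSketch {

    // Assumed value of the default cluster name.
    static final String DEFAULT_CLUSTER = "DEFAULT";

    // Minimal stand-in for an instance: a cluster name plus an ip:port address.
    static final class Instance {
        final String clusterName;
        final String address;

        Instance(String clusterName, String address) {
            this.clusterName = clusterName;
            this.address = address;
        }
    }

    static Map<String, List<String>> groupByCluster(Collection<String> knownClusters, List<Instance> instances) {
        Map<String, List<String>> ipMap = new LinkedHashMap<>();
        // Known clusters start empty, so clusters without instances are still updated.
        for (String clusterName : knownClusters) {
            ipMap.put(clusterName, new ArrayList<>());
        }
        for (Instance instance : instances) {
            if (instance == null) {
                continue; // skip malformed entries, as the original does
            }
            String clusterName = (instance.clusterName == null || instance.clusterName.isEmpty())
                    ? DEFAULT_CLUSTER : instance.clusterName;
            // Unknown clusters are created on the fly.
            ipMap.computeIfAbsent(clusterName, k -> new ArrayList<>()).add(instance.address);
        }
        return ipMap;
    }

    public static void main(String[] args) {
        List<Instance> instances = Arrays.asList(
                new Instance("", "10.0.0.1:8080"),
                new Instance("sh", "10.0.0.2:8080"));
        // prints {DEFAULT=[10.0.0.1:8080], bj=[], sh=[10.0.0.2:8080]}
        System.out.println(groupByCluster(Arrays.asList("DEFAULT", "bj"), instances));
    }
}
```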

Step into clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral).

    public void updateIps(List<Instance> ips, boolean ephemeral) {
        
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
        
        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
        
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }
        
        List<Instance> updatedIps = updatedIps(ips, oldIpMap.values());
        if (updatedIps.size() > 0) {
            for (Instance ip : updatedIps) {
                Instance oldIP = oldIpMap.get(ip.getDatumKey());
                
                // do not update the ip validation status of updated ips
                // because the checker has the most precise result
                // Only when ip is not marked, don't we update the health status of IP:
                if (!ip.isMarked()) {
                    ip.setHealthy(oldIP.isHealthy());
                }
                
                if (ip.isHealthy() != oldIP.isHealthy()) {
                    // ip validation status updated
                    Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                            (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
                }
                
                if (ip.getWeight() != oldIP.getWeight()) {
                    // ip validation status updated
                    Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP, ip);
                }
            }
        }
        
        List<Instance> newIPs = subtract(ips, oldIpMap.values());
        if (newIPs.size() > 0) {
            Loggers.EVT_LOG
                    .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                            getName(), newIPs.size(), newIPs);
            
            for (Instance ip : newIPs) {
                HealthCheckStatus.reset(ip);
            }
        }
        
        List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
        
        if (deadIPs.size() > 0) {
            Loggers.EVT_LOG
                    .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                            getName(), deadIPs.size(), deadIPs);
            
            for (Instance ip : deadIPs) {
                HealthCheckStatus.remv(ip);
            }
        }
        
        toUpdateInstances = new HashSet<>(ips);
        
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }
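
ephemeralInstances is a Set field of the Cluster class that holds all of the cluster's ephemeral instances.
The final assignment, ephemeralInstances = toUpdateInstances;, replaces that set with the newly built one, so the Cluster now holds every instance and registration is complete.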
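Two ideas from updateIps are worth isolating: the new and dead instances are computed as set differences, and the update is applied by building a fresh set and swapping the reference rather than mutating the old set in place. The sketch below shows both with plain strings. The names are hypothetical, and the volatile modifier is an assumption of this sketch, not a claim about the Nacos field.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the diff-and-swap step in updateIps; not the Nacos Cluster class.
class ClusterSwapSketch {

    // Assumption of this sketch: volatile, so other threads see the swapped reference.
    private volatile Set<String> ephemeralInstances = new HashSet<>();

    // Elements of a that are not in b.
    static Set<String> subtract(Set<String> a, Set<String> b) {
        Set<String> result = new HashSet<>(a);
        result.removeAll(b);
        return result;
    }

    // Builds a fresh set and swaps the reference; the old set is never modified.
    void updateIps(Set<String> ips) {
        ephemeralInstances = new HashSet<>(ips);
    }

    // Read-only view of the set that is current at the time of the call.
    Set<String> allIps() {
        return Collections.unmodifiableSet(ephemeralInstances);
    }

    public static void main(String[] args) {
        ClusterSwapSketch cluster = new ClusterSwapSketch();
        cluster.updateIps(new HashSet<>(Arrays.asList("10.0.0.1", "10.0.0.2")));
        Set<String> before = cluster.allIps();
        Set<String> next = new HashSet<>(Arrays.asList("10.0.0.2", "10.0.0.3"));

        System.out.println("new:  " + subtract(next, before)); // prints new:  [10.0.0.3]
        System.out.println("dead: " + subtract(before, next)); // prints dead: [10.0.0.1]

        cluster.updateIps(next);
        System.out.println(before.size()); // prints 2, the earlier view is unaffected by the swap
    }
}
```

A reader that obtained the set before the swap keeps iterating over the old, unchanged set, which is why this style of update avoids read/write conflicts on the collection itself.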

namespace -> group -> service -> cluster -> Set<Instance>: following this chain finally yields all of the instances.


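Copyright notice: this is an original article by hc1428090104, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.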