nacos注册中心源码解析(一)

 

 

说道源码解析,那就不得不把源码从git上下载下来,所以想要深入的童鞋一定要把源码给下载下来。

那么接下来就直接开始吧!

那么从哪开始呢?我们日常使用当然是启动nacos,然后在一个springboot配置中添加nacos服务发现的一个jar包,我们项目用的是2.2.3版本的

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
       

那么之后呢? 如果对springboot自动配置原理有些了解的小伙伴会知道,springboot在启动的时候会扫描jar包中META-INF/spring.factories这个文件,如下是nacos-discovery jar包中的spring.factories文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
  com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

从上述我们看到这个jar包配置了挺多的自动配置类,有什么ribbon,nacos的也很多,怎么看呢?这里凭借经验或者理解来说是这个NacosServiceRegistryAutoConfiguration类,nacos服务注册自动配置,那么我们点进去

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
		matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class,
		NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(registrationCustomizers.getIfAvailable(),
				nacosDiscoveryProperties, context);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}

}

可以看到这里注册了三个bean,这三个bean有什么关系呢,我们逐一点击去看下继承关系中可以得到如下结果

这里稍微解释,继承了applicationListener接口需要实现onApplicationEvent这个方法,他会监听对应的事件,这里我把WebServerInitializedEvent的注释带上了,意思就是在事件在web服务准备好了就会被发布。

顺着我图中的顺序,我们来到NacosServiceRegistry.register(Registration registration)方法

        NamingService namingService = namingService();
        String serviceId = registration.getServiceId();
		String group = nacosDiscoveryProperties.getGroup();

		Instance instance = getNacosInstanceFromRegistration(registration);

		try {
            //从方法名称来看是注册实例,我们点进去
			namingService.registerInstance(serviceId, group, instance);
		}
		catch (Exception e) {
			rethrowRuntimeException(e);
		}

接下来我们来到

    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //判断是不是临时实例
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }

这里的临时实例稍作解释,在nacos中我们会区分临时实例和持久化实例,而他们分别对应着ap架构和cp架构,我们公司用的是ap架构。通过beat这个词,我们大致可以推断出,这是个心跳机制的东东,我们先略过这个if走主线,往下就是又是一个注册服务,点进去。

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
     
        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        ...
        
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    }

这个方法看起来就是拼接了参数,然后通过reqApi去调用,我们看下UtilAndComs.nacosUrlInstance = "/nacos" +  "/v1/ns" + "/instance",从官网https://nacos.io/zh-cn/docs/open-api.html我们可以看到

这就是nacos官网上开放的openapi,我们继续往下走就是通过http的方式拼接了地址和端口号,调用nacos server端了,那么接下来就是核心啦,我们需要借用nacos源码,然后去看服务端的逻辑了。

来到服务端,通常我们用全局搜索这个路径,发现没找到,怎么办,最简单的就是全局搜Controller,不多说,我们直接来到

注意:我这里的nacos版本是基于生产可使用的,也就是1.1.4,并非2.x版本(有bug不稳定)

来到InstanceController

@PostMapping
    public String register(HttpServletRequest request) throws Exception {

        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);


        serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
        return "ok";
    }

点进去

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

        //创建一个空实例
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());

        Service service = getService(namespaceId, serviceName);

        //添加实例
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

 

这个是createEmptyService的一个主要流程,我们主要是看下最后的一个serviceMap.put()

 public void putService(Service service) {
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
                }
            }
        }
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }

这里采用双重判断来保证服务注册的安全性,当这个服务没有任何一个实例的时候,就新建,不然就get出来在put,所以这个map的结构也就是

Map<namespace, Map<group::serviceName, Service>>

而这个Map也我们的服务注册的列表了。那么我们回到putServiceAndInit方法

private void putServiceAndInit(Service service) throws NacosException {
        //创建内存注册表结构
        putService(service);
        //服务初始化
        service.init();
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
    }

在内存中创建结构后,还进行了服务初始化,我们点进去看下做了什么

public void init() {
        //健康任务的检查
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        //集群同步的更新
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

点进去会发现,有个只是把这个任务放到了一个map当中,其实这是我觉得nacos快的一个比较重要的原因之一,异步调用,这里提一句,笔者之前公司使用的是eureka,在k8s环境发布后,由于eureka的三层map缓存和ribbon等一些的延迟,会导致服务出现在注册中心列表中需要30s 到2min不等,而nacos基本可以保证在10s左右,我们阅读源码其实并不一定需要把很多细节搞清楚,最重要的笔者认为是思想,好了不扯了,继续。

clientBeatCheckTask是一个实现了Runnable接口的,那么我们可以直接去看它的run方法。

 try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }

            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            //翻译过来就是有限设置实例的健康状态
            for (Instance instance : instances) {
                //判断当前时间到上一次心跳时间是否超过配置值,默认15s
                //public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
                                UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            //添加到map中,里面也是异步执行的逻辑,springcontext发布事件也是一种解耦的操作
                            getPushService().serviceChanged(service);
                            //发布时间
                            SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            // then remove obsolete instances:
            //然后移除一些过期的实例
            for (Instance instance : instances) {

                if (instance.isMarked()) {
                    continue;
                }
                //public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
                    //删除实例,又是用的异步
                    deleteIP(instance);
                }
            }

        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }

这个代码有些长,我也就都加上了注释,就是当前时间和上一次的心跳时间,其实就是最开始说的那个if判断是否是临时实例,那里面会走心跳链接的逻辑,而时间如果没有配置,会有个默认的15s,如果过了就会把实例的健康状态置为false,删除实例也是一样的,其实就是http调用,同样也是采用异步的方式,你就会发现nacos的太多操作是基于异步的了,而异步最大的好处就是速度快。

 

那么既然说到了心跳,那么我们就不得不退回到最开始的if条件了,其实读者会发现,在源码中很多情况你可以先不要看特殊逻辑,先去走主流程,对于特殊逻辑有个印象就好了,当后续用到的时候再折回头来。继续!

我们切回客户端

   public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //判断是不是临时实例
        if (instance.isEphemeral()) {
            //我们回到这里
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }

buildBeatInfo方法仅仅只收构建了一下参数,然后下面一行代码我们看一下

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
       ...
        //又是个定时器 默认就是5s  public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

这里就可以算出 15s / 5s = 3次,即三次重试失败实例的健康状态才会被置为false,联系上下文!

if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            try {
                //发送心跳
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                //如果实例不存在,就回去注册
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        serverProxy.registerService(beatInfo.getServiceName(),
                                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                        JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
                
            }
            //循环的定时任务
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);

那其实发送心跳也就是个http链接了

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
        ...
        String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
        return JacksonUtils.toObj(result);
    }

那接下来不用我说了吧,我们直接来到server端

@PutMapping("/beat")
    public JSONObject beat(HttpServletRequest request) throws Exception {

        ...
        //是持久化实例么
        if (!switchDomain.isDefaultInstanceEphemeral() && !clientBeat.isEphemeral()) {
            return result;
        }
        
        //设置集群
        if (StringUtils.isBlank(clientBeat.getCluster())) {
            clientBeat.setCluster(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
        }

        String clusterName = clientBeat.getCluster();

        //获取实例
        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clientBeat.getCluster(),
            clientBeat.getIp(),
            clientBeat.getPort());
        //如果实例不存在,则重新注册
        if (instance == null) {
            instance = new Instance();
            ...
            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }

        Service service = serviceManager.getService(namespaceId, serviceName);

        //心跳
        service.processClientBeat(clientBeat);
        result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
        return result;
    }

我们直接来到心跳的方法,还是异步

public static ScheduledFuture<?> scheduleNow(Runnable task) {
        //delay = 0 立即开启
        return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
    }

那么直接来到clientBeat的run方法了

 for (Instance instance : instances) {
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                //设置上次的心跳时间   有没有之前的注册的那个联系到一块了
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    //如果实例健康状态为false,置为true,我们之前不是看到15s会置位false,30s才会删除实例
                    if (!instance.isHealthy()) {
        
                        instance.setHealthy(true);
                        Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                            cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }

那么心跳连接也就到此结束了。那么就此我们都串联起来了,你也为这完了? 当初我们点的方法时createEmptyService,既然是empty,那肯定有下文呀,我们就开个小头

 public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

        
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        //还有这
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

继续看看后续做了什么

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            consistencyService.put(key, instances);
        }
    }

就只是一个put就结束了?这又是一个map,其实不是

在DelegateConsistencyServiceImpl类中

private ConsistencyService mapConsistencyService(String key) {
        //通过key来区分临时实例和持久化实例
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }

那么本文就到这了,下篇文章,就是我们一起来分析分析持久化实例和临时实例,nacos是怎么样做的,也是是nacos的ap和cp架构咯。

半年没写博文了,主要是写博文真的累,还怕自己理解有问题误导了读者,但是想想一肚子知识还是得分享出来才行,还是那句话,我说的不一定对,多思考多实践才是真理,好了下次见。


版权声明:本文为weixin_42258418原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。