Nacos as a Registry: Source Code Analysis

Preface

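Reading source code is tedious, but it is how you learn the underlying principles, and going over it repeatedly reveals the core design ideas. I suggest downloading the source and reading along in the order used here.
This is my first article, so corrections are welcome. Some of the simpler code only has comments added rather than a detailed walkthrough; follow it further if you are interested.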

Source code

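Nacos client-side source (Spring Cloud Alibaba): https://github.com/spring-cloud-incubator/spring-cloud-alibaba
Nacos server source: https://github.com/alibaba/nacos
Official Nacos Open API documentation: https://nacos.io/en-us/docs/open-api.html
This article covers the following parts:
1. How the client registers a service
2. How the server handles a registration
3. How the client discovers and calls services
4. How the server answers the client's service queries
5. Service heartbeats and scheduled tasks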

Service Registration

How does Spring Cloud end up calling Nacos's service registration method?

[Figure: sequence diagram of Spring Cloud calling the Nacos registration method]
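1. When Spring Boot starts, it calls the AnnotationConfigApplicationContext(Class<?>... componentClasses) constructor of AnnotationConfigApplicationContext, which then calls refresh(). The finishRefresh() step inside refresh() is the one that leads to service registration. (For a servlet web application the context Spring Boot actually creates is a ServletWebServerApplicationContext subclass, but it inherits the same refresh() from AbstractApplicationContext.)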

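	/**
	 * AnnotationConfigApplicationContext has a parent class, so the parent constructor runs first;
	 * the source shows that it initializes a DefaultListableBeanFactory.
	 *
	 * Then its own constructor runs:
	 * 1. It creates an AnnotatedBeanDefinitionReader for annotated classes;
	 * 	after reading a bean, it registers the bean with the DefaultListableBeanFactory.
	 * 2. It creates a ClassPathBeanDefinitionScanner,
	 *  which can scan packages or classes and turn them into BeanDefinitions.
	 */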
	public AnnotationConfigApplicationContext(Class<?>... componentClasses) {
		this();
		register(componentClasses);
		refresh();
	}
public void refresh() throws BeansException, IllegalStateException {
		synchronized (this.startupShutdownMonitor) {
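			/**
			 * Preparation:
			 * set the startup time and the active flag,
			 * and initialize the property sources
			 */
			prepareRefresh();
			/**
			 * Tell the subclass to refresh its internal bean factory;
			 * this obtains the DefaultListableBeanFactory used by the later steps
			 */
			ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
			/**
			 * Prepare the bean factory
			 */
			prepareBeanFactory(beanFactory);

			try {
				/**
				 * Template method: empty in AbstractApplicationContext,
				 * overridden by subclasses such as the web application contexts
				 */
				postProcessBeanFactory(beanFactory);
				/**
				 * Invoke the BeanFactoryPostProcessors registered as beans in the context;
				 * if a loadTimeWeaver bean is found, a BeanPostProcessor for it is added
				 */
				invokeBeanFactoryPostProcessors(beanFactory);

				/**
				 * Register BeanPostProcessors,
				 * both user-defined and Spring's own
				 */
				registerBeanPostProcessors(beanFactory);

				/**
				 * Internationalization (MessageSource) support; not relevant here
				 */
				initMessageSource();

				/**
				 * Initialize the application event multicaster
				 */
				initApplicationEventMulticaster();
				/**
				 * Template method for subclasses; ServletWebServerApplicationContext
				 * overrides it to create the embedded web server
				 */
				onRefresh();

				/**
				 * Register the listeners
				 */
				registerListeners();

				/**
				 * Instantiate all remaining (non-lazy) singleton beans
				 */
				finishBeanFactoryInitialization(beanFactory);

				/**
				 * Finish the refresh and publish the corresponding events
				 */
				finishRefresh();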
			}
			......
		}
	}

2. finishRefresh() publishes a ServletWebServerInitializedEvent; AbstractAutoServiceRegistration listens for it and performs the service registration

/*
* The servlet web context finishes the refresh, starts the web server, and publishes a ServletWebServerInitializedEvent
*/
	protected void finishRefresh() {
		super.finishRefresh();
		WebServer webServer = startWebServer();
		if (webServer != null) {
			publishEvent(new ServletWebServerInitializedEvent(webServer, this));
		}
	}
/*
* On receiving the WebServerInitializedEvent, the listener calls bind()
*/
	public void onApplicationEvent(WebServerInitializedEvent event) {
		bind(event);
	}

	@Deprecated
	public void bind(WebServerInitializedEvent event) {
		ApplicationContext context = event.getApplicationContext();
		if (context instanceof ConfigurableWebServerApplicationContext) {
			if ("management".equals(((ConfigurableWebServerApplicationContext) context)
					.getServerNamespace())) {
				return;
			}
		}
		this.port.compareAndSet(0, event.getWebServer().getPort());
		this.start();
	}
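/*
* 1. Publish the pre-registration event (InstancePreRegisteredEvent)
* 2. Register the service
* 3. Publish the post-registration event (InstanceRegisteredEvent)
* 4. Mark the registration as running
*/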
	public void start() {
		/*Whether registration is enabled (read from NacosDiscoveryProperties); an application that only subscribes does not need it*/
		if (!isEnabled()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Discovery Lifecycle disabled. Not starting");
			}
			return;
		}

		/*Not running yet, i.e. the first startup*/
		if (!this.running.get()) {
			
			this.context.publishEvent(
					new InstancePreRegisteredEvent(this, getRegistration()));
					
			/*Register the service; this calls NacosAutoServiceRegistration.register()*/
			register();
			/*Whether to also register the management service; for Nacos this is null, so it can be ignored here*/
			if (shouldRegisterManagement()) {
				registerManagement();
			}
			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, getConfiguration()));
					
			/*Update the running state*/
			this.running.compareAndSet(false, true);
		}

	}
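The bind()/start() pair above comes down to two atomic guards. The port is recorded only once (compareAndSet(0, port)), and registration runs only once per lifecycle. Below is a minimal plain-Java sketch of that pattern without Spring. AutoRegistrationSketch and the RegistrySketch interface are hypothetical stand-ins for AbstractAutoServiceRegistration and ServiceRegistry. There is one deliberate difference: the original checks running.get() and sets the flag after registering, whereas this sketch flips the flag first with compareAndSet, so two concurrent events cannot both register.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for ServiceRegistry: receives the port to register. */
interface RegistrySketch {
    void register(int port);
}

/** Hypothetical sketch of the bind()/start() flow of AbstractAutoServiceRegistration. */
class AutoRegistrationSketch {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final RegistrySketch registry;
    private final boolean enabled; // plays the role of isEnabled()

    AutoRegistrationSketch(RegistrySketch registry, boolean enabled) {
        this.registry = registry;
        this.enabled = enabled;
    }

    /** Called for every "web server initialized" event. */
    void onWebServerInitialized(String serverNamespace, int webServerPort) {
        // The management server publishes its own event; it is ignored, as in bind().
        if ("management".equals(serverNamespace)) {
            return;
        }
        // Only the first port seen is recorded.
        port.compareAndSet(0, webServerPort);
        start();
    }

    private void start() {
        if (!enabled) {
            return;
        }
        // Flip the flag first so that concurrent events cannot both register.
        if (running.compareAndSet(false, true)) {
            registry.register(port.get());
        }
    }
}
```

With this design, later events (for example a second web server) are harmless: the port stays at the first value, and register() is not called again.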

3. This reaches NacosAutoServiceRegistration.register()

How does the client register the service with Nacos?

Continuing the call chain above:
[Figure: Nacos client call chain]

1. NacosAutoServiceRegistration.register() runs its checks and then calls the parent AbstractAutoServiceRegistration.register()

	protected void register() {
		/*If registration with Nacos is disabled, return immediately*/
		if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
			log.debug("Registration disabled.");
			return;
		}
		/*If the configured port is less than 0, use the service's actual port, so you do not have to configure this property yourself*/
		if (this.registration.getPort() < 0) {
			this.registration.setPort(getPort().get());
		}
		/*Call the parent's register method*/
		super.register();
	}

	protected void register() {
		this.serviceRegistry.register(getRegistration());
	}

2. NacosServiceRegistry.register() delegates to NacosNamingService.registerInstance()

    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        /*Ephemeral instance: build the heartbeat info and start sending heartbeats*/
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());

            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        /*Register the service*/
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }
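Ephemeral instances are kept alive by heartbeats. addBeatInfo hands the BeatInfo to BeatReactor, which periodically sends it to the server. The period comes from getInstanceHeartBeatInterval(), which is 5 seconds by default. The sketch below shows the scheduling idea with a ScheduledExecutorService. BeatSchedulerSketch, the "#"-separated key format and the sender callback are assumptions for illustration. The real BeatReactor also differs in one respect: it reschedules each beat with the interval the server returns instead of using a fixed delay.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch of client-side heartbeat scheduling (not the real BeatReactor). */
class BeatSchedulerSketch implements AutoCloseable {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> beats = new ConcurrentHashMap<>();
    // Hypothetical: sends one beat for the given key. An exception thrown here would
    // cancel the fixed-delay schedule, so a real sender should catch its own errors.
    private final Consumer<String> sender;

    BeatSchedulerSketch(Consumer<String> sender) {
        this.sender = sender;
    }

    /** Assumed key format: groupedServiceName#ip#port. */
    static String key(String groupedServiceName, String ip, int port) {
        return groupedServiceName + "#" + ip + "#" + port;
    }

    /** Start (or restart) periodic beats for an ephemeral instance. */
    void addBeat(String key, long periodMillis) {
        ScheduledFuture<?> future = executor.scheduleWithFixedDelay(
                () -> sender.accept(key), 0, periodMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = beats.put(key, future);
        if (previous != null) {
            previous.cancel(false); // keep only one beat schedule per instance
        }
    }

    /** Stop beating, e.g. when the instance is deregistered. */
    void removeBeat(String key) {
        ScheduledFuture<?> future = beats.remove(key);
        if (future != null) {
            future.cancel(false);
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```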

3. NamingProxy.registerService() builds the request parameters and calls reqAPI to send an HTTP request to "/nacos/v1/ns/instance"

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);
        /*Build the request parameters*/
        final Map<String, String> params = new HashMap<String, String>(9);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));
        /*Send the registration request*/
        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

    }
public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {

        //Get the namespace id
        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
        //The server list and the Nacos domain cannot both be empty
        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
            throw new NacosException(NacosException.INVALID_PARAM, "no server available");
        }

        NacosException exception = new NacosException();
		
        if (servers != null && !servers.isEmpty()) {

            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());

            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index);
                try {
                    //Send the request to the server
                    return callServer(api, params, body, server, method);
            ......
    public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
        throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0;
        //Inject credential information (used with Alibaba Cloud ACM)
        injectSecurityInfo(params);
        //Build the request headers
        List<String> headers = builderHeaders();

        //Build the request URL
        String url;
        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
            url = curServer + api;
        } else {
            if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
                curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
            }
            url = HttpClient.getPrefix() + curServer + api;
        }

        //Send the HTTP request
        HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
        end = System.currentTimeMillis();
        //Record the request latency in the metrics monitor
        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
            .observe(end - start);

        //200: return the response body
        if (HttpURLConnection.HTTP_OK == result.code) {
            return result.content;
        }

        //304: not modified, return an empty string
        if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
            return StringUtils.EMPTY;
        }

        throw new NacosException(result.code, result.content);
    }
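reqAPI starts at a randomly chosen server and, judging from the loop, moves on through the list when a call fails. callServer then builds the URL. If the server string already has an http/https scheme it is used as-is; otherwise the default port is appended (when no port is given) and a prefix is added. The part of the loop after callServer is elided in the excerpt, so the round-robin step below is an assumption. The fixed "http://" prefix is also an assumption; the real HttpClient.getPrefix() decides between HTTP and HTTPS. ServerCallSketch and its methods are hypothetical, and 8848 is the default Nacos server port.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

/** Hypothetical sketch of NamingProxy-style server selection and URL building. */
class ServerCallSketch {

    /** Same rule as callServer: keep an explicit scheme, otherwise add the default port and a prefix. */
    static String buildUrl(String server, String api, int defaultPort) {
        if (server.startsWith("https://") || server.startsWith("http://")) {
            return server + api;
        }
        String hostPort = server.contains(":") ? server : server + ":" + defaultPort;
        return "http://" + hostPort + api; // assumption: plain HTTP instead of HttpClient.getPrefix()
    }

    /** Start at a random server and try each server at most once; return the first success. */
    static String callWithFailover(List<String> servers, Function<String, String> call, Random random) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        int index = random.nextInt(servers.size());
        RuntimeException last = null;
        for (int i = 0; i < servers.size(); i++) {
            try {
                return call.apply(servers.get(index));
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("failed to call all servers", last);
    }
}
```

The random starting point spreads load across the cluster. Walking the list from there means a single dead node costs at most one extra request.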

How does the server handle a registration request?

Server source directory:
[Figure: overview of the Nacos source directory]

Looking at the client requests, registration involves two requests:
Heartbeat: /nacos/v1/ns/instance/beat
Registration: /nacos/v1/ns/instance

1. Service registration: /nacos/v1/ns/instance
[Figure: service registration]
1.1 The registration endpoint builds the instance from the request parameters and registers it

    /**
     * 1. Build an Instance from the request parameters and validate it
     * 2. Register the instance (the heartbeat endpoint calls the same method)
     */
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {

        /*Get the service name and namespace from the request*/
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

        /*Build an Instance from the request parameters and validate it*/
        final Instance instance = parseInstance(request);

        /*Register the instance, as the heartbeat endpoint also does*/
        serviceManager.registerInstance(namespaceId, serviceName, instance);

        return "ok";
    }

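1.2 If the service does not exist, create it; then register the instance by adding it to the cache and queuing a change notification for the scheduled tasks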

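    /**
     * 1. Validate and create the service, cache it, and add it to the scheduled health checks
     * 2. Fetch the service again; throw if it still does not exist
     * 3. Add the instance to the cache (returning all instances), queue a change notification, and hand the task to the scheduler
     */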
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

        /*1. Validate and create the service 2. Put the service in the local cache 3. Add the service to the scheduled health checks*/
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());

        /*Fetch the service again; throw if it does not exist*/
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }

        /*1. Add the instance to the cache and return all instances 2. Queue a change notification and hand the task to the scheduler*/
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

If the service does not exist, validate and create it, put it in the local cache, and add it to the scheduled health checks

    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId, serviceName, local, null);
    }
    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            /*If a cluster is given, bind it to the service and put it in the service's cluster map*/
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            /*Validate the service and cluster names*/
            service.validate();
            /*1. Put the service in the cache 2. Initialize it (schedule health checks) and register listeners*/
            putServiceAndInit(service);

            /*If the service is not ephemeral (local == false), add or replace it*/
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }
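The server keeps services in a two-level map (namespace, then service name, then Service), and createServiceIfAbsent builds a Service only when none exists. Below is a minimal sketch of the same cache using ConcurrentHashMap.computeIfAbsent. This is a swapped-in technique: the original code checks getService() first and then calls putServiceAndInit(). ServiceCacheSketch and its nested Service class are hypothetical. The sketch omits validation, clusters and health-check scheduling.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the server's service cache: namespaceId -> (serviceName -> Service). */
class ServiceCacheSketch {

    /** Minimal stand-in for Nacos's Service. */
    static final class Service {
        final String namespaceId;
        final String name;

        Service(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger(); // how many Service objects were built

    Service getService(String namespaceId, String serviceName) {
        Map<String, Service> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    /** Create the service only if absent; concurrent callers receive the same instance. */
    Service createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, name -> {
                    created.incrementAndGet();
                    return new Service(namespaceId, name);
                });
    }

    int createdCount() {
        return created.get();
    }
}
```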

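Put the service in the local cache, initialize it and add it to the scheduled health checks, then initialize its clusters and add them to the scheduled health checks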

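    /**
     * Put the service in the local cache, initialize it and add it to the scheduled health checks,
     * then initialize its clusters and add them to the scheduled health checks
     * @param service the service to cache and initialize
     * @throws NacosException
     */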
    private void putServiceAndInit(Service service) throws NacosException {
        /*Put the service in the local cache*/
        putService(service);
        /*Initialize the service and its clusters*/
        service.init();
        /*Listen for changes to this service's ephemeral (true) and persistent (false) instance lists*/
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

Initializing the service

    /**
     * Initialize the service
     */
    public void init() {
        /*Schedule the client heartbeat check task*/
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            /*Initialize the cluster's health checks*/
            entry.getValue().init();
        }
    }

1.3 Add the instance to the service and update the instance cache

    /**
     * Add the instances to the cache
     */
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        
        Service service = getService(namespaceId, serviceName);
        
        synchronized (service) {
            /*Merge the new instances into the service's instance list*/
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            /*Put the list into the consistency service: the change goes into a blocking queue that a scheduled task processes*/
            consistencyService.put(key, instances);
        }
    }
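addInstance merges the new instances into the service's current list while holding the service lock. It then hands the complete list to the consistency service. The merge rules live in addIpAddresses, which is not shown, so the sketch below rests on assumptions. It keys instances by ip:port (the real code uses its own key), and a re-registered instance replaces the old entry. A new immutable list is published each time, so readers never need the lock (copy-on-write). InstanceMergeSketch is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of merging new instances into a service's instance list. */
class InstanceMergeSketch {

    /** Minimal stand-in for Nacos's Instance. */
    static final class Instance {
        final String ip;
        final int port;
        final double weight;

        Instance(String ip, int port, double weight) {
            this.ip = ip;
            this.port = port;
            this.weight = weight;
        }

        String key() {
            return ip + ":" + port; // assumption: identity is ip:port
        }
    }

    private final Object lock = new Object();
    private volatile List<Instance> current = List.of();

    /** Merge under a lock, then publish a new immutable list (copy-on-write). */
    List<Instance> addInstances(Instance... ips) {
        synchronized (lock) {
            Map<String, Instance> merged = new LinkedHashMap<>();
            for (Instance i : current) {
                merged.put(i.key(), i);
            }
            // A re-registered ip:port replaces the old entry instead of being duplicated.
            for (Instance i : ips) {
                merged.put(i.key(), i);
            }
            current = List.copyOf(merged.values());
            return current;
        }
    }

    /** Readers never lock: they just see the latest published list. */
    List<Instance> instances() {
        return current;
    }
}
```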

2. Heartbeat: /nacos/v1/ns/instance/beat
[Figure: heartbeat endpoint]
2.1 The server's heartbeat endpoint

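    /**
     * 1. Look up the instance for this ip and port in the cache
     * 2. If the instance does not exist, register it
     * 3. Mark the instance healthy, set its last heartbeat time to now, and trigger the service-change listener
     * 4. Return the heartbeat interval and the result code
     */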
    @CanDistro
    @PutMapping("/beat")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {

        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        /*Heartbeat interval*/
        result.put("clientBeatInterval", switchDomain.getClientBeatInterval());

        /*Read the beat parameter (empty by default) and, if present, parse it; the client source shows that it sends the BeatInfo as a JSON string*/
        String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
        RsInfo clientBeat = null;
        if (StringUtils.isNotBlank(beat)) {
            clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
        }

        /*Cluster name, or the default if absent*/
        String clusterName = WebUtils
                .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);

        /*ip and port, or defaults if absent; if a beat was sent, take them from the beat*/
        String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
        int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
        if (clientBeat != null) {
            if (StringUtils.isNotBlank(clientBeat.getCluster())) {
                clusterName = clientBeat.getCluster();
            } else {
                clientBeat.setCluster(clusterName);
            }
            ip = clientBeat.getIp();
            port = clientBeat.getPort();
        }

        /*Get serviceName and namespaceId*/
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);

        /*Look up the instance for this ip and port in the cache*/
        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

        /*The instance does not exist*/
        if (instance == null) {
            if (clientBeat == null) {
                result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
                return result;
            }
            
            Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                    + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
            
            instance = new Instance();
            instance.setPort(clientBeat.getPort());
            instance.setIp(clientBeat.getIp());
            instance.setWeight(clientBeat.getWeight());
            instance.setMetadata(clientBeat.getMetadata());
            instance.setClusterName(clusterName);
            instance.setServiceName(serviceName);
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(clientBeat.isEphemeral());

            /*Register the instance*/
            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }

        /*Fetch the service again*/
        Service service = serviceManager.getService(namespaceId, serviceName);
        
        if (service == null) {
            throw new NacosException(NacosException.SERVER_ERROR,
                    "service not found: " + serviceName + "@" + namespaceId);
        }

        /*No beat payload: build one from the request parameters*/
        if (clientBeat == null) {
            clientBeat = new RsInfo();
            clientBeat.setIp(ip);
            clientBeat.setPort(port);
            clientBeat.setCluster(clusterName);
        }
        /*Mark the instance healthy, set its last heartbeat time to now, and trigger the service-change listener*/
        service.processClientBeat(clientBeat);
        
        result.put(CommonParams.CODE, NamingResponseCode.OK);
        result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
        /*Whether lightweight heartbeats are enabled*/
        result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
        return result;
    }
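The comment in beat() describes processClientBeat as follows: mark the instance healthy, set its last heartbeat time to now, and trigger the service-change listener. A minimal sketch of that update follows. BeatProcessorSketch is hypothetical, since the body of processClientBeat is not shown in this excerpt. Skipping manually marked instances is an assumption borrowed from the isMarked() check in ClientBeatCheckTask. The boolean return value stands in for "a change push should be triggered".

```java
import java.util.List;

/** Hypothetical sketch of what processing a client beat does to an instance. */
class BeatProcessorSketch {

    /** Minimal stand-in for Nacos's Instance. */
    static final class Instance {
        final String ip;
        final int port;
        volatile long lastBeat;
        volatile boolean healthy;
        boolean marked; // assumption: manually marked instances keep their health flag

        Instance(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
    }

    /** Refresh lastBeat; return true if the instance flipped to healthy (a change push is needed). */
    static boolean processBeat(List<Instance> instances, String ip, int port, long now) {
        for (Instance instance : instances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                instance.lastBeat = now;
                if (!instance.marked && !instance.healthy) {
                    instance.healthy = true;
                    return true;
                }
                return false;
            }
        }
        return false; // unknown instance: nothing to update
    }
}
```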

2.2 Registering the instance works as described for the registration endpoint above

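2.3 ClientBeatCheckTask runs as a scheduled task:
1. For every ephemeral instance, if the time since its last heartbeat exceeds the heartbeat timeout, mark it unhealthy
2. For every ephemeral instance, if the time since its last heartbeat exceeds the delete timeout, delete it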

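    /**
     * 1. For every ephemeral instance, if the time since its last heartbeat exceeds the heartbeat timeout, mark it unhealthy
     * 2. For every ephemeral instance, if the time since its last heartbeat exceeds the delete timeout, delete it
     */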
    public void run() {
        try {
            /*This server node is not responsible for this service (Distro partitioning), so skip it*/
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            /*Health checks are disabled*/
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }

            /*All ephemeral instances*/
            List<Instance> instances = service.allIPs(true);

            for (Instance instance : instances) {
                /*Time since the last heartbeat exceeds the heartbeat timeout*/
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        /*Mark unhealthy, push the service change, and publish a heartbeat-timeout event*/
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            /*If instance expiry is disabled (it is enabled by default), stop here*/
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            for (Instance instance : instances) {
                
                if (instance.isMarked()) {
                    continue;
                }

                /*Time since the last heartbeat exceeds the delete timeout*/
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    /*Delete the instance*/
                    deleteIp(instance);
                }
            }
            
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
        
    }
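ClientBeatCheckTask therefore applies two thresholds to the same lastBeat timestamp. Past the heartbeat timeout an instance is only marked unhealthy; past the longer delete timeout it is removed. The sketch below reproduces just that decision logic. The 15 s and 30 s values are assumed to be the Nacos 1.x defaults; the real values are read per instance through getInstanceHeartBeatTimeOut() and getIpDeleteTimeout(). BeatCheckSketch is hypothetical and omits Distro ownership, pushes and events.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the two thresholds used by ClientBeatCheckTask. */
class BeatCheckSketch {

    /** Minimal stand-in for an ephemeral Instance. */
    static final class Instance {
        final String id;
        long lastBeat;
        boolean healthy = true;
        boolean marked;

        Instance(String id, long lastBeat) {
            this.id = id;
            this.lastBeat = lastBeat;
        }
    }

    static final long HEART_BEAT_TIMEOUT_MS = 15_000; // assumed default
    static final long IP_DELETE_TIMEOUT_MS = 30_000;  // assumed default

    /** Marks stale instances unhealthy, then returns the ones old enough to delete. */
    static List<Instance> check(List<Instance> instances, long now) {
        for (Instance i : instances) {
            if (now - i.lastBeat > HEART_BEAT_TIMEOUT_MS && !i.marked && i.healthy) {
                i.healthy = false; // the real task also pushes a change and publishes an event here
            }
        }
        List<Instance> toDelete = new ArrayList<>();
        for (Instance i : instances) {
            if (!i.marked && now - i.lastBeat > IP_DELETE_TIMEOUT_MS) {
                toDelete.add(i);
            }
        }
        return toDelete;
    }
}
```

Keeping the two thresholds separate gives a short network hiccup time to recover. Consumers stop using the instance after the first threshold, but it is not deregistered until the second.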

Service Discovery

When does the client fetch services from Nacos?

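It starts at application startup. The startup log shows that BaseLoadBalancer performs service discovery, i.e. it is triggered when Ribbon is used. Let's trace the call chain.
[Figure: startup log]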

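Setting a breakpoint and inspecting the call stack shows that service discovery happens while the Feign beans are being created, and ultimately reaches com.alibaba.cloud.nacos.ribbon.NacosServerList.
[Figure: breakpoint call stack]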

How does the client get the addresses of a service?

[Figure: client-side service discovery]

1. NacosServerList gets all instances by service name and group

	private List<NacosServer> getServers() {
		try {
			/*discoveryProperties holds the properties under ${spring.cloud.nacos.discovery}*/
			String group = discoveryProperties.getGroup();
			/*Create the NamingService from the Nacos properties and query the instances*/
			List<Instance> instances = discoveryProperties.namingServiceInstance()
					.selectInstances(serviceId, group, true);
			/*Convert the returned Instance objects into NacosServer objects*/
			return instancesToServerList(instances);
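		}
		catch (Exception e) {
			throw new IllegalStateException(
					"Can not get service instances from nacos, serviceId=" + serviceId, e);
		}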
	}

2. Creating the NamingService

	public NamingService namingServiceInstance() {

		if (null != namingService) {
			return namingService;
		}

		try {
			/*Read the Nacos properties and create the NamingService*/
			namingService = NacosFactory.createNamingService(getNacosProperties());
		}
		catch (Exception e) {
			log.error("create naming service error!properties={},e=,", this, e);
			return null;
		}
		return namingService;
	}

3. Query all instances by serviceId and group

    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

        ServiceInfo serviceInfo;
        /*Subscribing*/
        if (subscribe) {
            /*Get the service by grouped name and clusters (local cache first, pulled from the server if needed)*/
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            /*Not subscribing: query the Nacos server directly, bypassing the local cache*/
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        /*Filter the instances by the healthy flag*/
        return selectInstances(serviceInfo, healthy);
    }
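After getServiceInfo returns, selectInstances(serviceInfo, healthy) filters the hosts. Its body is not shown here. The sketch below assumes it keeps the instances whose health matches the flag and that are also enabled with a positive weight. InstanceFilterSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the client-side filter applied to ServiceInfo hosts. */
class InstanceFilterSketch {

    /** Minimal stand-in for Nacos's Instance. */
    static final class Instance {
        final String ip;
        final boolean healthy;
        final boolean enabled;
        final double weight;

        Instance(String ip, boolean healthy, boolean enabled, double weight) {
            this.ip = ip;
            this.healthy = healthy;
            this.enabled = enabled;
            this.weight = weight;
        }
    }

    /** Keep instances whose health matches the flag and that are enabled with a positive weight. */
    static List<Instance> select(List<Instance> hosts, boolean healthy) {
        List<Instance> result = new ArrayList<>();
        for (Instance instance : hosts) {
            if (instance.healthy == healthy && instance.enabled && instance.weight > 0) {
                result.add(instance);
            }
        }
        return result;
    }
}
```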

4. Pull the instances from the server and cache them locally

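    /**
     * 1. Look up the service in the local cache
     * 2. If it is not cached, create it and pull it from the Nacos server into the local cache
     * 3. Schedule a task that periodically syncs the service's instances from the server
     */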
    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());

        String key = ServiceInfo.getKey(serviceName, clusters);
        /*Is failover mode enabled?*/
        if (failoverReactor.isFailoverSwitch()) {
            /*Read from the failover cache; if absent, return a new, empty service object*/
            return failoverReactor.getService(key);
        }
        /*Get the ServiceInfo from the local cache*/
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        /*The service is not in the local cache*/
        if (null == serviceObj) {
            /*Create a ServiceInfo and put it in the local cache*/
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            /*Record the service in updatingMap while it is being updated*/
            updatingMap.put(serviceName, new Object());
            /*Pull the service from the Nacos server and update the local cache*/
            updateServiceNow(serviceName, clusters);
            /*Remove it from updatingMap*/
            updatingMap.remove(serviceName);

        }
        /*The service is being updated: wait on it until the update notifies, or until UPDATE_HOLD_INTERVAL (5 seconds) elapses*/
		else if (updatingMap.containsKey(serviceName)) {
            if (UPDATE_HOLD_INTERVAL > 0) {
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        /*Schedule a task that periodically syncs the service's instances from the server*/
        scheduleUpdateIfAbsent(serviceName, clusters);

        return serviceInfoMap.get(serviceObj.getKey());
    }

4.1 Query the instances from the Nacos server and update the local cache

    public void updateServiceNow(String serviceName, String clusters) {
        /*Get the currently cached service, if any*/
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            /*Query the service from Nacos*/
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
  
            if (StringUtils.isNotEmpty(result)) {
                /*Update the cached service from the query result*/
                processServiceJSON(result);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    /*Wake up every thread waiting on oldService (the callers that found the service in updatingMap)*/
                    oldService.notifyAll();
			......
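Together, getServiceInfo and updateServiceNow implement "the first caller fetches, concurrent callers wait briefly". The waiter calls wait(UPDATE_HOLD_INTERVAL) on the cached ServiceInfo, and the updater calls notifyAll() in its finally block. The sketch below shows the same coordination in isolation. It is a simplification, not the HostReactor code. Instead of an updatingMap, waiters loop while the cached data is still null, which also guards against spurious wake-ups. FetchOnceSketch and its fetcher callback are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch: the first caller fetches, concurrent callers wait (bounded) for that fetch. */
class FetchOnceSketch {

    /** Cached entry; data stays null until the first fetch completes. */
    static final class Entry {
        volatile String data;
    }

    private static final long UPDATE_HOLD_INTERVAL_MS = 5000;
    private final Map<String, Entry> cache = new ConcurrentHashMap<>();
    private final Function<String, String> fetcher; // hypothetical remote query

    FetchOnceSketch(Function<String, String> fetcher) {
        this.fetcher = fetcher;
    }

    String get(String key) {
        Entry fresh = new Entry();
        Entry existing = cache.putIfAbsent(key, fresh);
        if (existing == null) {
            // First caller: fetch synchronously, like updateServiceNow().
            update(fresh, key);
            return fresh.data;
        }
        synchronized (existing) {
            long deadline = System.currentTimeMillis() + UPDATE_HOLD_INTERVAL_MS;
            long remaining;
            // Wait while the first fetch is in flight, but never longer than the hold interval.
            while (existing.data == null
                    && (remaining = deadline - System.currentTimeMillis()) > 0) {
                try {
                    existing.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return existing.data;
    }

    private void update(Entry entry, String key) {
        try {
            entry.data = fetcher.apply(key);
        } finally {
            synchronized (entry) {
                entry.notifyAll(); // like the finally block of updateServiceNow(): always wake waiters
            }
        }
    }
}
```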

Building the parameters and calling the server

    /**
     * serviceName  service name
     * clusters     cluster list
     * udpPort      local UDP port (used for server push)
     * healthyOnly  only return healthy instances
     */
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {

        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        /*Query the server (/nacos/v1/ns/instance/list)*/
        return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
    }
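queryList sends these parameters in a GET request to /nacos/v1/ns/instance/list, the same endpoint described in the Open API documentation. Below is a sketch of the resulting URL. It assumes the parameters are sent as a URL-encoded query string (how HttpClient.request encodes them is not shown) and uses a plain http:// prefix. QueryListUrlSketch is hypothetical. Note that the @@ in a grouped service name must be encoded as %40%40.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical sketch: turn queryList's parameters into a GET URL. */
class QueryListUrlSketch {

    /** URL-encode every key and value and join them with '&'. */
    static String buildQuery(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            joiner.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                    + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return joiner.toString();
    }

    /** Same parameter names as queryList; the insertion order is kept for readability. */
    static String instanceListUrl(String server, String namespaceId, String serviceName,
                                  String clusters, int udpPort, String clientIp, boolean healthyOnly) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", namespaceId);
        params.put("serviceName", serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", clientIp);
        params.put("healthyOnly", String.valueOf(healthyOnly));
        return "http://" + server + "/nacos/v1/ns/instance/list?" + buildQuery(params);
    }
}
```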

4.2 Schedule a task that periodically syncs the service's instances from the server

    /**
     * futureMap holds ScheduledFuture objects, i.e. the scheduled tasks
     */
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        /*Double-checked: return if a task is already scheduled for this key*/
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            /*Schedule the service update task*/
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }
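The double-checked lock around futureMap ensures at most one UpdateTask per service key. If futureMap were a ConcurrentHashMap, computeIfAbsent would give the same guarantee, because it runs its mapping function at most once per absent key. This is a swapped-in technique, not what HostReactor does. ScheduleOnceSketch is hypothetical and uses a fixed 1-second initial delay.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: schedule at most one update task per service key. */
class ScheduleOnceSketch implements AutoCloseable {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    /** computeIfAbsent replaces the check / lock / re-check sequence. */
    ScheduledFuture<?> scheduleUpdateIfAbsent(String key, Runnable task) {
        return futureMap.computeIfAbsent(key,
                k -> executor.schedule(task, 1000, TimeUnit.MILLISECONDS));
    }

    int scheduledCount() {
        return futureMap.size();
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```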

The scheduled task

    public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private String clusters;
        private String serviceName;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        @Override
        public void run() {
            try {
                /*Get the service from the cache*/
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                /*The service is not cached*/
                if (serviceObj == null) {
                    /*Query the Nacos server and update the local cache*/
                    updateServiceNow(serviceName, clusters);
                    /*Run again after DEFAULT_DELAY (1 second by default)*/
                    executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                    return;
                }

                /*The cached service has not been refreshed since this task's last round*/
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    /*Query the Nacos server and update the local cache*/
                    updateServiceNow(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    /*Only query the server, without updating the local cache*/
                    refreshOnly(serviceName, clusters);
                }
                /*Record the service's last refresh time for the next round*/
                lastRefTime = serviceObj.getLastRefTime();

                /*No longer subscribed and the task has been removed from futureMap: stop*/
                if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
                    !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    // abort the update task:
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                /*Reschedule this task after the service's cacheMillis (the cache time returned by the server)*/
                executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
                ......
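UpdateTask does not use scheduleAtFixedRate. Each round calls executor.schedule(this, delay, ...) again, so the next delay can follow the latest cacheMillis. The task ends simply by not rescheduling itself once nobody is subscribed. Below is a minimal sketch of that self-rescheduling pattern. SelfReschedulingTask, its work callback, the stillWanted check and the delay supplier are hypothetical. Where the original reschedules after a failed round falls in the elided part, so catching the exception and continuing is an assumption.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a task that reschedules itself until it is no longer wanted. */
class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService executor;
    private final Runnable work;               // one round, e.g. "pull the service and update the cache"
    private final BooleanSupplier stillWanted; // e.g. "someone is still subscribed"
    private final LongSupplier nextDelayMillis; // e.g. the latest cacheMillis

    SelfReschedulingTask(ScheduledExecutorService executor, Runnable work,
                         BooleanSupplier stillWanted, LongSupplier nextDelayMillis) {
        this.executor = executor;
        this.work = work;
        this.stillWanted = stillWanted;
        this.nextDelayMillis = nextDelayMillis;
    }

    @Override
    public void run() {
        try {
            work.run();
        } catch (RuntimeException e) {
            // Assumption: a failed round is skipped rather than ending the schedule.
        }
        if (!stillWanted.getAsBoolean()) {
            return; // abort the update task by not rescheduling it
        }
        // A fresh schedule() per round lets the delay change between rounds.
        executor.schedule(this, nextDelayMillis.getAsLong(), TimeUnit.MILLISECONDS);
    }
}
```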

How does the server return the requested service information?

[Figure: official API documentation]
Request path: /nacos/v1/ns/instance/list

[Figure: server-side service discovery]

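The server reads the parameters from the client's request, looks up the instances and other service information that match them, and returns the wrapped result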

    /**
     * Get all instances
     */
    @GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {

        //Namespace
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        //Service name
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        //Requesting agent (User-Agent)
        String agent = WebUtils.getUserAgent(request);
        //Requested clusters
        String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
        //Client IP
        String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
        //UDP port
        int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
        //Environment
        String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
        boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
        //App name
        String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
        
        String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
        //Only return healthy instances
        boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
        /*Look up the service information by these conditions*/
        return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                healthyOnly);
    }

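Get the service from the cache, filter its instances with the service's selector, and wrap the result according to instance health and availability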

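    /**
     * 1. Get the service; if it does not exist, return empty hosts
     * 2. Select the provider instances with the service's selector (based on the client IP)
     * 3. Filter out the usable instances and return them as hosts
     */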
    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
        
        ClientInfo clientInfo = new ClientInfo(agent);
        ObjectNode result = JacksonUtils.createEmptyJsonNode();

        /*Get the service from the cache*/
        Service service = serviceManager.getService(namespaceId, serviceName);

        /*If the service does not exist, return empty hosts*/
        if (service == null) {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            result.put("name", serviceName);
            result.put("clusters", clusters);
            result.replace("hosts", JacksonUtils.createEmptyArrayNode());
            return result;
        }

        /*Check whether the service is disabled*/
        checkIfDisabled(service);

        /*Cache time, 3 seconds by default*/
        long cacheMillis = switchDomain.getDefaultCacheMillis();
        
        /*Register a push client for this service (UDP push)*/
        try {
            if (udpPort > 0 && pushService.canEnablePush(agent)) {
                
                pushService
                        .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                                pushDataSource, tid, app);
                /*Use defaultPushCacheMillis as the cache time, 10 seconds by default*/
                cacheMillis = switchDomain.getPushCacheMillis(serviceName);
            }
        } catch (Exception e) {
            Loggers.SRV_LOG
                    .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
            cacheMillis = switchDomain.getDefaultCacheMillis();
        }
        
        List<Instance> srvedIPs;

        /*Get all instances in the requested clusters*/
        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

        /*If the service has a selector and the client IP is known, filter the providers by client IP*/
        if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
            srvedIPs = service.getSelector().select(clientIP, srvedIPs);
        }

        /*If there are no instances, return a result with empty hosts*/
        if (CollectionUtils.isEmpty(srvedIPs)) {
            
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                result.put("dom", serviceName);
            } else {
                result.put("dom", NamingUtils.getServiceName(serviceName));
            }
            
            result.put("hosts", JacksonUtils.createEmptyArrayNode());
            result.put("name", serviceName);
            result.put("cacheMillis", cacheMillis);
            result.put("lastRefTime", System.currentTimeMillis());
            result.put("checksum", service.getChecksum());
            result.put("useSpecifiedURL", false);
            result.put("clusters", clusters);
            result.put("env", env);
            result.put("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
            return result;
        }

        /*Split instances by health status: true and false*/
        Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
        ipMap.put(Boolean.TRUE, new ArrayList<>());
        ipMap.put(Boolean.FALSE, new ArrayList<>());

        for (Instance ip : srvedIPs) {
            ipMap.get(ip.isHealthy()).add(ip);
        }
        
        if (isCheck) {
            result.put("reachProtectThreshold", false);
        }
        
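        double threshold = service.getProtectThreshold();
        
        /*Protect threshold: if the share of healthy instances is at or below the threshold, return all instances, healthy or not, so traffic is not concentrated on the few healthy ones*/
        if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {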
            
            Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
            if (isCheck) {
                result.put("reachProtectThreshold", true);
            }
            
            ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
            ipMap.get(Boolean.FALSE).clear();
        }
        
        if (isCheck) {
            result.put("protectThreshold", service.getProtectThreshold());
            result.put("reachLocalSiteCallThreshold", false);
            
            return JacksonUtils.createEmptyJsonNode();
        }
        
        ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

        /**
         * Package each instance and add it to the hosts list to be returned
         */
        for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
            List<Instance> ips = entry.getValue();

            /*Skip unhealthy instances when healthyOnly is set*/
            if (healthyOnly && !entry.getKey()) {
                continue;
            }
            
            for (Instance instance : ips) {

                /*Skip disabled instances*/
                if (!instance.isEnabled()) {
                    continue;
                }
                
                ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
                
                ipObj.put("ip", instance.getIp());
                ipObj.put("port", instance.getPort());
                // deprecated since nacos 1.0.0:
                ipObj.put("valid", entry.getKey());
                ipObj.put("healthy", entry.getKey());
                ipObj.put("marked", instance.isMarked());
                ipObj.put("instanceId", instance.getInstanceId());
                ipObj.put("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
                ipObj.put("enabled", instance.isEnabled());
                ipObj.put("weight", instance.getWeight());
                ipObj.put("clusterName", instance.getClusterName());
                if (clientInfo.type == ClientInfo.ClientType.JAVA
                        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                    ipObj.put("serviceName", instance.getServiceName());
                } else {
                    ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
                }
                
                ipObj.put("ephemeral", instance.isEphemeral());
                hosts.add(ipObj);
                
            }
        }
        
        result.replace("hosts", hosts);
        if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }
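The health split and protect-threshold logic in doSrvIpxt can be reduced to a few lines. This is a minimal sketch, not Nacos code. `Inst` is a hypothetical stand-in for `Instance`, and the method returns only IPs instead of JSON nodes. It follows the same order as the method above: split instances by health, fall back to all instances when the healthy share is at or below the threshold, skip the unhealthy group when `healthyOnly` is set, and always skip disabled instances.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the health split + protect threshold step of doSrvIpxt.
// Inst is a hypothetical stand-in for Nacos's Instance, with only the fields used here.
final class ProtectThresholdSketch {

    static final class Inst {
        final String ip;
        final boolean healthy;
        final boolean enabled;

        Inst(String ip, boolean healthy, boolean enabled) {
            this.ip = ip;
            this.healthy = healthy;
            this.enabled = enabled;
        }
    }

    // Returns the IPs that would end up in "hosts", healthy group first.
    static List<String> selectHosts(List<Inst> all, double threshold, boolean healthyOnly) {
        List<Inst> healthy = new ArrayList<>();
        List<Inst> unhealthy = new ArrayList<>();
        for (Inst i : all) {
            (i.healthy ? healthy : unhealthy).add(i);
        }

        // Healthy share at or below the threshold: treat every instance as healthy.
        if (!all.isEmpty() && (float) healthy.size() / all.size() <= threshold) {
            healthy.addAll(unhealthy);
            unhealthy.clear();
        }

        List<String> hosts = new ArrayList<>();
        addEnabled(healthy, hosts);
        if (!healthyOnly) {
            addEnabled(unhealthy, hosts); // unhealthy ones only when not healthyOnly
        }
        return hosts;
    }

    // Disabled instances are never returned, whatever their health.
    private static void addEnabled(List<Inst> from, List<String> to) {
        for (Inst i : from) {
            if (i.enabled) {
                to.add(i.ip);
            }
        }
    }
}
```

Take 1 healthy instance out of 4 and a threshold of 0.5. The healthy share of 0.25 is below the threshold, so every enabled instance is returned, even when `healthyOnly` is true.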
    

How does Feign call the service provider?

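A proxy class is generated for each Feign client interface when it is initialized as a bean. Every call on it ultimately ends up in the invoke() method of SynchronousMethodHandler.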
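A JDK dynamic proxy shows the mechanism. This is a hedged sketch: `UserClient` and `MethodHandlerSketch` are hypothetical types, not Feign API. Each interface method maps to its own handler, which plays the role that SynchronousMethodHandler plays in Feign. Here the handler only renders the request it would send and performs no HTTP call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

// Hypothetical client interface, standing in for a @FeignClient interface.
interface UserClient {
    String getUser(long id);
}

// Hypothetical per-method handler; plays the role SynchronousMethodHandler plays in Feign.
interface MethodHandlerSketch {
    Object invoke(Object[] argv);
}

final class ProxySketch {

    static UserClient create(String serviceName) {
        // One handler per interface method, looked up by method name in this sketch.
        Map<String, MethodHandlerSketch> dispatch = new HashMap<>();
        // This handler only renders the request it would send; no HTTP is performed.
        dispatch.put("getUser", argv -> "GET http://" + serviceName + "/users/" + argv[0]);

        // Every call on the proxy lands here and is forwarded to the matching handler.
        InvocationHandler handler = (proxy, method, args) -> {
            MethodHandlerSketch target = dispatch.get(method.getName());
            if (target == null) {
                // toString/equals/hashCode are not handled in this sketch.
                throw new UnsupportedOperationException(method.getName());
            }
            return target.invoke(args);
        };

        return (UserClient) Proxy.newProxyInstance(
                UserClient.class.getClassLoader(),
                new Class<?>[] {UserClient.class},
                handler);
    }
}
```

Calling `ProxySketch.create("user-service").getUser(1L)` goes through the proxy's handler and yields `GET http://user-service/users/1`.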

1. SynchronousMethodHandler runs invoke(), which calls client.execute(request, options) through executeAndDecode()

  public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Options options = findOptions(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        /*Execute the request*/
        return executeAndDecode(template, options);
        ......
  Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
    Request request = targetRequest(template);

    if (logLevel != Logger.Level.NONE) {
      logger.logRequest(metadata.configKey(), logLevel, request);
    }

    Response response;
    long start = System.nanoTime();
    try {
      /*The client executes the call*/
      response = client.execute(request, options);
      ......
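The `while (true)` loop in invoke() is a retry loop. Each call gets its own copy of the retryer, and a failed attempt either loops again or rethrows. The sketch below shows that pattern under stated assumptions. `SimpleRetryer` is hypothetical and is not `feign.Retryer`. It retries on any `RuntimeException`, with no back-off, and only counts attempts.

```java
import java.util.function.Supplier;

// Hypothetical retry policy (not feign.Retryer): allows up to maxAttempts tries, no back-off.
final class SimpleRetryer {
    private final int maxAttempts;
    private int attempt = 1;

    SimpleRetryer(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Fresh copy per invocation, so attempt counts are not shared between calls.
    SimpleRetryer copy() {
        return new SimpleRetryer(maxAttempts);
    }

    // Returns normally to request another attempt; rethrows once attempts are used up.
    void continueOrPropagate(RuntimeException e) {
        if (attempt++ >= maxAttempts) {
            throw e;
        }
    }
}

final class RetryLoopSketch {
    static <T> T invoke(Supplier<T> call, SimpleRetryer prototype) {
        SimpleRetryer retryer = prototype.copy();
        while (true) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                retryer.continueOrPropagate(e);
                // returned normally: loop and try again
            }
        }
    }
}
```

Copying the retryer per call, which mirrors `this.retryer.clone()` above, keeps the attempt counter local to one invocation. Concurrent calls through the same proxy therefore do not consume each other's retries.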

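2. The call reaches the execute() implementation of LoadBalancerFeignClient in the ribbon package, which takes the Feign clientName from the URL host and executes the request for that client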

	public Response execute(Request request, Request.Options options) throws IOException {
		try {
			URI asUri = URI.create(request.url());
			String clientName = asUri.getHost();
			URI uriWithoutHost = cleanUrl(request.url(), clientName);
			FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
					this.delegate, request, uriWithoutHost);

			IClientConfig requestConfig = getClientConfig(options, clientName);
			/*Get the load-balancing client for this clientName from the factory, then execute the request with load balancing*/
			return lbClient(clientName)
					.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
		}
		catch (ClientException e) {
			IOException io = findIOException(e);
			if (io != null) {
				throw io;
			}
			throw new RuntimeException(e);
		}
	}
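The clientName used here is the host part of the Feign URL, which is the service name. Below is a minimal sketch of that extraction with `java.net.URI`. `withoutHost` is a hypothetical helper that only approximates what cleanUrl does for http/https URLs.

```java
import java.net.URI;

final class ClientNameSketch {

    // The host part of a Feign URL such as http://user-service/users/1 is the service name.
    static String clientName(String url) {
        return URI.create(url).getHost();
    }

    // Hypothetical helper: drop the host but keep scheme and path,
    // roughly what cleanUrl(...) does for http/https URLs.
    static String withoutHost(String url, String host) {
        int sep = url.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("not an absolute URL: " + url);
        }
        int start = sep + 3;
        return url.substring(0, start) + url.substring(start + host.length());
    }
}
```

One caveat: `URI.getHost()` returns `null` for names containing an underscore, such as `user_service`, because they are not valid host names. A service name like that cannot be recovered from the URL this way.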

3. This calls executeWithLoadBalancer() in AbstractLoadBalancerAwareClient, which selects a host according to the load-balancing rule, calls it, and returns the result

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        /*Build a LoadBalancerCommand from the request config; it picks a server from all instances using the load-balancing rule*/
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            /*Call the selected server and return its result*/
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
                ......
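buildLoadBalancerCommand ends up asking the load balancer to choose a server, and reconstructURIWithServer replaces the service name with the chosen ip:port. The sketch below uses plain round-robin as a stand-in. It is not Ribbon's actual rule; the configured `IRule` decides the real choice. `RoundRobinSketch` and its methods are hypothetical.

```java
import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the load balancer's choice; plain round-robin,
// not Ribbon's actual IRule.
final class RoundRobinSketch {
    private final AtomicInteger next = new AtomicInteger();

    // Pick servers in turn; floorMod keeps the index valid after the counter overflows.
    String choose(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalStateException("no server available");
        }
        return servers.get(Math.floorMod(next.getAndIncrement(), servers.size()));
    }

    // Replace the service name in the URI with the chosen ip:port,
    // in the spirit of reconstructURIWithServer.
    static URI reconstruct(String hostPort, URI original) {
        String query = original.getRawQuery() == null ? "" : "?" + original.getRawQuery();
        return URI.create(original.getScheme() + "://" + hostPort + original.getRawPath() + query);
    }
}
```

With two servers, successive choose() calls alternate between them. Reconstructing `http://user-service/users/1?x=1` with `10.0.0.1:8080` gives `http://10.0.0.1:8080/users/1?x=1`.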

Summary

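1. The client assembles its own service information and sends a registration request to the server.
2. On receiving the registration request, the server adds the service information to its local cache and schedules tasks that keep checking whether the service is healthy, updating or removing registered services accordingly.
3. At startup, when Spring creates the Feign beans, the client fetches service information from the Nacos server and schedules tasks that keep pulling the service information for the modules the Feign clients target.
4. When the client calls through Feign, it uses the load-balancing algorithm to select an instance from its local cache.
5. When the server receives a discovery request, it gets the matching instances from its local cache according to the request parameters, packages them, and returns them to the client.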
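Points 3 and 4 describe a local cache that is pulled from the server and refreshed on a timer, with calls served from the local copy. Here is a hedged sketch of that idea. `LocalServiceCache` and its `fetcher` function are hypothetical stand-ins for the Nacos client and its query to the server, not the actual classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical client-side cache: instance lists are pulled from the registry,
// kept locally, and read from the local copy on every call.
final class LocalServiceCache {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> fetcher; // stands in for the query to Nacos

    LocalServiceCache(Function<String, List<String>> fetcher) {
        this.fetcher = fetcher;
    }

    // One pull: replace the cached list with a fresh, read-only copy.
    void refresh(String serviceName) {
        cache.put(serviceName, Collections.unmodifiableList(new ArrayList<>(fetcher.apply(serviceName))));
    }

    // Calls read the local copy; an unknown service yields an empty list.
    List<String> instances(String serviceName) {
        return cache.getOrDefault(serviceName, Collections.emptyList());
    }

    // Pull periodically; the caller owns the returned executor and should shut it down.
    ScheduledExecutorService startPolling(String serviceName, long periodMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleWithFixedDelay(() -> {
            try {
                refresh(serviceName);
            } catch (RuntimeException e) {
                // keep the last good list; a thrown exception would cancel later runs
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        return ses;
    }
}
```

Catching exceptions inside the scheduled task matters. With `scheduleWithFixedDelay`, a task that throws is not run again, so a single failed pull would stop all later refreshes. Here the last good list stays in the cache instead.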


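Copyright notice: this is an original article by qq_16552433, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.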