How Service Registration Works in Spring Cloud Alibaba Nacos

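Nacos is an open-source component from Alibaba that combines a service registry and a configuration center, released under the Apache License 2.0. This article explains how service registration works when Nacos is used as the registry.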

First, all service registration goes through one common interface: ServiceRegistry.

package org.springframework.cloud.client.serviceregistry;

/**
 * Contract to register and deregister instances with a Service Registry.
 *
 * @author Spencer Gibb
 * @since 1.2.0
 */
public interface ServiceRegistry<R extends Registration> {

	/**
	 * Register the registration. Registrations typically have information about
	 * instances such as: hostname and port.
	 * @param registration the registration
	 */
	void register(R registration);

	/**
	 * Deregister the registration.
	 * @param registration
	 */
	void deregister(R registration);

	/**
	 * Close the ServiceRegistry. This is a lifecycle method.
	 */
	void close();

	/**
	 * Sets the status of the registration. The status values are determined
	 * by the individual implementations.
	 *
	 * @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
	 * @param registration the registration to update
	 * @param status the status to set
	 */
	void setStatus(R registration, String status);

	/**
	 * Gets the status of a particular registration.
	 *
	 * @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
	 * @param registration the registration to query
	 * @param <T> the type of the status
	 * @return the status of the registration
	 */
	<T> T getStatus(R registration);
}

The two most important methods in this interface are register(), which registers a service, and deregister(), which removes it.
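To make the register()/deregister() contract concrete, here is a minimal in-memory sketch. It does not use Spring Cloud: SimpleRegistration, SimpleServiceRegistry, FixedRegistration and InMemoryServiceRegistry are hypothetical names invented for this illustration, and a real implementation would call a remote registry instead of a local map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins for Spring Cloud's Registration and ServiceRegistry.
interface SimpleRegistration {
    String getServiceId();
    String getHost();
    int getPort();
}

interface SimpleServiceRegistry<R extends SimpleRegistration> {
    void register(R registration);
    void deregister(R registration);
}

// Immutable registration used only for this example.
class FixedRegistration implements SimpleRegistration {
    private final String serviceId;
    private final String host;
    private final int port;

    FixedRegistration(String serviceId, String host, int port) {
        this.serviceId = serviceId;
        this.host = host;
        this.port = port;
    }

    @Override public String getServiceId() { return serviceId; }
    @Override public String getHost() { return host; }
    @Override public int getPort() { return port; }
}

// Illustrative in-memory registry keyed by "serviceId@host:port".
class InMemoryServiceRegistry implements SimpleServiceRegistry<SimpleRegistration> {
    private final Map<String, SimpleRegistration> instances = new ConcurrentHashMap<>();

    private static String key(String serviceId, String host, int port) {
        return serviceId + "@" + host + ":" + port;
    }

    @Override
    public void register(SimpleRegistration r) {
        instances.put(key(r.getServiceId(), r.getHost(), r.getPort()), r);
    }

    @Override
    public void deregister(SimpleRegistration r) {
        instances.remove(key(r.getServiceId(), r.getHost(), r.getPort()));
    }

    boolean contains(String serviceId, String host, int port) {
        return instances.containsKey(key(serviceId, host, port));
    }

    int size() {
        return instances.size();
    }
}
```

Registering and then deregistering the same FixedRegistration leaves the registry empty, which is the behaviour any real ServiceRegistry implementation is expected to provide against its backing server.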

The register() method is called from the class org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration.

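The call chain starts in bind(), which calls start(); start() in turn calls register(), which delegates to the register() method of the ServiceRegistry with the current registration.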

@EventListener(WebServerInitializedEvent.class)
	public void bind(WebServerInitializedEvent event) {
		ApplicationContext context = event.getApplicationContext();
		if (context instanceof ConfigurableWebServerApplicationContext) {
			if ("management".equals(
					((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
				return;
			}
		}
		this.port.compareAndSet(0, event.getWebServer().getPort());
		this.start();
	}

	public void start() {
		if (!isEnabled()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Discovery Lifecycle disabled. Not starting");
			}
			return;
		}

		// only initialize if nonSecurePort is greater than 0 and it isn't already running
		// because of containerPortInitializer below
		if (!this.running.get()) {
			register();
			if (shouldRegisterManagement()) {
				registerManagement();
			}
			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, getConfiguration()));
			this.running.compareAndSet(false, true);
		}

	}

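The bind() method is annotated with @EventListener(WebServerInitializedEvent.class), so the service registers itself with the registry when the application publishes WebServerInitializedEvent during startup, that is, once the web server has been initialized.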
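The bind()/start() logic can be sketched in plain Java without Spring. AutoRegistrationSketch and its methods are hypothetical names for this illustration; the sketch only mirrors the two atomic fields seen above: the port is adopted from the web server only if it is still 0, and the running flag keeps a second event from registering again.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AbstractAutoServiceRegistration; no Spring types are used.
class AutoRegistrationSketch {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger registerCalls = new AtomicInteger(0);

    // Plays the role of bind(WebServerInitializedEvent): invoked once the web server knows its port.
    void onWebServerInitialized(int actualPort) {
        // Adopt the real port only if none has been set yet (the value is still 0).
        port.compareAndSet(0, actualPort);
        start();
    }

    // Plays the role of start(): registers only if not already running.
    void start() {
        if (!running.get()) {
            register();
            running.compareAndSet(false, true);
        }
    }

    // Placeholder for the call into the ServiceRegistry; here it only counts invocations.
    private void register() {
        registerCalls.incrementAndGet();
    }

    int port() {
        return port.get();
    }

    int registerCalls() {
        return registerCalls.get();
    }
}
```

If onWebServerInitialized() is called a second time with a different port, the sketch keeps the first port and does not register again, which matches the guard conditions in the original methods.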

Returning to the ServiceRegistry interface: for Nacos, the implementing class is NacosServiceRegistry.

@Override
	public void register(Registration registration) {

		if (StringUtils.isEmpty(registration.getServiceId())) {
			log.warn("No service to register for nacos client...");
			return;
		}

		String serviceId = registration.getServiceId();

		Instance instance = new Instance();
		instance.setIp(registration.getHost());
		instance.setPort(registration.getPort());
		instance.setWeight(nacosDiscoveryProperties.getWeight());
		instance.setClusterName(nacosDiscoveryProperties.getClusterName());
		instance.setMetadata(registration.getMetadata());

		try {
			namingService.registerInstance(serviceId, instance);
			log.info("nacos registry, {} {}:{} register finished", serviceId,
					instance.getIp(), instance.getPort());
		}
		catch (Exception e) {
			log.error("nacos registry, {} register failed...{},", serviceId,
					registration.toString(), e);
		}
	}

This register() implementation first builds an Instance object and then registers it through the registerInstance() method of NamingService.

The registerInstance() method of NamingService is implemented as follows:

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }

        this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

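This method does two things. First, for an ephemeral instance, it packages the Nacos heartbeat information into a BeatInfo and hands it to the BeatReactor, which sends the heartbeats. Second, it calls registerService() on NamingProxy, which contains the logic for registering a service instance.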
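The two steps can be sketched as below. This is a simplified model, not the Nacos client: EphemeralRegistrationSketch and Inst are hypothetical names, the "#"-separated beat key is an assumption made for illustration, and the grouped-name format "group@@service" reflects how NamingUtils.getGroupedName() behaves to the best of my knowledge.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the client-side registerInstance() flow.
class EphemeralRegistrationSketch {

    // Minimal instance type for the example; the real Instance class has many more fields.
    static final class Inst {
        final String ip;
        final int port;
        final boolean ephemeral;

        Inst(String ip, int port, boolean ephemeral) {
            this.ip = ip;
            this.port = port;
            this.ephemeral = ephemeral;
        }
    }

    // Beats that would be sent periodically (stands in for BeatReactor state).
    final Map<String, Inst> scheduledBeats = new ConcurrentHashMap<>();
    // Grouped service names that were "sent" to the server (stands in for NamingProxy).
    final List<String> registered = new ArrayList<>();

    // Assumed to mirror NamingUtils.getGroupedName: "<group>@@<service>".
    static String groupedName(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    void registerInstance(String serviceName, String groupName, Inst instance) {
        String grouped = groupedName(serviceName, groupName);
        if (instance.ephemeral) {
            // Step 1: only ephemeral instances get a heartbeat entry.
            // The key format below is an assumption for illustration.
            scheduledBeats.put(grouped + "#" + instance.ip + "#" + instance.port, instance);
        }
        // Step 2: every instance is registered with the server.
        registered.add(grouped);
    }
}
```

The point of the sketch is the asymmetry: a persistent (non-ephemeral) instance is still registered, but no client-side heartbeat is set up for it.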

NamingProxy.registerService()

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
        Map<String, String> params = new HashMap(8);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("clusterName", instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));
        this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");
    }
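The parameter map built above ends up as an HTTP POST to the instance endpoint of a Nacos server. The sketch below shows how such a request URL could be assembled with the JDK only. RegisterRequestSketch and buildUrl() are hypothetical names; the path /nacos/v1/ns/instance is the one exposed by the Nacos 1.x Open API, and sending the parameters as a query string is an assumption about the transport, not a copy of the client's code.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that assembles a registration URL from a parameter map.
class RegisterRequestSketch {

    // Instance endpoint of the Nacos 1.x Open API.
    static final String INSTANCE_PATH = "/nacos/v1/ns/instance";

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM.
            throw new IllegalStateException(e);
        }
    }

    // Builds "http://<server><path>?k1=v1&k2=v2" with URL-encoded keys and values,
    // in the iteration order of the supplied map.
    static String buildUrl(String server, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            query.add(encode(entry.getKey()) + "=" + encode(entry.getValue()));
        }
        return "http://" + server + INSTANCE_PATH + "?" + query;
    }
}
```

Passing a LinkedHashMap keeps the parameter order stable, which makes the resulting URL easy to compare against a curl command while debugging.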

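In essence, registerService() packs the parameters and the request path and sends them through the Nacos client's HttpClient utility to the corresponding controller in the Nacos server, which then performs the business processing. The request passes through reqAPI() and callServer():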

public String reqAPI(String api, Map<String, String> params, String method) throws NacosException {
        List<String> snapshot = this.serversFromEndpoint;
        if (!CollectionUtils.isEmpty(this.serverList)) {
            snapshot = this.serverList;
        }

        return this.reqAPI(api, params, snapshot, method);
    }

    public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
        params.put("namespaceId", this.getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        } else {
            Exception exception = new Exception();
            if (servers != null && !servers.isEmpty()) {
                Random random = new Random(System.currentTimeMillis());
                int index = random.nextInt(servers.size());

                for(int i = 0; i < servers.size(); ++i) {
                    String server = (String)servers.get(index);

                    try {
                        return this.callServer(api, params, server, method);
                    } catch (NacosException var11) {
                        exception = var11;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
                    } catch (Exception var12) {
                        exception = var12;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
                    }

                    index = (index + 1) % servers.size();
                }

                throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
            } else {
                int i = 0;

                while(i < 3) {
                    try {
                        return this.callServer(api, params, this.nacosDomain);
                    } catch (Exception var13) {
                        exception = var13;
                        LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);
                        ++i;
                    }
                }

                throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
            }
        }
    }

    public String callServer(String api, Map<String, String> params, String curServer, String method) throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0L;
        this.checkSignature(params);
        List<String> headers = this.builderHeaders();
        if (!curServer.contains(":")) {
            curServer = curServer + ":" + this.serverPort;
        }

        String url = HttpClient.getPrefix() + curServer + api;
        HttpResult result = HttpClient.request(url, headers, params, "UTF-8", method);
        end = System.currentTimeMillis();
        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code)).observe((double)(end - start));
        if (200 == result.code) {
            return result.content;
        } else if (304 == result.code) {
            return "";
        } else {
            throw new NacosException(500, "failed to req API:" + HttpClient.getPrefix() + curServer + api + ". code:" + result.code + " msg: " + result.content);
        }
    }
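The server-selection strategy in reqAPI() (start at a random server, then walk the list in order until one call succeeds) can be isolated into a small sketch. FailoverSketch and callWithFailover() are hypothetical names, and the actual HTTP call is replaced by a Function so the example runs without a Nacos server.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Hypothetical helper showing random-start, try-each-server-once failover.
class FailoverSketch {

    static String callWithFailover(List<String> servers,
                                   Function<String, String> call,
                                   Random random) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        // A random starting point spreads load across the cluster.
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                // Remember the failure and move on to the next server.
                last = e;
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException(
                "failed after all servers (" + servers + ") tried: " + last.getMessage(), last);
    }
}
```

Each server is tried at most once per request, so the worst case costs one failed call per server before the exception is raised.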

The URL requested through HttpClient above is handled by InstanceController in the Nacos source code:

@CanDistro
    @PostMapping
    public String register(HttpServletRequest request) throws Exception {

        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

        serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
        return "ok";
    }

The registerInstance() method it calls is implemented in ServiceManager as follows:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

        createEmptyService(namespaceId, serviceName, instance.isEphemeral());

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }

        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

It first creates an empty Service if one does not already exist (createEmptyService) and then adds the instance information to the Nacos cluster (addInstance).
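The create-if-absent-then-add pattern can be sketched with nested concurrent maps. ServiceManagerSketch is a hypothetical, single-node model: it represents an instance as an "ip:port" string and leaves out the cluster synchronization that the real addInstance() is responsible for.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-node model of the server-side registration flow.
class ServiceManagerSketch {

    // namespaceId -> serviceName -> set of "ip:port" instance addresses
    private final Map<String, Map<String, Set<String>>> serviceMap = new ConcurrentHashMap<>();

    // Creates the namespace and service entries only if they are missing.
    void createEmptyService(String namespaceId, String serviceName) {
        serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, name -> ConcurrentHashMap.newKeySet());
    }

    void registerInstance(String namespaceId, String serviceName, String ip, int port) {
        createEmptyService(namespaceId, serviceName);
        Set<String> instances = serviceMap.get(namespaceId).get(serviceName);
        if (instances == null) {
            throw new IllegalStateException(
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // A set makes repeated registration of the same address idempotent.
        instances.add(ip + ":" + port);
    }

    int instanceCount(String namespaceId, String serviceName) {
        Map<String, Set<String>> services = serviceMap.get(namespaceId);
        if (services == null) {
            return 0;
        }
        Set<String> instances = services.get(serviceName);
        return instances == null ? 0 : instances.size();
    }
}
```

Because the service entry is created on demand, the first instance of a new service can register without any prior setup step, which is the same convenience the createEmptyService() call provides in the server code.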

With that, the principle behind Nacos service registration should be reasonably clear.

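Copyright notice: This is an original article by LiaoHongHB, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.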