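Nacos is an open-source component from Alibaba that combines a service registry and a configuration center, released under the Apache License 2.0. This article explains how service registration works when Nacos is used as the registry.
First, in Spring Cloud all service registration goes through one common interface, ServiceRegistry: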
package org.springframework.cloud.client.serviceregistry;

/**
 * Contract to register and deregister instances with a Service Registry.
 *
 * @author Spencer Gibb
 * @since 1.2.0
 */
public interface ServiceRegistry<R extends Registration> {

    /**
     * Register the registration. Registrations typically have information about
     * instances such as: hostname and port.
     * @param registration the registraion
     */
    void register(R registration);

    /**
     * Deregister the registration.
     * @param registration
     */
    void deregister(R registration);

    /**
     * Close the ServiceRegistry. This a lifecycle method.
     */
    void close();

    /**
     * Sets the status of the registration. The status values are determined
     * by the individual implementations.
     *
     * @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
     * @param registration the registration to update
     * @param status the status to set
     */
    void setStatus(R registration, String status);

    /**
     * Gets the status of a particular registration.
     *
     * @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
     * @param registration the registration to query
     * @param <T> the type of the status
     * @return the status of the registration
     */
    <T> T getStatus(R registration);
}
The two most important methods in this interface are register(), which registers a service instance, and deregister(), which removes it.
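To make the contract concrete, here is a minimal sketch of a registry that keeps instances in memory instead of talking to a server. SimpleRegistration, SimpleServiceRegistry and InMemoryServiceRegistry are hypothetical names invented for this illustration; they only mimic the shape of the Spring Cloud interface so the example compiles without Spring on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins for Spring Cloud's Registration and
// ServiceRegistry (assumption: only the parts needed for this sketch).
interface SimpleRegistration {
    String getServiceId();
    String getHost();
    int getPort();
}

interface SimpleServiceRegistry<R extends SimpleRegistration> {
    void register(R registration);
    void deregister(R registration);
}

// Toy implementation: stores registrations in a map keyed by service, host and port.
class InMemoryServiceRegistry implements SimpleServiceRegistry<SimpleRegistration> {
    private final Map<String, SimpleRegistration> instances = new ConcurrentHashMap<>();

    private static String key(SimpleRegistration r) {
        return r.getServiceId() + "@" + r.getHost() + ":" + r.getPort();
    }

    @Override
    public void register(SimpleRegistration registration) {
        instances.put(key(registration), registration);
    }

    @Override
    public void deregister(SimpleRegistration registration) {
        instances.remove(key(registration));
    }

    int size() {
        return instances.size();
    }

    // Convenience factory for the demo.
    static SimpleRegistration registration(String serviceId, String host, int port) {
        return new SimpleRegistration() {
            public String getServiceId() { return serviceId; }
            public String getHost() { return host; }
            public int getPort() { return port; }
        };
    }
}
```

A real implementation such as the Nacos one replaces the map operations with calls to the registry server, but the caller only ever sees register() and deregister().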
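register() is invoked from the class org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration.
The call chain starts in bind(), which calls start(); start() then calls register(), which delegates to the ServiceRegistry's register() method: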
@EventListener(WebServerInitializedEvent.class)
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(
                ((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
            return;
        }
    }
    this.port.compareAndSet(0, event.getWebServer().getPort());
    this.start();
}

public void start() {
    if (!isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
        return;
    }
    // only initialize if nonSecurePort is greater than 0 and it isn't already running
    // because of containerPortInitializer below
    if (!this.running.get()) {
        register();
        if (shouldRegisterManagement()) {
            registerManagement();
        }
        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, getConfiguration()));
        this.running.compareAndSet(false, true);
    }
}
The bind() method carries the @EventListener(WebServerInitializedEvent.class) annotation, which means the service registers itself with the registry when the application starts up and publishes WebServerInitializedEvent.
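The two guards in bind() and start() are easy to overlook, so here is a minimal sketch of just that logic: the port is only filled in if it is still 0, and registration happens only while the component is not yet running. AutoRegistrationSketch and its methods are hypothetical names for this illustration, and the register call is replaced by a counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class; only the guard logic mirrors bind()/start().
class AutoRegistrationSketch {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private int registerCalls = 0;

    // Stands in for the WebServerInitializedEvent listener.
    void onWebServerInitialized(int actualPort) {
        // Only takes effect while the port is still 0 (not set yet).
        port.compareAndSet(0, actualPort);
        start();
    }

    void start() {
        if (!running.get()) {
            registerCalls++; // placeholder for the real register() call
            running.compareAndSet(false, true);
        }
    }

    int getPort() {
        return port.get();
    }

    int getRegisterCalls() {
        return registerCalls;
    }
}
```

In this sketch a second event neither overwrites the port nor registers again, which is the behaviour the compareAndSet calls are there to provide.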
Returning to the ServiceRegistry interface: for Nacos, the implementing class is NacosServiceRegistry:
@Override
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
    String serviceId = registration.getServiceId();
    Instance instance = new Instance();
    instance.setIp(registration.getHost());
    instance.setPort(registration.getPort());
    instance.setWeight(nacosDiscoveryProperties.getWeight());
    instance.setClusterName(nacosDiscoveryProperties.getClusterName());
    instance.setMetadata(registration.getMetadata());
    try {
        namingService.registerInstance(serviceId, instance);
        log.info("nacos registry, {} {}:{} register finished", serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                registration.toString(), e);
    }
}
The register() method in this class first builds an Instance from the registration, then registers it by calling registerInstance() on NamingService.
The registerInstance() method of NamingService is implemented as follows:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
This method does two things. First, if the instance is ephemeral, it packs the Nacos heartbeat information into a BeatInfo and hands it to beatReactor so that heartbeats are sent. Second, it calls registerService() on NamingProxy, which contains the logic for registering a service instance.
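The heartbeat step can be pictured as a task that is scheduled to run repeatedly for as long as the instance is registered. The following is a minimal sketch of that idea using the JDK's ScheduledExecutorService; it is not the Nacos BeatReactor, and HeartbeatSketch, addBeat and demo are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical heartbeat scheduler (assumption: a fixed delay between beats).
class HeartbeatSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "heartbeat-sketch");
                t.setDaemon(true); // do not keep the JVM alive just for heartbeats
                return t;
            });

    // Runs sendBeat repeatedly, waiting periodMillis after each run.
    void addBeat(Runnable sendBeat, long periodMillis) {
        executor.scheduleWithFixedDelay(sendBeat, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    // Demo helper: true if at least `beats` heartbeats fire within the timeout.
    static boolean demo(int beats, long periodMillis, long timeoutMillis) {
        HeartbeatSketch reactor = new HeartbeatSketch();
        CountDownLatch latch = new CountDownLatch(beats);
        reactor.addBeat(latch::countDown, periodMillis);
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            reactor.shutdown();
        }
    }
}
```

In a real client the Runnable would send the beat to the server; here it only counts down a latch so the behaviour can be observed.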
NamingProxy.registerService()
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
    Map<String, String> params = new HashMap(8);
    params.put("namespaceId", this.namespaceId);
    params.put("serviceName", serviceName);
    params.put("groupName", groupName);
    params.put("clusterName", instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));
    this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");
}
The logic is simple: the assembled parameters and the API path are sent through HttpClient to the corresponding controller in the Nacos server source, which then performs the business processing. The request goes through reqAPI() and callServer():
public String reqAPI(String api, Map<String, String> params, String method) throws NacosException {
    List<String> snapshot = this.serversFromEndpoint;
    if (!CollectionUtils.isEmpty(this.serverList)) {
        snapshot = this.serverList;
    }
    return this.reqAPI(api, params, snapshot, method);
}

public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
    params.put("namespaceId", this.getNamespaceId());
    if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
        throw new IllegalArgumentException("no server available");
    } else {
        Exception exception = new Exception();
        if (servers != null && !servers.isEmpty()) {
            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());
            for(int i = 0; i < servers.size(); ++i) {
                String server = (String)servers.get(index);
                try {
                    return this.callServer(api, params, server, method);
                } catch (NacosException var11) {
                    exception = var11;
                    LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
                } catch (Exception var12) {
                    exception = var12;
                    LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
                }
                index = (index + 1) % servers.size();
            }
            throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
        } else {
            int i = 0;
            while(i < 3) {
                try {
                    return this.callServer(api, params, this.nacosDomain);
                } catch (Exception var13) {
                    exception = var13;
                    LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);
                    ++i;
                }
            }
            throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
        }
    }
}

public String callServer(String api, Map<String, String> params, String curServer, String method) throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0L;
    this.checkSignature(params);
    List<String> headers = this.builderHeaders();
    if (!curServer.contains(":")) {
        curServer = curServer + ":" + this.serverPort;
    }
    String url = HttpClient.getPrefix() + curServer + api;
    HttpResult result = HttpClient.request(url, headers, params, "UTF-8", method);
    end = System.currentTimeMillis();
    MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code)).observe((double)(end - start));
    if (200 == result.code) {
        return result.content;
    } else if (304 == result.code) {
        return "";
    } else {
        throw new NacosException(500, "failed to req API:" + HttpClient.getPrefix() + curServer + api + ". code:" + result.code + " msg: " + result.content);
    }
}
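The server-list branch of reqAPI() is a small failover loop: pick a random starting server, try it, and on failure move to the next one until every server has been tried once. Here is a minimal sketch of that pattern in isolation. FailoverSketch and requestWithFailover are hypothetical names, and the actual HTTP call is abstracted into a Function so the example runs without a server.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

class FailoverSketch {
    // Tries each server at most once, starting at a random index and wrapping around.
    // `call` stands in for the real HTTP request (assumption for this sketch).
    static String requestWithFailover(List<String> servers, Function<String, String> call) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = new Random().nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next server
            }
            index = (index + 1) % servers.size();
        }
        // The list is non-empty, so reaching this point means every call failed.
        throw new IllegalStateException(
                "failed after all servers(" + servers + ") tried: " + last.getMessage());
    }
}
```

Starting at a random index spreads requests across the servers, while the wrap-around ensures a single unavailable server does not fail the whole request.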
The URL that HttpClient ends up requesting is handled by InstanceController in the Nacos server source:
@CanDistro
@PostMapping
public String register(HttpServletRequest request) throws Exception {
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
    return "ok";
}
The implementation in ServiceManager is as follows:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
registerInstance() first creates the Service (createEmptyService), then adds the instance's information to the Nacos cluster (addInstance).
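The create-then-add sequence can be sketched with nested maps: a namespace maps to its services, and each service maps to its instances. This is only an in-memory illustration under that assumption, not the Nacos server's data model; ServiceManagerSketch and its methods are hypothetical names, and instances are reduced to address strings.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory model: namespace -> service name -> instance addresses.
class ServiceManagerSketch {
    private final Map<String, Map<String, CopyOnWriteArrayList<String>>> serviceMap =
            new ConcurrentHashMap<>();

    // Step 1: create the service entry only if it does not exist yet.
    void createEmptyService(String namespaceId, String serviceName) {
        serviceMap.computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                  .computeIfAbsent(serviceName, s -> new CopyOnWriteArrayList<>());
    }

    // Step 2: attach the instance to the service, ignoring duplicates.
    void registerInstance(String namespaceId, String serviceName, String address) {
        createEmptyService(namespaceId, serviceName);
        serviceMap.get(namespaceId).get(serviceName).addIfAbsent(address);
    }

    // Returns the known instances, or an empty list for an unknown service.
    List<String> getInstances(String namespaceId, String serviceName) {
        Map<String, CopyOnWriteArrayList<String>> services = serviceMap.get(namespaceId);
        if (services == null || !services.containsKey(serviceName)) {
            return Collections.emptyList();
        }
        return services.get(serviceName);
    }
}
```

Creating the service first means the add step never has to deal with a missing service, which is the same ordering registerInstance() follows.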
With that, the way Nacos service registration works should be reasonably clear.