Spring Cloud with Nacos as the Service Registry: Source Code Analysis
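Preface
Reading source code is dry, but it shows how things really work. Going over it more than once reveals the ideas behind it. I suggest downloading the source and following along in the order below.
This is my first article, so corrections are welcome. Some of the simpler code is only annotated with comments and not explained in detail. If you are interested, follow it further on your own.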
Source code
Nacos client (Spring Cloud Alibaba) source: https://github.com/spring-cloud-incubator/spring-cloud-alibaba
Nacos server source: https://github.com/alibaba/nacos
Nacos official Open API documentation: https://nacos.io/en-us/docs/open-api.html
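This article covers the following parts:
1. Client-side service registration
2. Server-side handling of registration
3. Client-side service discovery and invocation
4. Server-side response to client discovery requests
5. Service heartbeats and scheduled tasks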
Service Registration
How does Spring Cloud end up calling the Nacos registration method?

1. After Spring Boot starts, the AnnotationConfigApplicationContext(Class<?>... componentClasses) constructor of AnnotationConfigApplicationContext is called, and it then calls refresh(). The finishRefresh() step inside refresh() is where service registration is triggered.
/**
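* Because this class has a parent class, the parent constructor runs first;
* the source shows that it initializes a DefaultListableBeanFactory.
*
* Then its own constructor runs:
* 1. It creates an AnnotatedBeanDefinitionReader that reads annotated beans;
* once a bean has been read, it is registered with the DefaultListableBeanFactory.
* 2. It creates a ClassPathBeanDefinitionScanner,
* which can scan packages or classes and turn them into bean definitions (bd).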
*/
public AnnotationConfigApplicationContext(Class<?>... componentClasses) {
this();
register(componentClasses);
refresh();
}
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
/**
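* Preparation:
* set the startup time and the active flag,
* initialize the property sources
*/
prepareRefresh();
/**
* Tell the subclass to refresh the internal bean factory;
* returns the DefaultListableBeanFactory used by the following steps
*/
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
/**
* Prepare the bean factory for use in this context
*/
prepareBeanFactory(beanFactory);
try {
/**
* Empty hook in AbstractApplicationContext;
* subclasses override it to post-process the bean factory
*/
postProcessBeanFactory(beanFactory);
/**
* Invoke the bean factory post-processors registered as beans in the context
*
* If a loadTimeWeaver bean is found,
* a BeanPostProcessor is added for it
*/
invokeBeanFactoryPostProcessors(beanFactory);
/**
* Register the BeanPostProcessors,
* both custom ones and Spring's internal ones
*/
registerBeanPostProcessors(beanFactory);
/**
* Internationalization (i18n) support, not relevant here
*/
initMessageSource();
/**
* Initialize the application event multicaster
*/
initApplicationEventMulticaster();
/**
* Empty hook in AbstractApplicationContext that subclasses override
* (in Spring Boot, ServletWebServerApplicationContext creates the web server here)
*/
onRefresh();
/**
* Register the listeners
*/
registerListeners();
/**
* Instantiate all remaining (non-lazy) singleton beans
*/
finishBeanFactoryInitialization(beanFactory);
/**
* Publish the corresponding events
*/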
finishRefresh();
}
......
}
}
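2. finishRefresh() publishes a ServletWebServerInitializedEvent; AbstractAutoServiceRegistration listens for it and performs the service registration
/*
* The servlet web server context performs the final container refresh steps, starts the web server and publishes a ServletWebServerInitializedEvent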
*/
protected void finishRefresh() {
super.finishRefresh();
WebServer webServer = startWebServer();
if (webServer != null) {
publishEvent(new ServletWebServerInitializedEvent(webServer, this));
}
}
/*
* When the listener receives a WebServerInitializedEvent, it calls bind()
*/
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
/*
* 1. Publish the InstancePreRegisteredEvent
* 2. Register the service
* 3. Publish the InstanceRegisteredEvent
* 4. Update the running state
*/
public void start() {
/* Whether registration is enabled (it does not need to be for a subscribe-only client); the value comes from NacosDiscoveryProperties */
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
/* Not running yet, i.e. this is the first start */
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
/* Register the service; this calls NacosAutoServiceRegistration.register() */
register();
/* Whether to also register the management service with the registry; for Nacos the management registration is null, so this can be ignored */
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
/* Update the running state */
this.running.compareAndSet(false, true);
}
}
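The bind()/start() flow above can be modelled in plain Java to show why registration happens only once even if the event is published again. This is a minimal sketch, not Spring Cloud code: the class `AutoRegistrationSketch` and its `events` list are hypothetical, and the Spring events are represented only by their names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AbstractAutoServiceRegistration; Spring events are
// represented only by their names in the `events` list.
class AutoRegistrationSketch {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final boolean enabled;
    // Records the steps in the order they happen.
    final List<String> events = new ArrayList<>();

    AutoRegistrationSketch(boolean enabled) {
        this.enabled = enabled;
    }

    // Like bind(): set the port only if it is still 0, then call start().
    void onWebServerInitialized(int webServerPort) {
        port.compareAndSet(0, webServerPort);
        start();
    }

    // Like start(): pre-registration event, register, registered event, mark running.
    void start() {
        if (!enabled) {
            return; // registration switched off, e.g. a subscribe-only client
        }
        if (!running.get()) {
            events.add("InstancePreRegisteredEvent");
            events.add("register:" + port.get());
            events.add("InstanceRegisteredEvent");
            running.compareAndSet(false, true);
        }
    }
}
```

Calling onWebServerInitialized(8080) and then onWebServerInitialized(9090) records a single registration on port 8080: compareAndSet(0, ...) keeps the first port, and the running flag stops the second start() from registering again.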
3. This calls NacosAutoServiceRegistration.register()
How does the client register the service with Nacos?
Continuing the call chain above >>>>>>>
1. NacosAutoServiceRegistration.register() runs its checks and then calls the parent AbstractAutoServiceRegistration.register()
protected void register() {
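/* If registration with Nacos is disabled, return immediately */
if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
log.debug("Registration disabled.");
return;
}
/* If the configured port is less than 0, use the port the service actually listens on, so this property does not have to be set by hand */
if (this.registration.getPort() < 0) {
this.registration.setPort(getPort().get());
}
/* Call the parent class's register() */
super.register();
}
/* Parent class AbstractAutoServiceRegistration.register(): delegates to the ServiceRegistry (NacosServiceRegistry) */
protected void register() {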
this.serviceRegistry.register(getRegistration());
}
2. NacosServiceRegistry.register() calls NacosNamingService.registerInstance()
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
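/* Ephemeral instance: build the heartbeat info and hand it to BeatReactor, which sends heartbeats periodically */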
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
/* Register the service */
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
3. NamingProxy.registerService() builds the request parameters and calls reqAPI, which sends an HTTP POST to "/nacos/v1/ns/instance"
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
/* Build the request parameters */
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
/* Send the registration request */
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
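To see what actually goes over the wire, the parameter map built by registerService() can be rebuilt and URL-encoded. This is a minimal sketch under some assumptions: the parameter names match the CommonParams constants used above (namespaceId, serviceName, groupName, clusterName), and the grouped name has the form group@@service produced by NamingUtils.getGroupedName. `RegisterParamsSketch` is hypothetical. enable and healthy are fixed to true, which are the Instance defaults.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that rebuilds the parameters sent to POST /nacos/v1/ns/instance.
class RegisterParamsSketch {
    // Assumed to match NamingUtils.getGroupedName: "group@@service".
    static String groupedName(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    static Map<String, String> params(String namespaceId, String serviceName, String groupName,
                                      String clusterName, String ip, int port, double weight,
                                      boolean ephemeral, String metadataJson) {
        // LinkedHashMap keeps insertion order so the encoded body is predictable.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", namespaceId);
        params.put("serviceName", groupedName(serviceName, groupName));
        params.put("groupName", groupName);
        params.put("clusterName", clusterName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("weight", String.valueOf(weight));
        params.put("enable", "true");   // Instance default
        params.put("healthy", "true");  // Instance default
        params.put("ephemeral", String.valueOf(ephemeral));
        params.put("metadata", metadataJson);
        return params;
    }

    // URL-encodes the parameters as key=value pairs joined by '&'.
    static String encode(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }
}
```

For example, encoding params("public", "demo", "DEFAULT_GROUP", "DEFAULT", "10.0.0.5", 8080, 1.0, true, "{}") yields a body that contains serviceName=DEFAULT_GROUP%40%40demo, which shows that the server receives the grouped name and not the bare service name.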
public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {
// Set the namespace id
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
// The Nacos server list (or domain) must not be empty
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
// Send the request to the chosen server
return callServer(api, params, body, server, method);
······
public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
// Inject security info (access key), used when running on Alibaba Cloud ACM
injectSecurityInfo(params);
// Build the request headers
List<String> headers = builderHeaders();
// Build the request URL
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
}
url = HttpClient.getPrefix() + curServer + api;
}
// Send the HTTP request
HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
end = System.currentTimeMillis();
// Record the request latency in the metrics monitor
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
.observe(end - start);
//200
if (HttpURLConnection.HTTP_OK == result.code) {
return result.content;
}
//304
if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
return StringUtils.EMPTY;
}
throw new NacosException(result.code, result.content);
}
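reqAPI() starts at a random server, and callServer() turns the server address into a URL. The sketch below reproduces that URL logic and a simple failover loop. It assumes a plain `http://` prefix (HttpClient.getPrefix() may return https). It also assumes that, in the elided part of reqAPI, a failure moves on to the next server in the list. `ServerCallSketch` and the `call` function are hypothetical stand-ins for the HTTP request.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Hypothetical sketch of reqAPI()/callServer() server selection.
class ServerCallSketch {
    // Full URLs are used as-is; bare hosts get the default port and an http:// prefix.
    static String buildUrl(String server, String api, int defaultPort) {
        if (server.startsWith("https://") || server.startsWith("http://")) {
            return server + api;
        }
        if (!server.contains(":")) {
            server = server + ":" + defaultPort;
        }
        return "http://" + server + api;
    }

    // Start at a random server; on failure try the next one, wrapping around once.
    static String callAny(List<String> servers, String api, int defaultPort,
                          Function<String, String> call, Random random) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException lastError = null;
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String url = buildUrl(servers.get(index), api, defaultPort);
            try {
                return call.apply(url);
            } catch (RuntimeException e) {
                lastError = e;
                index = (index + 1) % servers.size();
            }
        }
        throw new IllegalStateException("all servers failed", lastError);
    }
}
```

The random starting point spreads clients across the Nacos cluster, and the wrap-around loop gives each server at most one attempt per call.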
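How does the server handle a registration request?
Server-side source directory:
From the client requests above, there are two registration-related requests:
Heartbeat = /nacos/v1/ns/instance/beat
Registration = /nacos/v1/ns/instance
1. Service registration: /nacos/v1/ns/instance
1.1 The registration endpoint parses the request parameters, builds the instance and calls instance registration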
/**
* 1. Build the instance from the request parameters and validate it
* 2. Register the instance (the heartbeat endpoint calls the same method)
*/
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
/* Read the service name and namespace from the request */
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
/* Build the instance from the request parameters and validate it */
final Instance instance = parseInstance(request);
/* Register the instance, the same call the heartbeat endpoint makes */
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
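1.2 If the service does not exist, create it; then add the instance to the cache and queue a change notification
/**
* 1. Validate and create the service, put it into the cache and schedule its health check
* 2. Fetch the service again; throw if it still does not exist
* 3. Add the instance to the cache (returning all instances), put it into the notification queue and schedule the task
*/
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
/* 1. Validate and create the service 2. Put the service into the local cache 3. Schedule the service's health check */
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
/* Fetch the service again; throw if it does not exist */
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
/* 1. Add the instance to the cache and return all instances 2. Put it into the notification queue and schedule the task */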
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
If the service does not exist, validate and create it, put it into the local cache and schedule its health check
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
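/* If a cluster was passed in, bind it to the service and put it into the cluster map */
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
/* Validate the service name and cluster names */
service.validate();
/* 1. Put the service into the cache 2. Initialize it (schedule its checks) and register listeners */
putServiceAndInit(service);
/* For a persistent (non-ephemeral) service, add or replace it */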
if (!local) {
addOrReplaceService(service);
}
}
}
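Put the service into the local cache, initialize it (scheduling its heartbeat check), and initialize its clusters (scheduling their health checks)
/**
* Put the service into the local cache, initialize it (scheduling its heartbeat check),
* and initialize its clusters (scheduling their health checks)
* @param service
* @throws NacosException
*/
private void putServiceAndInit(Service service) throws NacosException {
/* Put the service into the local cache */
putService(service);
/* Initialize the service and its clusters */
service.init();
/* Listen for changes to the ephemeral (true) and persistent (false) instance lists */
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);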
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
Initializing the service
/**
* Initialize the service
*/
public void init() {
/* Schedule the client heartbeat check task */
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
/* Initialize the cluster's health checks */
entry.getValue().init();
}
}
1.3 Add the instance to the service and to the instance cache
/**
* Add the instance to the cache
*/
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
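/* Add the instance to the cache */
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
/* Hand the list to the consistency service, which puts it into a blocking notification queue and schedules a sync task */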
consistencyService.put(key, instances);
}
}
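ServiceManager keeps services in a namespace -> service name -> Service map. Below is a minimal in-memory model of createServiceIfAbsent() plus addInstance(). It assumes instances are plain "ip:port" strings. Nacos builds a new instance list and hands it to consistencyService; the sketch instead mutates the list in place under the same per-service lock. `RegistrySketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of ServiceManager's namespace -> service -> instances map.
class RegistrySketch {
    static final class Service {
        final String name;
        final List<String> instances = new ArrayList<>(); // "ip:port" entries

        Service(String name) {
            this.name = name;
        }
    }

    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    // Like createServiceIfAbsent(): create an empty service the first time it is seen.
    Service createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap.computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, Service::new);
    }

    Service getService(String namespaceId, String serviceName) {
        Map<String, Service> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    // Like registerInstance(): ensure the service exists, then add the instance under the service lock.
    void registerInstance(String namespaceId, String serviceName, String ip, int port) {
        createServiceIfAbsent(namespaceId, serviceName);
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new IllegalStateException("service not found: " + serviceName);
        }
        String address = ip + ":" + port;
        synchronized (service) {
            if (!service.instances.contains(address)) {
                service.instances.add(address);
            }
        }
    }

    // Returns a copy so callers never see the list while it is being modified.
    List<String> instances(String namespaceId, String serviceName) {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            return List.of();
        }
        synchronized (service) {
            return new ArrayList<>(service.instances);
        }
    }
}
```

Registering the same ip and port twice leaves a single entry, which matches the idempotent behaviour a heartbeat-driven re-registration relies on.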
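2. Heartbeat: /nacos/v1/ns/instance/beat
2.1 The server-side heartbeat endpoint
/**
* 1. Look up the instance with this ip and port in the cache
* 2. If the instance does not exist, register it
* 3. Mark the instance healthy, set its last heartbeat time to now and trigger the service-change listener
* 4. Return the heartbeat interval and the status
*/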
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
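/* Heartbeat interval */
result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
/* Read the beat parameter (empty by default) and, if present, convert it to an object.
The client source shows that it sends the BeatInfo as a JSON string */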
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
/* Read the cluster name, falling back to the default */
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
/* Read ip and port (with defaults); if beat info was sent, take them from it */
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
/* Read serviceName and namespaceId */
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
/* Look up the instance with this ip and port in the cache */
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
/* The instance does not exist */
if (instance == null) {
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
/* Register the instance again (data compensation) */
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
/* Fetch the service again */
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
/* No beat info was sent: build one from the request parameters */
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
/* Mark the instance healthy, set its last heartbeat time to now and trigger the service-change listener */
service.processClientBeat(clientBeat);
result.put(CommonParams.CODE, NamingResponseCode.OK);
result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
/* Whether lightweight heartbeats are enabled */
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
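2.2 For instance registration, see the registration endpoint explained above
2.3 The scheduled task in ClientBeatCheckTask:
1. For every ephemeral instance, if the time since its last heartbeat exceeds the heartbeat timeout, mark it unhealthy
2. For every ephemeral instance, if the time since its last heartbeat exceeds the delete timeout, delete it
/**
* 1. For every ephemeral instance, if the time since its last heartbeat exceeds the heartbeat timeout, mark it unhealthy
* 2. For every ephemeral instance, if the time since its last heartbeat exceeds the delete timeout, delete it
*/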
public void run() {
try {
/* This server node is not responsible for the service (Distro assigns each service to one node) */
if (!getDistroMapper().responsible(service.getName())) {
return;
}
/* Health checking is disabled */
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
/* All ephemeral instances */
List<Instance> instances = service.allIPs(true);
for (Instance instance : instances) {
/* The time since the last heartbeat exceeds the heartbeat timeout */
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
/* Mark the instance unhealthy */
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
/* If instance expiry is disabled (it is enabled by default), stop here */
if (!getGlobalConfig().isExpireInstance()) {
return;
}
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
/* The time since the last heartbeat exceeds the delete timeout */
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
/* Delete the instance */
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
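The beat endpoint and ClientBeatCheckTask together give an ephemeral instance a simple lifecycle: healthy while beats arrive, unhealthy after the heartbeat timeout, deleted after the delete timeout, and re-created by the next beat. The sketch below models this with an explicit clock. It assumes the usual Nacos defaults of a 15 s heartbeat timeout and a 30 s delete timeout; clients beat every 5 s by default. It ignores marked instances, Distro responsibility and the RESOURCE_NOT_FOUND branch, and always re-registers a missing instance on a beat. `HeartbeatSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the server-side heartbeat lifecycle of ephemeral instances.
class HeartbeatSketch {
    static final long HEART_BEAT_TIMEOUT = 15_000L; // assumed Nacos default
    static final long IP_DELETE_TIMEOUT = 30_000L;  // assumed Nacos default

    static final class Instance {
        long lastBeat;
        boolean healthy = true;

        Instance(long lastBeat) {
            this.lastBeat = lastBeat;
        }
    }

    // Keyed by "ip:port".
    final Map<String, Instance> instances = new HashMap<>();

    // /instance/beat: re-create a missing instance (data compensation), then refresh it.
    void beat(String address, long now) {
        Instance instance = instances.computeIfAbsent(address, a -> new Instance(now));
        instance.lastBeat = now;
        instance.healthy = true;
    }

    // ClientBeatCheckTask.run(): mark stale instances unhealthy, delete very stale ones.
    void check(long now) {
        for (Instance instance : instances.values()) {
            if (now - instance.lastBeat > HEART_BEAT_TIMEOUT && instance.healthy) {
                instance.healthy = false;
            }
        }
        instances.values().removeIf(instance -> now - instance.lastBeat > IP_DELETE_TIMEOUT);
    }
}
```

With a beat at t = 0, the instance is still healthy at 10 s, unhealthy at 16 s and gone at 31 s. A beat at 32 s brings it back as healthy.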
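Service Discovery
When does the client fetch services from Nacos?
At startup, the logs show that the BaseLoadBalancer class performs service discovery, i.e. discovery is triggered when Ribbon is used. Let's trace the call chain.
Setting a breakpoint and inspecting the call stack shows that discovery happens while the Feign bean is being created, and that it ends up in com.alibaba.cloud.nacos.ribbon.NacosServerList
How does the client obtain the address of the target service?
1. NacosServerList gets all instances by service name and group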
private List<NacosServer> getServers() {
try {
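/* discoveryProperties holds the properties under ${spring.cloud.nacos.discovery} */
String group = discoveryProperties.getGroup();
/* Create the NamingService from the Nacos properties and query the instances */
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
/* Convert the returned Instance objects to NacosServer objects */
return instancesToServerList(instances);
}
catch (Exception e) {
throw new IllegalStateException(
"Can not get service instances from nacos, serviceId=" + serviceId, e);
}
}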
2. Creating the NamingService
public NamingService namingServiceInstance() {
if (null != namingService) {
return namingService;
}
try {
/* Build the NamingService from the Nacos properties */
namingService = NacosFactory.createNamingService(getNacosProperties());
}
catch (Exception e) {
log.error("create naming service error!properties={},e=,", this, e);
return null;
}
return namingService;
}
3. Query all instances by serviceId and group
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
/* Subscribing */
if (subscribe) {
/* Get the service by grouped name and clusters (through the local cache) */
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
/* Not subscribing: query the Nacos server directly, bypassing the local cache */
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
/* Filter the instances by their healthy flag */
return selectInstances(serviceInfo, healthy);
}
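The final selectInstances(serviceInfo, healthy) call filters the cached hosts. This sketch assumes the filter keeps an instance only when its healthy flag equals the requested value, it is enabled, and its weight is greater than 0. So healthy = true returns only healthy instances and healthy = false only unhealthy ones. `InstanceFilterSketch` is hypothetical, and it returns a new list instead of removing entries in place.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the selectInstances(serviceInfo, healthy) filter.
class InstanceFilterSketch {
    static final class Instance {
        final String ip;
        final boolean healthy;
        final boolean enabled;
        final double weight;

        Instance(String ip, boolean healthy, boolean enabled, double weight) {
            this.ip = ip;
            this.healthy = healthy;
            this.enabled = enabled;
            this.weight = weight;
        }
    }

    // Keep instances whose health matches the request, that are enabled and have weight > 0.
    static List<Instance> select(List<Instance> hosts, boolean healthy) {
        if (hosts == null || hosts.isEmpty()) {
            return List.of();
        }
        return hosts.stream()
                .filter(i -> i.healthy == healthy && i.enabled && i.weight > 0)
                .collect(Collectors.toList());
    }
}
```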
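4. Pull instances from the server and cache them locally
/**
* 1. Get the service from the cache
* 2. If it is not in the local cache, create it and pull it from the Nacos server into the local cache
* 3. Schedule a task that periodically syncs the service's instances from the server
*/
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
/* Whether failover mode is switched on */
if (failoverReactor.isFailoverSwitch()) {
/* Return the service from the failover cache; if it is absent, a new empty ServiceInfo is returned */
return failoverReactor.getService(key);
}
/* Get the ServiceInfo from the local cache */
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
/* The service is not in the local cache */
if (null == serviceObj) {
/* Create a ServiceInfo and put it into the local cache */
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
/* Record the service in updatingMap as "being updated" */
updatingMap.put(serviceName, new Object());
/* Update now: pull the service from the Nacos server into the local cache */
updateServiceNow(serviceName, clusters);
/* Remove it from updatingMap */
updatingMap.remove(serviceName);
}
/* The service is being updated: wait on serviceObj until the update notifies, or until UPDATE_HOLD_INTERVAL (5 seconds) elapses */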
else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
/* Schedule a task that periodically syncs the service's instances from the server */
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
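getServiceInfo() loads a missing service synchronously and lets concurrent callers wait up to UPDATE_HOLD_INTERVAL for that load. The sketch below keeps the same behaviour but swaps Nacos's wait()/notifyAll() on the ServiceInfo object for a per-key CompletableFuture. `ServiceInfoCacheSketch` and `queryServer` are hypothetical, and services are reduced to lists of "ip:port" strings.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical cache: the first caller loads from the server, concurrent callers wait up to holdMillis.
class ServiceInfoCacheSketch {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<List<String>>> updatingMap = new ConcurrentHashMap<>();
    private final Function<String, List<String>> queryServer;
    private final long holdMillis;

    ServiceInfoCacheSketch(Function<String, List<String>> queryServer, long holdMillis) {
        this.queryServer = queryServer;
        this.holdMillis = holdMillis;
    }

    List<String> getServiceInfo(String key) {
        List<String> cached = serviceInfoMap.get(key);
        if (cached != null) {
            return cached;
        }
        CompletableFuture<List<String>> mine = new CompletableFuture<>();
        CompletableFuture<List<String>> inFlight = updatingMap.putIfAbsent(key, mine);
        if (inFlight == null) {
            // This thread owns the update, like the updatingMap.put(...) + updateServiceNow(...) branch.
            try {
                List<String> fresh = queryServer.apply(key);
                serviceInfoMap.put(key, fresh);
                mine.complete(fresh);
                return fresh;
            } catch (RuntimeException e) {
                mine.completeExceptionally(e);
                throw e;
            } finally {
                updatingMap.remove(key, mine);
            }
        }
        // Another thread is updating: wait for it, but no longer than holdMillis.
        try {
            return inFlight.get(holdMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            // Give up waiting and fall back to whatever is cached.
        }
        return serviceInfoMap.getOrDefault(key, List.of());
    }
}
```

The design choice is the same as in HostReactor: only one request per service goes to the server on a cold start, and no caller blocks longer than the hold interval.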
4.1 Query instances from the Nacos server and update the local cache
public void updateServiceNow(String serviceName, String clusters) {
/* Try to get the cached service */
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
/* Query the service from Nacos */
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
if (StringUtils.isNotEmpty(result)) {
/* Update the service info in the local cache from the query result */
processServiceJSON(result);
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (oldService != null) {
synchronized (oldService) {
/* Wake up every thread waiting on oldService (those that found the key in updatingMap) */
oldService.notifyAll();
......
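Build the parameters and call the server
/**
* serviceName  service name
* clusters     cluster list
* udpPort      local UDP port (used for server push)
* healthyOnly  whether to return only healthy instances
**/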
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
/* Query the server (/nacos/v1/ns/instance/list) */
return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
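4.2 Schedule a task that periodically syncs the service's instances from the server
/**
* futureMap holds ScheduledFuture objects, i.e. the scheduled tasks
**/
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
/* Double-checked: if a task already exists for this key, return */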
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
/* Add the scheduled service-update task */
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
The scheduled task
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private String clusters;
private String serviceName;
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}
@Override
public void run() {
try {
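/* Get the service from the cache */
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
/* The service is not in the cache */
if (serviceObj == null) {
/* Query the Nacos server and update the local cache */
updateServiceNow(serviceName, clusters);
/* Reschedule this task after DEFAULT_DELAY (1 second by default) */
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
/* The service's lastRefTime is not newer than the one this task recorded last time */
if (serviceObj.getLastRefTime() <= lastRefTime) {
/* Query the Nacos server and update the local cache */
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
/* Only send the query; do not update the local cache */
refreshOnly(serviceName, clusters);
}
/* Record the service's latest refresh time for the next run */
lastRefTime = serviceObj.getLastRefTime();
/* The service is no longer subscribed and its task has been removed from futureMap */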
if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
!futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task:
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
/* Reschedule this task after the service's cacheMillis (taken from the server response) */
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
......
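scheduleUpdateIfAbsent() and UpdateTask together form a self-rescheduling refresh loop: one task per service key, and each run picks its next delay from the last response. The sketch below uses a ScheduledExecutorService. It replaces the futureMap double check with an atomic Set.add and ends the loop once the key is unsubscribed. `UpdateSchedulerSketch`, the `refresh` supplier and the 1 s retry delay after a failed query are assumptions, not Nacos code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical model of scheduleUpdateIfAbsent() + UpdateTask.
class UpdateSchedulerSketch {
    private static final long RETRY_DELAY_MILLIS = 1000L; // assumed delay after a failed query

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Set<String> subscribed = ConcurrentHashMap.newKeySet();

    // At most one task per key: Set.add is the atomic "if absent" check.
    // refresh queries the server and returns the next delay in ms (like cacheMillis).
    boolean scheduleUpdateIfAbsent(String key, LongSupplier refresh) {
        if (!subscribed.add(key)) {
            return false;
        }
        executor.schedule(new UpdateTask(key, refresh), 0, TimeUnit.MILLISECONDS);
        return true;
    }

    // The task for this key stops at its next run.
    void unsubscribe(String key) {
        subscribed.remove(key);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    private final class UpdateTask implements Runnable {
        private final String key;
        private final LongSupplier refresh;

        UpdateTask(String key, LongSupplier refresh) {
            this.key = key;
            this.refresh = refresh;
        }

        @Override
        public void run() {
            if (!subscribed.contains(key)) {
                return; // nobody is subscribed any more: end the loop
            }
            long nextDelay;
            try {
                nextDelay = refresh.getAsLong();
            } catch (RuntimeException e) {
                nextDelay = RETRY_DELAY_MILLIS;
            }
            if (!executor.isShutdown()) {
                executor.schedule(this, nextDelay, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

Rescheduling from inside run(), instead of using scheduleAtFixedRate, is what lets the delay follow the cacheMillis value returned by each response.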
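How does the server return the requested service information?
See the official Open API documentation
Request path: /nacos/v1/ns/instance/list
The server reads the parameters of the client request, fetches the matching instances and other service information, and wraps them into the response
/**
* Get all instances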
*/
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
// Namespace
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// Service name
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
// Request user agent
String agent = WebUtils.getUserAgent(request);
// Client clusters
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
// Client IP
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
// Client UDP port (for push)
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
// Environment
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
// App name
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
// Only return healthy instances
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
/* Get the service information matching these conditions */
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}
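Get the service from the cache, select the provider instances with the service's selector, then build the response from the instances' health and enabled flags
/**
* 1. Get the service; if it does not exist, return empty hosts
* 2. Select the provider instances with the service's selector
* 3. Keep the enabled instances and return them all as hosts
*/
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
/* Get the service from the cache */
Service service = serviceManager.getService(namespaceId, serviceName);
/* If the service does not exist, return empty hosts */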
if (service == null) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
result.put("name", serviceName);
result.put("clusters", clusters);
result.replace("hosts", JacksonUtils.createEmptyArrayNode());
return result;
}
/* Check whether the service is enabled */
checkIfDisabled(service);
/* Cache time, 3 seconds by default */
long cacheMillis = switchDomain.getDefaultCacheMillis();
/* Register a UDP push client for this service */
try {
if (udpPort > 0 && pushService.canEnablePush(agent)) {
pushService
.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);
/* Use the push cache time, defaultPushCacheMillis, 10 seconds by default */
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
Loggers.SRV_LOG
.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
cacheMillis = switchDomain.getDefaultCacheMillis();
}
List<Instance> srvedIPs;
/* Get all instances in the requested clusters */
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
/* If the service has a selector, filter the providers by the client IP */
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
}
/* No instances: return a result with empty hosts */
if (CollectionUtils.isEmpty(srvedIPs)) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("hosts", JacksonUtils.createEmptyArrayNode());
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.put("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
/* Split the instances into healthy (true) and unhealthy (false) */
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
for (Instance ip : srvedIPs) {
ipMap.get(ip.isHealthy()).add(ip);
}
if (isCheck) {
result.put("reachProtectThreshold", false);
}
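/* Protect threshold: if the share of healthy instances is at or below the threshold, return all instances so the few healthy ones are not overloaded */
double threshold = service.getProtectThreshold();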
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
if (isCheck) {
result.put("reachProtectThreshold", true);
}
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
}
if (isCheck) {
result.put("protectThreshold", service.getProtectThreshold());
result.put("reachLocalSiteCallThreshold", false);
return JacksonUtils.createEmptyJsonNode();
}
ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
/**
* Wrap each instance and add it to the hosts list
*/
for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> ips = entry.getValue();
/* Skip unhealthy instances when only healthy ones are requested */
if (healthyOnly && !entry.getKey()) {
continue;
}
for (Instance instance : ips) {
/* Skip disabled instances */
if (!instance.isEnabled()) {
continue;
}
ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
ipObj.put("ip", instance.getIp());
ipObj.put("port", instance.getPort());
// deprecated since nacos 1.0.0:
ipObj.put("valid", entry.getKey());
ipObj.put("healthy", entry.getKey());
ipObj.put("marked", instance.isMarked());
ipObj.put("instanceId", instance.getInstanceId());
ipObj.put("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
ipObj.put("enabled", instance.isEnabled());
ipObj.put("weight", instance.getWeight());
ipObj.put("clusterName", instance.getClusterName());
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
ipObj.put("serviceName", instance.getServiceName());
} else {
ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
}
ipObj.put("ephemeral", instance.isEphemeral());
hosts.add(ipObj);
}
}
result.replace("hosts", hosts);
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
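The protect-threshold step is the least obvious part of doSrvIpxt(). When too few instances are healthy, Nacos returns the unhealthy ones as well, so traffic is spread out instead of overwhelming the survivors. The sketch below models just that step plus the enabled and healthyOnly filters. `ProtectThresholdSketch` and its `Instance` type are hypothetical, and the ordering of the result (healthy group first) is only a convention of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the protect-threshold and filtering steps in doSrvIpxt().
class ProtectThresholdSketch {
    static final class Instance {
        final String ip;
        final boolean healthy;
        final boolean enabled;

        Instance(String ip, boolean healthy, boolean enabled) {
            this.ip = ip;
            this.healthy = healthy;
            this.enabled = enabled;
        }
    }

    // Returns the ips that would end up in "hosts", healthy group first.
    static List<String> hosts(List<Instance> srvedIPs, double threshold, boolean healthyOnly) {
        List<Instance> healthy = new ArrayList<>();
        List<Instance> unhealthy = new ArrayList<>();
        for (Instance ip : srvedIPs) {
            if (ip.healthy) {
                healthy.add(ip);
            } else {
                unhealthy.add(ip);
            }
        }
        // Too few healthy instances: treat all of them as healthy so the load is spread out.
        if (!srvedIPs.isEmpty() && (float) healthy.size() / srvedIPs.size() <= threshold) {
            healthy.addAll(unhealthy);
            unhealthy.clear();
        }
        List<String> result = new ArrayList<>();
        for (Instance i : healthy) {
            if (i.enabled) {
                result.add(i.ip);
            }
        }
        if (!healthyOnly) {
            for (Instance i : unhealthy) {
                if (i.enabled) {
                    result.add(i.ip);
                }
            }
        }
        return result;
    }
}
```

With 1 healthy instance out of 4 (ratio 0.25), a threshold of 0.5 makes even a healthyOnly request receive the unhealthy but enabled instances, while a threshold of 0.2 returns only the healthy one. Note that in the real response these instances are also reported with healthy = true, because the flag is taken from the group they end up in.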
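How does Feign call the server?
A Feign interface gets a proxy generated when it is initialized as a bean, and every call ultimately goes through SynchronousMethodHandler.invoke()
1. SynchronousMethodHandler.invoke() ends up calling client.execute(request, options)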
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Options options = findOptions(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
/* Execute the request and decode the response */
return executeAndDecode(template, options);
......
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
/* Execute the request through the client */
response = client.execute(request, options);
......
2. This reaches LoadBalancerFeignClient.execute() in the Ribbon integration, which runs the request for the Feign clientName (the host part of the request URL)
public Response execute(Request request, Request.Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
IClientConfig requestConfig = getClientConfig(options, clientName);
/* Get the client for this name from the factory, then execute the request with load balancing */
return lbClient(clientName)
.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
3. AbstractLoadBalancerAwareClient.executeWithLoadBalancer() picks a host with the load-balancing rule, calls it and returns the result
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
/* Build a LoadBalancerCommand from the request configuration; it chooses a server from all servers with the load-balancing rule */
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
/* Call the chosen server and return the result */
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
......
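LoadBalancerFeignClient uses the host of the Feign URL (for example http://user-service/...) as the client and service name, and the command then swaps that host for a concrete server. The sketch below covers those two steps. The round-robin choice is a simplification, since Ribbon's actual rule is configurable. `LoadBalancedCallSketch` is hypothetical, and servers are plain "host:port" strings.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of choosing a server and rewriting the Feign URL for it.
class LoadBalancedCallSketch {
    private final AtomicInteger next = new AtomicInteger();

    // Simple round robin over the cached server list ("host:port" strings).
    String choose(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalStateException("no server available");
        }
        int i = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(i);
    }

    // The service name is the host of the Feign URL, e.g. "user-service".
    static String clientName(URI feignUrl) {
        return feignUrl.getHost();
    }

    // Like reconstructURIWithServer(): replace the service name with host:port.
    static URI reconstruct(URI original, String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

For http://user-service/users/1?verbose=true and the server 10.0.0.2:8080, reconstruct() yields http://10.0.0.2:8080/users/1?verbose=true, which is the request that actually goes over the network.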
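Summary
1. The client assembles its own service information and sends a registration request to the server.
2. On receiving the registration, the server puts the service into its local cache and schedules tasks that keep checking whether the instances are healthy, updating or deleting registered instances.
3. At startup, while Spring creates the Feign beans, the client fetches the service information from the Nacos server and schedules tasks that keep pulling the information for the services used by the Feign clients.
4. When calling through Feign, the client uses the load-balancing rule to pick an instance from its local cache.
5. On receiving a discovery request, the server takes the matching instances from its local cache, wraps them and returns them to the client.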
