说道源码解析,那就不得不把源码从git上下载下来,所以想要深入的童鞋一定要把源码给下载下来。
那么接下来就直接开始吧!
那么从哪开始呢?我们日常使用当然是启动nacos,然后在一个springboot配置中添加nacos服务发现的一个jar包,我们项目用的是2.2.3版本的
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
那么之后呢? 如果对springboot自动配置原理有些了解的小伙伴会知道,springboot在启动的时候会扫描jar包中META-INF/spring.factories这个文件,如下是nacos-discovery jar包中的spring.factories文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration从上述我们看到这个jar包配置了挺多的自动配置类,有什么ribbon,nacos的也很多,怎么看呢?这里凭借经验或者理解来说是这个NacosServiceRegistryAutoConfiguration类,nacos服务注册自动配置,那么我们点进去
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(),
nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}可以看到这里注册了三个bean,这三个bean有什么关系呢,我们逐一点击去看下继承关系中可以得到如下结果

这里稍微解释,继承了applicationListener接口需要实现onApplicationEvent这个方法,他会监听对应的事件,这里我把WebServerInitializedEvent的注释带上了,意思就是在事件在web服务准备好了就会被发布。
顺着我图中的顺序,我们来到NacosServiceRegistry.register(Registration registration)方法
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
//从方法名称来看是注册实例,我们点进去
namingService.registerInstance(serviceId, group, instance);
}
catch (Exception e) {
rethrowRuntimeException(e);
}接下来我们来到
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
//判断是不是临时实例
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
}这里的临时实例稍作解释,在nacos中我们会区分临时实例和持久化实例,而他们分别对应着ap架构和cp架构,我们公司用的是ap架构。通过beat这个词,我们大致可以推断出,这是个心跳机制的东东,我们先略过这个if走主线,往下就是又是一个注册服务,点进去。
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
...
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}这个方法看起来就是拼接了参数,然后通过reqApi去调用,我们看下UtilAndComs.nacosUrlInstance = "/nacos" + "/v1/ns" + "/instance",从官网https://nacos.io/zh-cn/docs/open-api.html我们可以看到

这就是nacos官网上开放的openapi,我们继续往下走就是通过http的方式拼接了地址和端口号,调用nacos server端了,那么接下来就是核心啦,我们需要借用nacos源码,然后去看服务端的逻辑了。
来到服务端,通常我们用全局搜索这个路径,发现没找到,怎么办,最简单的就是全局搜Controller,不多说,我们直接来到

注意:我这里的nacos版本是基于生产可使用的,也就是1.1.4,并非2.x版本(有bug不稳定)
来到InstanceController
@PostMapping
public String register(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
return "ok";
}
点进去
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//创建一个空实例
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
//添加实例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
这个是createEmptyService的一个主要流程,我们主要是看下最后的一个serviceMap.put()
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
}
}
}
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}这里采用双重判断来保证服务注册的安全性,当这个服务没有任何一个实例的时候,就新建,不然就get出来在put,所以这个map的结构也就是
Map<namespace, Map<group::serviceName, Service>>
而这个Map也我们的服务注册的列表了。那么我们回到putServiceAndInit方法
private void putServiceAndInit(Service service) throws NacosException {
//创建内存注册表结构
putService(service);
//服务初始化
service.init();
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
}在内存中创建结构后,还进行了服务初始化,我们点进去看下做了什么
public void init() {
//健康任务的检查
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
//集群同步的更新
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}点进去会发现,有个只是把这个任务放到了一个map当中,其实这是我觉得nacos快的一个比较重要的原因之一,异步调用,这里提一句,笔者之前公司使用的是eureka,在k8s环境发布后,由于eureka的三层map缓存和ribbon等一些的延迟,会导致服务出现在注册中心列表中需要30s 到2min不等,而nacos基本可以保证在10s左右,我们阅读源码其实并不一定需要把很多细节搞清楚,最重要的笔者认为是思想,好了不扯了,继续。
clientBeatCheckTask是一个实现了Runnable接口的,那么我们可以直接去看它的run方法。
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
//翻译过来就是有限设置实例的健康状态
for (Instance instance : instances) {
//判断当前时间到上一次心跳时间是否超过配置值,默认15s
//public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
//添加到map中,里面也是异步执行的逻辑,springcontext发布事件也是一种解耦的操作
getPushService().serviceChanged(service);
//发布时间
SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
//然后移除一些过期的实例
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
//public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
//删除实例,又是用的异步
deleteIP(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}这个代码有些长,我也就都加上了注释,就是当前时间和上一次的心跳时间,其实就是最开始说的那个if判断是否是临时实例,那里面会走心跳链接的逻辑,而时间如果没有配置,会有个默认的15s,如果过了就会把实例的健康状态置为false,删除实例也是一样的,其实就是http调用,同样也是采用异步的方式,你就会发现nacos的太多操作是基于异步的了,而异步最大的好处就是速度快。
那么既然说到了心跳,那么我们就不得不退回到最开始的if条件了,其实读者会发现,在源码中很多情况你可以先不要看特殊逻辑,先去走主流程,对于特殊逻辑有个印象就好了,当后续用到的时候再折回头来。继续!
我们切回客户端
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
//判断是不是临时实例
if (instance.isEphemeral()) {
//我们回到这里
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
}buildBeatInfo方法仅仅只收构建了一下参数,然后下面一行代码我们看一下
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
...
//又是个定时器 默认就是5s public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}这里就可以算出 15s / 5s = 3次,即三次重试失败实例的健康状态才会被置为false,联系上下文!
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
//发送心跳
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
//如果实例不存在,就回去注册
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
}
//循环的定时任务
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);那其实发送心跳也就是个http链接了
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
...
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}那接下来不用我说了吧,我们直接来到server端
@PutMapping("/beat")
public JSONObject beat(HttpServletRequest request) throws Exception {
...
//是持久化实例么
if (!switchDomain.isDefaultInstanceEphemeral() && !clientBeat.isEphemeral()) {
return result;
}
//设置集群
if (StringUtils.isBlank(clientBeat.getCluster())) {
clientBeat.setCluster(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
String clusterName = clientBeat.getCluster();
//获取实例
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clientBeat.getCluster(),
clientBeat.getIp(),
clientBeat.getPort());
//如果实例不存在,则重新注册
if (instance == null) {
instance = new Instance();
...
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
Service service = serviceManager.getService(namespaceId, serviceName);
//心跳
service.processClientBeat(clientBeat);
result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
return result;
}我们直接来到心跳的方法,还是异步
public static ScheduledFuture<?> scheduleNow(Runnable task) {
//delay = 0 立即开启
return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
}那么直接来到clientBeat的run方法了
for (Instance instance : instances) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
//设置上次的心跳时间 有没有之前的注册的那个联系到一块了
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {
//如果实例健康状态为false,置为true,我们之前不是看到15s会置位false,30s才会删除实例
if (!instance.isHealthy()) {
instance.setHealthy(true);
Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
getPushService().serviceChanged(service);
}
}
}
}那么心跳连接也就到此结束了。那么就此我们都串联起来了,你也为这完了? 当初我们点的方法时createEmptyService,既然是empty,那肯定有下文呀,我们就开个小头
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//还有这
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}继续看看后续做了什么
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
consistencyService.put(key, instances);
}
}就只是一个put就结束了?这又是一个map,其实不是

在DelegateConsistencyServiceImpl类中
private ConsistencyService mapConsistencyService(String key) {
//通过key来区分临时实例和持久化实例
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}那么本文就到这了,下篇文章,就是我们一起来分析分析持久化实例和临时实例,nacos是怎么样做的,也是是nacos的ap和cp架构咯。
半年没写博文了,主要是写博文真的累,还怕自己理解有问题误导了读者,但是想想一肚子知识还是得分享出来才行,还是那句话,我说的不一定对,多思考多实践才是真理,好了下次见。