1. nacos服务注册与发现源码深度剖析
2. nacos注册表如何防止多节点读写并发冲突
3. nacos高并发支撑异步任务与内存队列剖析
1. nacos服务注册
nacos客户端 :
springboot自动装配类
进入这个类查看
进入new方法
nacos客户端最终调用服务端方法, 进行服务注册, 接下来看服务端逻辑
nacos服务端:
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
//获取namespaceId
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//serviceName
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
//构建instance
final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
//对instance进行注册
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}
进入registerInstance方法
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//判断service是否为空, 如果不是空就返回, 如果为空则创建一个service对象,(第一次进来为空)
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//获取刚才创建的service
Service service = getService(namespaceId, serviceName);
//对service再一次进行校验, 判断是否为空, 为空则抛异常
checkServiceIsNull(service, namespaceId, serviceName);
//对instance进行注册
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
进入createEmptyService方法
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
//设置serviceName名
service.setName(serviceName);
//设置命名空间
service.setNamespaceId(namespaceId);
//设置组名
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
进入getService方法
public Service getService(String namespaceId, String serviceName) {
/**
* Map(namespace, Map(group::serviceName, Service)).
* nacos最关键的一个map
*/
//private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
if (serviceMap.get(namespaceId) == null) {
return null;
}
return chooseServiceMap(namespaceId).get(serviceName);
}
退回来, 进入addInstance方法
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
//根据ephemeral变量生成一个字符串, ephemeral默认为true, 字符串中包含ephemeral
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
//获取刚才生成的service
Service service = getService(namespaceId, serviceName);
synchronized (service) {
//将ips进行封装, 生成Instance集合
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
//将实例放入到service中
consistencyService.put(key, instances);
}
}
进入consistencyService.put方法 (DelegateConsistencyServiceImpl类)
@Override
public void put(String key, Record value) throws NacosException {
mapConsistencyService(key).put(key, value);
}
进入mapConsistencyService(key)方法
private ConsistencyService mapConsistencyService(String key) {
//根据刚才生成的key, 获取对象, 默认会获取第一个
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
进入mapConsistencyService(key).put(key, value);方法, 最终进入到DistroConsistencyServiceImpl类的put方法
@Override
public void put(String key, Record value) throws NacosException {
//主要方法, 将instance放入到service中
onPut(key, value);
// If upgrade to 2.0.X, do not sync for v1.
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
DistroConfig.getInstance().getSyncDelayMillis());
}
进入onPut()方法
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, DataOperation.CHANGE);
}
进入到addTask()方法
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
}
走到offer方法, instance注册完成
大家可能会好奇, 在哪里将实例注册的?
其实是通过阻塞队列 + 线程池的方式注册的
DistroConsistencyServiceImpl类中的init方法启动线程
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
看代码
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
/**
* Add new notify task to queue.
*
* @param datumKey data key
* @param action action for data
*/
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
//入阻塞队列
tasks.offer(Pair.with(datumKey, action));
}
public int getTaskSize() {
return tasks.size();
}
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
//无限循环
for (; ; ) {
try {
//出阻塞队列
Pair<String, DataOperation> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
//主要方法
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == DataOperation.CHANGE) {
//走这里
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
进入listener.onChange方法(Service类)
@Override
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
//走这里
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
进入updateIPs方法
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
//主要走下面这个方法
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
getPushService().serviceChanged(this);
ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(',');
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
进入clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);方法
public void updateIps(List<Instance> ips, boolean ephemeral) {
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
List<Instance> updatedIps = updatedIps(ips, oldIpMap.values());
if (updatedIps.size() > 0) {
for (Instance ip : updatedIps) {
Instance oldIP = oldIpMap.get(ip.getDatumKey());
// do not update the ip validation status of updated ips
// because the checker has the most precise result
// Only when ip is not marked, don't we update the health status of IP:
if (!ip.isMarked()) {
ip.setHealthy(oldIP.isHealthy());
}
if (ip.isHealthy() != oldIP.isHealthy()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
(ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
}
if (ip.getWeight() != oldIP.getWeight()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP, ip);
}
}
}
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
getName(), newIPs.size(), newIPs);
for (Instance ip : newIPs) {
HealthCheckStatus.reset(ip);
}
}
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
if (deadIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
getName(), deadIPs.size(), deadIPs);
for (Instance ip : deadIPs) {
HealthCheckStatus.remv(ip);
}
}
toUpdateInstances = new HashSet<>(ips);
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
ephemeralInstances变量是Cluster类中的一个set属性, 里面装的就是全部实例,
ephemeralInstances = toUpdateInstances; 最终将toUpdateInstances赋值给ephemeralInstances,将全部的实例放入到Cluster中的一个set属性中, 实例注册完成
namespace -> group -> service -> cluster -> set<Intance> 最终回去全部的属性
版权声明:本文为hc1428090104原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。