第一部分 Nacos Config自动装配
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigAutoConfiguration {
@Bean
public NacosConfigProperties nacosConfigProperties(ApplicationContext context) {
if (context.getParent() != null
&& BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
context.getParent(), NacosConfigProperties.class).length > 0) {
return BeanFactoryUtils.beanOfTypeIncludingAncestors(context.getParent(),
NacosConfigProperties.class);
}
return new NacosConfigProperties();
}
@Bean
public NacosRefreshProperties nacosRefreshProperties() {
return new NacosRefreshProperties();
}
@Bean
public NacosRefreshHistory nacosRefreshHistory() {
return new NacosRefreshHistory();
}
@Bean
public NacosConfigManager nacosConfigManager(
NacosConfigProperties nacosConfigProperties) {
return new NacosConfigManager(nacosConfigProperties);
}
@Bean
public NacosContextRefresher nacosContextRefresher(
NacosConfigManager nacosConfigManager,
NacosRefreshHistory nacosRefreshHistory) {
// Consider that it is not necessary to be compatible with the previous
// configuration
// and use the new configuration if necessary.
return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);
}
}
NacosConfigService创建了五个Bean
NacosConfigProperties 配置属性类
NacosRefreshProperties 刷新属性类
NacosRefreshHistory 刷新历史类
NacosConfigManager 配置管理器类
NacosContextRefresher Nacos上下文刷新器
主要看一下NacosConfigManager
public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {
this.nacosConfigProperties = nacosConfigProperties;
// Compatible with older code in NacosConfigProperties,It will be deleted in the
// future.
createConfigService(nacosConfigProperties);
}
/**
* Compatible with old design,It will be perfected in the future.
*/
static ConfigService createConfigService(
NacosConfigProperties nacosConfigProperties) {
if (Objects.isNull(service)) {
synchronized (NacosConfigManager.class) {
try {
if (Objects.isNull(service)) {
service = NacosFactory.createConfigService(
nacosConfigProperties.assembleConfigServiceProperties());
}
}
catch (NacosException e) {
log.error(e.getMessage());
throw new NacosConnectionFailureException(
nacosConfigProperties.getServerAddr(), e.getMessage(), e);
}
}
}
return service;
}
主要是判断是否存在NacosConfigService如果不存在,则通过NacosFactory创建
参照NacosNamingService,NacosConfigService也是通过反射创建。
接下来看看NacosConfigService的创建流程、关键类及其作用
第二部分 NacosConfigService创建
NacosConfigService构造方法
public NacosConfigService(Properties properties) throws NacosException {
//验证参数
ValidatorUtils.checkInitParam(properties);
//初始化namespace
initNamespace(properties);
//配置过滤链管理器
this.configFilterChainManager = new ConfigFilterChainManager(properties);
//创建服务列表管理器并启动 参考NacosNamingService
ServerListManager serverListManager = new ServerListManager(properties);
serverListManager.start();
//创建客户端worker
this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties);
// will be deleted in 2.0 later versions
//服务http代理
agent = new ServerHttpAgent(serverListManager);
}
主要创建了几个类:
ConfigFilterChainManager
ServerListManager
ClientWork
ServerHttpAgent
接下来分析各个类的作用:
1.ConfigFilterChainManager 构造方法
public ConfigFilterChainManager(Properties properties) {
ServiceLoader<IConfigFilter> configFilters = ServiceLoader.load(IConfigFilter.class);
for (IConfigFilter configFilter : configFilters) {
configFilter.init(properties);
addFilter(configFilter);
}
}
首先,用ServiceLoader加载了IConfigFilter.class,找到META-INF.service目录找到IConfigFilter全路径名称的文件,文件指定了类:
com.alibaba.nacos.client.config.filter.impl.ConfigEncryptionFilter
然后遍历所加载的configFilter 进行初始化init,并添加filter
ConfigEncryptionFilter 的init方法为空,而addFilter方法主要是将filter添加到filters集合中
,ConfigFilterChainManager是通过VirtualFilterChain的doFilter方法,循环filters调用doFilter方法
具体看一下ConfigEncryptionFilter的doFilter方法
public void doFilter(IConfigRequest request, IConfigResponse response, IConfigFilterChain filterChain)
throws NacosException {
if (Objects.nonNull(request) && request instanceof ConfigRequest && Objects.isNull(response)) {
//加密过程
// Publish configuration, encrypt
ConfigRequest configRequest = (ConfigRequest) request;
String dataId = configRequest.getDataId();
String content = configRequest.getContent();
Pair<String, String> pair = EncryptionHandler.encryptHandler(dataId, content);
String secretKey = pair.getFirst();
String encryptContent = pair.getSecond();
((ConfigRequest) request).setContent(encryptContent);
((ConfigRequest) request).setEncryptedDataKey(secretKey);
}
if (Objects.nonNull(response) && response instanceof ConfigResponse && Objects.isNull(request)) {
//解密过程
// Get configuration, decrypt
ConfigResponse configResponse = (ConfigResponse) response;
String dataId = configResponse.getDataId();
String encryptedDataKey = configResponse.getEncryptedDataKey();
String content = configResponse.getContent();
Pair<String, String> pair = EncryptionHandler.decryptHandler(dataId, encryptedDataKey, content);
String decryptContent = pair.getSecond();
((ConfigResponse) response).setContent(decryptContent);
}
filterChain.doFilter(request, response);
}
EncryptionHandler中加密解密中会先做验证,checkCipher(dataId),判断dataId即配置名称是否以 cipher- 开头,如果不是以cipher-开头,直接返回明文
EncryptionHandler最终是通过EncryptionPluginService来完成加解密的、所以需要自己实现EncryptionPluginService
2.ServerListManager在NacosNamingService中已经分析过,主要是定时任务从nacos server获取服务的列表并更新
3.ClientWork
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
final Properties properties) throws NacosException {
this.configFilterChainManager = configFilterChainManager;
init(properties);
agent = new ConfigRpcTransportClient(properties, serverListManager);
int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);
ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker");
t.setDaemon(true);
return t;
});
agent.setExecutor(executorService);
agent.start();
}
ClientWork构造方法主要创建ConfigRpcTransportClient并设置线程池并启动
ConfigRpgTransportClient中start方法
public void start() throws NacosException {
securityProxy.login(this.properties);
this.executor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0,
this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
startInternal();
}
进入startInternal方法
public void startInternal() {
executor.schedule(() -> {
while (!executor.isShutdown() && !executor.isTerminated()) {
try {
listenExecutebell.poll(5L, TimeUnit.SECONDS);
if (executor.isShutdown() || executor.isTerminated()) {
continue;
}
executeConfigListen();
} catch (Exception e) {
LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
主要是执行executeConfigListen方法
public void executeConfigListen() {
Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
long now = System.currentTimeMillis();
boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
//循环1开始:
for (CacheData cache : cacheMap.get().values()) {
synchronized (cache) {
//1.验证本地监听器是否一致
//check local listeners consistent.
if (cache.isSyncWithServer()) {
//验证监听器MD5是否一致,如果则不一致则需要调用CacheData的safeNotifyListener方法,用最新的MD5加密并接收ConfigChange
//TODO saftNotifyListener稍后待详细分析
cache.checkListenerMd5();
if (!needAllSync) {
continue;
}
}
if (!CollectionUtils.isEmpty(cache.getListeners())) {
//get listen config
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
if (cacheDatas == null) {
cacheDatas = new LinkedList<>();
listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
}
cacheDatas.add(cache);
}
} else if (CollectionUtils.isEmpty(cache.getListeners())) {
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
if (cacheDatas == null) {
cacheDatas = new LinkedList<>();
removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
}
cacheDatas.add(cache);
}
}
}
}
//循环1结束
//循环2开始:
boolean hasChangedKeys = false;
if (!listenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
String taskId = entry.getKey();
Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);
List<CacheData> listenCaches = entry.getValue();
for (CacheData cacheData : listenCaches) {
timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant),
cacheData.getLastModifiedTs().longValue());
}
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
configChangeListenRequest.setListen(true);
try {
RpcClient rpcClient = ensureRpcClient(taskId);
ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
rpcClient, configChangeListenRequest);
if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
Set<String> changeKeys = new HashSet<String>();
//handle changed keys,notify listener
if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
hasChangedKeys = true;
for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
.getChangedConfigs()) {
String changeKey = GroupKey
.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
changeConfig.getTenant());
changeKeys.add(changeKey);
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
refreshContentAndCheck(changeKey, !isInitializing);
}
}
//handler content configs
for (CacheData cacheData : listenCaches) {
String groupKey = GroupKey
.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
if (!changeKeys.contains(groupKey)) {
//sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
synchronized (cacheData) {
if (!cacheData.getListeners().isEmpty()) {
Long previousTimesStamp = timestampMap.get(groupKey);
if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp,
System.currentTimeMillis())) {
continue;
}
cacheData.setSyncWithServer(true);
}
}
}
cacheData.setInitializing(false);
}
}
} catch (Exception e) {
LOGGER.error("Async listen config change error ", e);
try {
Thread.sleep(50L);
} catch (InterruptedException interruptedException) {
//ignore
}
}
}
}
//循环2结束:
//循环3开始:
if (!removeListenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
String taskId = entry.getKey();
List<CacheData> removeListenCaches = entry.getValue();
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
configChangeListenRequest.setListen(false);
try {
RpcClient rpcClient = ensureRpcClient(taskId);
boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
if (removeSuccess) {
for (CacheData cacheData : removeListenCaches) {
synchronized (cacheData) {
if (cacheData.getListeners().isEmpty()) {
ClientWorker.this
.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
}
}
}
}
} catch (Exception e) {
LOGGER.error("async remove listen config change error ", e);
}
try {
Thread.sleep(50L);
} catch (InterruptedException interruptedException) {
//ignore
}
}
}
//循环3结束
if (needAllSync) {
lastAllSyncTime = now;
}
//If has changed keys,notify re sync md5.
if (hasChangedKeys) {
notifyListenConfig();
}
}
executeConfigListen方法较长,方法开始初始化了几个CacheMap:
listenCachesMap
removeListenCacheMap
listenCachesMap主要是监听器列表映射map
removeListenCachesMap 主要是移除监听器的列表映射
另外还有一个成员变量cacheMap 声明类型是AtomicReference<Map<String, CacheData>>
由于内容比较多,可以按代码块来分析,根据代码,主要 有三次遍历集合
1.for (CacheData cache : cacheMap.get().values()) {...}
2.for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {...}
3.for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()){...}
循环1:
1.循环cacheMap,并检查cacheMap中CacheData是否同步server,md是否一致,如果md不一致,则需要用最新的md5来接收配置变更
2.循环listenCachesMap,如果有listener且未加入listenCachesMap中,则按照taskId,listener列表按照映射放入listenCachesMap中
3.循环removeListenCachesMap,如果有listener且未加入listenCachesMap中,则按照taskId,listener列表按照映射放入removeListenCachesMap中
循环2:
1.循环并生成KeyTenant与上次修改的时间戳分别以key,value形式放入timestampMap中
2.生成请求监听上下文,获得rpcClient,并发送请求
3.如果配置变更,则生成changeKey,如果未初始化,则执行refreshContentAndCheck,refreshContentAndCheck最终会执行到checkListenerMd5方法,第一个循环中遍历cacheMap是同样会执行该方法。稍后会对该方法详细分析。
4.遍历listenCaches,生成groupKey,修改最后更新时间戳、设置是否初始化字段
循环3:
1.循环3类似循环2,但是对移除监听的处理,需要构建移除监听请求的上下文,获取rpcClient,请发送请求
2.如果响应成功,则ClientWork会执行removeCache方法,将cacheMap根据groupKey移除,更新MetricsMonitor中监听器数量
三次循环完成后,会更新最后更新事件字段,如果配置有更新,则执行notifyListenConfig根据注释,如果监听器有更新,需要重新同步md5
接下来,看一下CacheData的checkListenerMd5方法
//注:遍历listeners,如果上次调用md5与成员变量md5不同,则执行safeNotifyListener
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
}
}
}
safeNotifyListener方法:
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
if (listenerWrap.inNotifying) {
LOGGER.warn(
"[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.",
name, dataId, group, md5, listener);
return;
}
Runnable job = () -> {
long start = System.currentTimeMillis();
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener) listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// Before executing the callback, set the thread classloader to the classloader of
// the specific webapp to avoid exceptions or misuses when calling the spi interface in
// the callback method (this problem occurs only in multi-application deployment).
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
cr.setEncryptedDataKey(encryptedDataKey);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
listenerWrap.inNotifying = true;
listener.receiveConfigInfo(contentTmp);
// compare lastContent and content
if (listener instanceof AbstractConfigChangeListener) {
Map data = ConfigChangeHandler.getInstance()
.parseChangeData(listenerWrap.lastContent, content, type);
ConfigChangeEvent event = new ConfigChangeEvent(data);
((AbstractConfigChangeListener) listener).receiveConfigChange(event);
listenerWrap.lastContent = content;
}
listenerWrap.lastCallMd5 = md5;
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name, dataId,
group, md5, listener, (System.currentTimeMillis() - start));
} catch (NacosException ex) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
md5, listener, t.getCause());
} finally {
listenerWrap.inNotifying = false;
Thread.currentThread().setContextClassLoader(myClassLoader);
}
};
final long startNotify = System.currentTimeMillis();
try {
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
try {
INTERNAL_NOTIFIER.submit(job);
} catch (RejectedExecutionException rejectedExecutionException) {
LOGGER.warn(
"[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, no available internal notifier,will sync notifier ",
name, dataId, group, md5, listener);
job.run();
} catch (Throwable throwable) {
LOGGER.error(
"[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, submit internal async task fail,throwable= ",
name, dataId, group, md5, listener, throwable);
job.run();
}
}
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
group, md5, listener, t.getCause());
}
final long finishNotify = System.currentTimeMillis();
LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
name, (finishNotify - startNotify), dataId, group, md5, listener);
}
该方法主要创建了一个job,并执行job
job类加载部分可以先省略掉,可以看到有两个Listener
1.AbstractSharedListener
2.AbstractConfigChangeListener
AbstractSharedListener会设置两个字段,dataId和group,该抽象类有一个抽象方法 innerReceive,如果设置共享监听器,则需要继承该监听器,并实现innerReceive方法
AbstractConfigChangeListener 包含receiveConfigChange抽象方法,如果要监听配置变更,需要继承该监听器并实现receiveConfigChange方法
由此,可以看出,nacos配置变更是可以通过继承配置变更监听器来获取相关变更信息的,动态获取最新的配置,无需重启服务,更加灵活。