序言
从jetCache 中,查看几个特色功能,
例如 loader ,当cache 没有数据时候,怎么进行loader 取出数据
refreshCache ,怎么定时刷新cache,当然是从loader cache 中取
怎么合理利用proxy 模式组合
怎么进行moniter 事件通知
这几个专题可能需要多篇来解释
正文
我们还是从Cache 接口中看起
/**
* Get the config of this cache.
* @return the cache config
*/
CacheConfig<K, V> config(); |
其中看到一个CacheConfig 我们进去查看其内容
public class CacheConfig<K, V> implements Cloneable {
private long expireAfterWriteInMillis = CacheConsts.DEFAULT_EXPIRE * 1000L;
private long expireAfterAccessInMillis = 0;
private Function<K, Object> keyConvertor;
private CacheLoader<K, V> loader;
private List<CacheMonitor> monitors = new ArrayList<>();
private boolean cacheNullValue = false;
private RefreshPolicy refreshPolicy;
private int tryLockUnlockCount = 2;
private int tryLockInquiryCount = 1;
private int tryLockLockCount = 2;
private boolean cachePenetrationProtect = false;
private Duration penetrationProtectTimeout = null;
// 省略
} |
其中比较明显的含义就先不提了, 我们先来看几个明显的 参数,比如这个CacheLoader 是进行数据加载的,接下来我们主要看其是怎么进行代码组织的。
从这个Config 文件中,我们可以看到其具有 loader , refresh Cache 功能, 增删Cache 通知回调给 Monitors 的功能, 我们先看看 JetCache 里面的 类的继承结构,这对于我们比较宏观理解JetCache 是如何运转的有一定的帮助

Loader 和 RefreshCache
我们从 主 要起作用 的Cache 接口看起, 其中 Loader 和 refresh Cache 功能是由 ProxyCache 接口组织起来的, 从名字上也好理解,ProxyCache 在 Cache 原本功能接口上加上 附件的功能, 像能够自主加载数据 的LoadingCache 和定时更新 缓存的RefreshCache,用这样的接口组织起来都是比较合理的方式。而Monitor 的实现操作监听的方式,我们放在下个章节来展开。
这节我们看看 jetCache 里面的两个特色功能,如loader 和 refresh Cache 是如何实现的。
其中我们看看 SimpleProxyCache 接口
public interface ProxyCache<K, V> extends Cache<K, V> {
Cache<K, V> getTargetCache();
@Override
default <T> T unwrap(Class<T> clazz) {
return getTargetCache().unwrap(clazz);
}
}
|
其中 getTagetCache 就是为了组合目前已有的Cache 接口,来完成ProxyCache 作为Cache的功能。
我们在这里就先不提 基本Cache 的功能了, 我们直接看 特色功能的实现,先看LoadingCache , 它的功能 无非就是 在Cache 中对应 的key 没有对应的value的时候,我们需要从loader 中把值取出来,然后塞进 loader 里面
LoadingCache
我们查看 LoadingCache 里面的 get 方法
@Override
public V get(K key) throws CacheInvokeException {
CacheLoader<K, V> loader = config.getLoader();
if (loader != null) {
return AbstractCache.computeIfAbsentImpl(key, loader,
config.isCacheNullValue() ,0, null, this);
} else {
return cache.get(key);
}
} |
其中这里面的,AbstractCache ,computeIfAbsentImpl 查看是否 cache 里面有这个 key 的值,没有的话会从loader 里面取
static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify);
CacheGetResult<V> r;
if (cache instanceof RefreshCache) {
RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache);
r = refreshCache.GET(key);
refreshCache.addOrUpdateRefreshTask(key, newLoader);
} else {
r = cache.GET(key);
}
if (r.isSuccess()) {
return r.getValue();
} else {
Consumer<V> cacheUpdater = (loadedValue) -> {
if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
if (timeUnit != null) {
cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult();
} else {
cache.PUT(key, loadedValue).waitForResult();
}
}
};
V loadedValue;
if (cache.config().isCachePenetrationProtect()) {
loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater);
} else {
loadedValue = newLoader.apply(key);
cacheUpdater.accept(loadedValue);
}
return loadedValue;
}
}
|
先判断是否为 RefreshCache,如果是,需要把其加入 定时更新的逻辑里面取。 从 cache 里面取值如果成功 直接返回,如果不成功,需要走loader 逻辑,并且把 cache里面的值更新。
可以看到,config 里面有个开关,isCachePenetrationProtect ,这个意思是 是否进行 缓存穿透保护,如果开启了这个,那么对同一个key的 loader 取值, 同一时刻只会有一个 线程去取,不会给数据库造成太大压力。
我们可以看看其 对 key 区分线程数去取的逻辑
static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache,
K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) {
ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap();
Object lockKey = buildLoaderLockKey(abstractCache, key);
while (true) {
boolean create[] = new boolean[1];
LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
create[0] = true;
LoaderLock loaderLock = new LoaderLock();
loaderLock.signal = new CountDownLatch(1);
loaderLock.loaderThread = Thread.currentThread();
return loaderLock;
});
if (create[0] || ll.loaderThread == Thread.currentThread()) {
try {
V loadedValue = newLoader.apply(key);
ll.success = true;
ll.value = loadedValue;
cacheUpdater.accept(loadedValue);
return loadedValue;
} finally {
if (create[0]) {
ll.signal.countDown();
loaderMap.remove(lockKey);
}
}
} else {
try {
Duration timeout = config.getPenetrationProtectTimeout();
if (timeout == null) {
ll.signal.await();
} else {
boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
if(!ok) {
logger.info("loader wait timeout:" + timeout);
return newLoader.apply(key);
}
}
} catch (InterruptedException e) {
logger.warn("loader wait interrupted");
return newLoader.apply(key);
}
if (ll.success) {
return (V) ll.value;
} else {
continue;
}
}
}
} |
这里通过比较巧妙的方式,把不同key 做了单独的线程同步控制,并没有采用 queue空间存储的方式存了很多线程。 本质上是以栈的方式来存 等待的线程信息的,过程可以细品一下。
RefreshCache
到这里,我们可以转而再看看 RefreshCache里面做了什么, RefreshCache 需要用loader 进行周期性的Cache更新。RefreshCache 继承自LoadingCache
RefreshCache里面的一个关键函数是,把 key 和 loader 存进一个任务列表 Map里面,然后进行利用 SchedledExecutorPool 进行周期性调度
看其中一个关键函数
protected void addOrUpdateRefreshTask(K key, CacheLoader<K,V> loader) {
RefreshPolicy refreshPolicy = config.getRefreshPolicy();
if (refreshPolicy == null) {
return;
}
long refreshMillis = refreshPolicy.getRefreshMillis();
if (refreshMillis > 0) {
Object taskId = getTaskId(key);
RefreshTask refreshTask = taskMap.computeIfAbsent(taskId, tid -> {
logger.debug("add refresh task. interval={}, key={}", refreshMillis , key);
RefreshTask task = new RefreshTask(taskId, key, loader);
task.lastAccessTime = System.currentTimeMillis();
ScheduledFuture<?> future = JetCacheExecutor.heavyIOExecutor().scheduleWithFixedDelay(
task, refreshMillis, refreshMillis, TimeUnit.MILLISECONDS);
task.future = future;
return task;
});
refreshTask.lastAccessTime = System.currentTimeMillis();
}
} |
如果有 refreshPolicy ,则计算 taskId, 然后放进 任务Map 里面
RefreshTask 里面会去计算上次loader时间,然后判断是否需要重新loader,从而实现了 Refresh 的功能
class RefreshTask implements Runnable {
@Override
public void run() {
try {
if (config.getRefreshPolicy() == null || (loader == null && !hasLoader())) {
cancel();
return;
}
long now = System.currentTimeMillis();
long stopRefreshAfterLastAccessMillis = config.getRefreshPolicy().getStopRefreshAfterLastAccessMillis();
if (stopRefreshAfterLastAccessMillis > 0) {
if (lastAccessTime + stopRefreshAfterLastAccessMillis < now) {
logger.debug("cancel refresh: {}", key);
cancel();
return;
}
}
logger.debug("refresh key: {}", key);
Cache concreteCache = concreteCache();
if (concreteCache instanceof AbstractExternalCache) {
externalLoad(concreteCache, now);
} else {
load();
}
} catch (Throwable e) {
logger.error("refresh error: key=" + key, e);
}
}
}
// 省略其他
} |
结尾
本文简单讲述了 LoadingCache 和 RefreshCache 的相关具体功能点和 源码实现,下节我们再看看 Moniter 的相关逻辑实现