序言
从jetCache 中,查看几个特色功能,
例如 loader ,当cache 没有数据时候,怎么进行loader 取出数据
refreshCache ,怎么定时刷新cache,当然是从loader cache 中取
怎么合理利用proxy 模式组合
怎么进行moniter 事件通知
这几个专题可能需要多篇来解释
正文
我们还是从Cache 接口中看起
/** * Get the config of this cache. * @return the cache config */ CacheConfig<K, V> config(); |
其中看到一个CacheConfig 我们进去查看其内容
public class CacheConfig<K, V> implements Cloneable { private long expireAfterWriteInMillis = CacheConsts.DEFAULT_EXPIRE * 1000L; private long expireAfterAccessInMillis = 0; private Function<K, Object> keyConvertor; private CacheLoader<K, V> loader; private List<CacheMonitor> monitors = new ArrayList<>(); private boolean cacheNullValue = false; private RefreshPolicy refreshPolicy; private int tryLockUnlockCount = 2; private int tryLockInquiryCount = 1; private int tryLockLockCount = 2; private boolean cachePenetrationProtect = false; private Duration penetrationProtectTimeout = null; // 省略 } |
其中比较明显的含义就先不提了, 我们先来看几个明显的 参数,比如这个CacheLoader 是进行数据加载的,接下来我们主要看其是怎么进行代码组织的。
从这个Config 文件中,我们可以看到其具有 loader , refresh Cache 功能, 增删Cache 通知回调给 Monitors 的功能, 我们先看看 JetCache 里面的 类的继承结构,这对于我们比较宏观理解JetCache 是如何运转的有一定的帮助
Loader 和 RefreshCache
我们从 主 要起作用 的Cache 接口看起, 其中 Loader 和 refresh Cache 功能是由 ProxyCache 接口组织起来的, 从名字上也好理解,ProxyCache 在 Cache 原本功能接口上加上 附件的功能, 像能够自主加载数据 的LoadingCache 和定时更新 缓存的RefreshCache,用这样的接口组织起来都是比较合理的方式。而Monitor 的实现操作监听的方式,我们放在下个章节来展开。
这节我们看看 jetCache 里面的两个特色功能,如loader 和 refresh Cache 是如何实现的。
其中我们看看 SimpleProxyCache 接口
public interface ProxyCache<K, V> extends Cache<K, V> { Cache<K, V> getTargetCache(); @Override default <T> T unwrap(Class<T> clazz) { return getTargetCache().unwrap(clazz); } } |
其中 getTagetCache 就是为了组合目前已有的Cache 接口,来完成ProxyCache 作为Cache的功能。
我们在这里就先不提 基本Cache 的功能了, 我们直接看 特色功能的实现,先看LoadingCache , 它的功能 无非就是 在Cache 中对应 的key 没有对应的value的时候,我们需要从loader 中把值取出来,然后塞进 loader 里面
LoadingCache
我们查看 LoadingCache 里面的 get 方法
@Override public V get(K key) throws CacheInvokeException { CacheLoader<K, V> loader = config.getLoader(); if (loader != null) { return AbstractCache.computeIfAbsentImpl(key, loader, config.isCacheNullValue() ,0, null, this); } else { return cache.get(key); } } |
其中这里面的,AbstractCache ,computeIfAbsentImpl 查看是否 cache 里面有这个 key 的值,没有的话会从loader 里面取
static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull, long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) { AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache); CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify); CacheGetResult<V> r; if (cache instanceof RefreshCache) { RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache); r = refreshCache.GET(key); refreshCache.addOrUpdateRefreshTask(key, newLoader); } else { r = cache.GET(key); } if (r.isSuccess()) { return r.getValue(); } else { Consumer<V> cacheUpdater = (loadedValue) -> { if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) { if (timeUnit != null) { cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult(); } else { cache.PUT(key, loadedValue).waitForResult(); } } }; V loadedValue; if (cache.config().isCachePenetrationProtect()) { loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater); } else { loadedValue = newLoader.apply(key); cacheUpdater.accept(loadedValue); } return loadedValue; } } |
先判断是否为 RefreshCache,如果是,需要把其加入 定时更新的逻辑里面取。 从 cache 里面取值如果成功 直接返回,如果不成功,需要走loader 逻辑,并且把 cache里面的值更新。
可以看到,config 里面有个开关,isCachePenetrationProtect ,这个意思是 是否进行 缓存穿透保护,如果开启了这个,那么对同一个key的 loader 取值, 同一时刻只会有一个 线程去取,不会给数据库造成太大压力。
我们可以看看其 对 key 区分线程数去取的逻辑
static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache, K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) { ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap(); Object lockKey = buildLoaderLockKey(abstractCache, key); while (true) { boolean create[] = new boolean[1]; LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> { create[0] = true; LoaderLock loaderLock = new LoaderLock(); loaderLock.signal = new CountDownLatch(1); loaderLock.loaderThread = Thread.currentThread(); return loaderLock; }); if (create[0] || ll.loaderThread == Thread.currentThread()) { try { V loadedValue = newLoader.apply(key); ll.success = true; ll.value = loadedValue; cacheUpdater.accept(loadedValue); return loadedValue; } finally { if (create[0]) { ll.signal.countDown(); loaderMap.remove(lockKey); } } } else { try { Duration timeout = config.getPenetrationProtectTimeout(); if (timeout == null) { ll.signal.await(); } else { boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS); if(!ok) { logger.info("loader wait timeout:" + timeout); return newLoader.apply(key); } } } catch (InterruptedException e) { logger.warn("loader wait interrupted"); return newLoader.apply(key); } if (ll.success) { return (V) ll.value; } else { continue; } } } } |
这里通过比较巧妙的方式,把不同key 做了单独的线程同步控制,并没有采用 queue空间存储的方式存了很多线程。 本质上是以栈的方式来存 等待的线程信息的,过程可以细品一下。
RefreshCache
到这里,我们可以转而再看看 RefreshCache里面做了什么, RefreshCache 需要用loader 进行周期性的Cache更新。RefreshCache 继承自LoadingCache
RefreshCache里面的一个关键函数是,把 key 和 loader 存进一个任务列表 Map里面,然后进行利用 SchedledExecutorPool 进行周期性调度
看其中一个关键函数
protected void addOrUpdateRefreshTask(K key, CacheLoader<K,V> loader) { RefreshPolicy refreshPolicy = config.getRefreshPolicy(); if (refreshPolicy == null) { return; } long refreshMillis = refreshPolicy.getRefreshMillis(); if (refreshMillis > 0) { Object taskId = getTaskId(key); RefreshTask refreshTask = taskMap.computeIfAbsent(taskId, tid -> { logger.debug("add refresh task. interval={}, key={}", refreshMillis , key); RefreshTask task = new RefreshTask(taskId, key, loader); task.lastAccessTime = System.currentTimeMillis(); ScheduledFuture<?> future = JetCacheExecutor.heavyIOExecutor().scheduleWithFixedDelay( task, refreshMillis, refreshMillis, TimeUnit.MILLISECONDS); task.future = future; return task; }); refreshTask.lastAccessTime = System.currentTimeMillis(); } } |
如果有 refreshPolicy ,则计算 taskId, 然后放进 任务Map 里面
RefreshTask 里面会去计算上次loader时间,然后判断是否需要重新loader,从而实现了 Refresh 的功能
class RefreshTask implements Runnable { @Override public void run() { try { if (config.getRefreshPolicy() == null || (loader == null && !hasLoader())) { cancel(); return; } long now = System.currentTimeMillis(); long stopRefreshAfterLastAccessMillis = config.getRefreshPolicy().getStopRefreshAfterLastAccessMillis(); if (stopRefreshAfterLastAccessMillis > 0) { if (lastAccessTime + stopRefreshAfterLastAccessMillis < now) { logger.debug("cancel refresh: {}", key); cancel(); return; } } logger.debug("refresh key: {}", key); Cache concreteCache = concreteCache(); if (concreteCache instanceof AbstractExternalCache) { externalLoad(concreteCache, now); } else { load(); } } catch (Throwable e) { logger.error("refresh error: key=" + key, e); } } } // 省略其他 } |
结尾
本文简单讲述了 LoadingCache 和 RefreshCache 的相关具体功能点和 源码实现,下节我们再看看 Moniter 的相关逻辑实现