JetCache– 从 Config查看其衍生出的特殊功能

序言

 从jetCache 中,查看几个特色功能,

例如 loader ,当cache 没有数据时候,怎么进行loader 取出数据

refreshCache ,怎么定时刷新cache,当然是从loader cache 中取

怎么合理利用proxy 模式组合

怎么进行moniter 事件通知

这几个专题可能需要多篇来解释

 

 

正文

我们还是从Cache 接口中看起

 

    /**
     * Get the config of this cache.
     * @return the cache config
     */
    CacheConfig<K, V> config();

其中看到一个CacheConfig 我们进去查看其内容

 

public class CacheConfig<K, V> implements Cloneable {
    private long expireAfterWriteInMillis = CacheConsts.DEFAULT_EXPIRE * 1000L;
    private long expireAfterAccessInMillis = 0;
    private Function<K, Object> keyConvertor;

    private CacheLoader<K, V> loader;

    private List<CacheMonitor> monitors = new ArrayList<>();

    private boolean cacheNullValue = false;

    private RefreshPolicy refreshPolicy;

    private int tryLockUnlockCount = 2;

    private int tryLockInquiryCount = 1;

    private int tryLockLockCount = 2;

    private boolean cachePenetrationProtect = false;
    private Duration penetrationProtectTimeout = null;

 // 省略

}

其中比较明显的含义就先不提了, 我们先来看几个明显的 参数,比如这个CacheLoader  是进行数据加载的,接下来我们主要看其是怎么进行代码组织的。

从这个Config 文件中,我们可以看到其具有 loader , refresh Cache 功能, 增删Cache 通知回调给 Monitors 的功能, 我们先看看 JetCache 里面的 类的继承结构,这对于我们比较宏观理解JetCache 是如何运转的有一定的帮助

 

 

 

Loader 和 RefreshCache 

我们从 主 要起作用 的Cache 接口看起, 其中 Loader 和 refresh Cache 功能是由 ProxyCache 接口组织起来的, 从名字上也好理解,ProxyCache 在 Cache 原本功能接口上加上 附件的功能, 像能够自主加载数据 的LoadingCache 和定时更新 缓存的RefreshCache,用这样的接口组织起来都是比较合理的方式。而Monitor 的实现操作监听的方式,我们放在下个章节来展开。

这节我们看看 jetCache 里面的两个特色功能,如loader 和 refresh Cache 是如何实现的。

 

其中我们看看 SimpleProxyCache 接口

 

public interface ProxyCache<K, V> extends Cache<K, V> {
    Cache<K, V> getTargetCache();

    @Override
    default <T> T unwrap(Class<T> clazz) {
        return getTargetCache().unwrap(clazz);
    }

}

其中 getTagetCache 就是为了组合目前已有的Cache 接口,来完成ProxyCache 作为Cache的功能。

我们在这里就先不提 基本Cache 的功能了, 我们直接看 特色功能的实现,先看LoadingCache , 它的功能 无非就是 在Cache 中对应 的key 没有对应的value的时候,我们需要从loader 中把值取出来,然后塞进 loader 里面

 

 

LoadingCache

 

我们查看 LoadingCache 里面的 get 方法

  @Override
    public V get(K key) throws CacheInvokeException {
        CacheLoader<K, V> loader = config.getLoader();
        if (loader != null) {
            return AbstractCache.computeIfAbsentImpl(key, loader,
                    config.isCacheNullValue() ,0, null, this);
        } else {
            return cache.get(key);
        }
    }

其中这里面的,AbstractCache ,computeIfAbsentImpl  查看是否 cache 里面有这个 key 的值,没有的话会从loader 里面取

 

 static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
                                               long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
        AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
        CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify);
        CacheGetResult<V> r;
        if (cache instanceof RefreshCache) {
            RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache);
            r = refreshCache.GET(key);
            refreshCache.addOrUpdateRefreshTask(key, newLoader);
        } else {
            r = cache.GET(key);
        }
        if (r.isSuccess()) {
            return r.getValue();
        } else {
            Consumer<V> cacheUpdater = (loadedValue) -> {
                if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
                    if (timeUnit != null) {
                        cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult();
                    } else {
                        cache.PUT(key, loadedValue).waitForResult();
                    }
                }
            };

            V loadedValue;
            if (cache.config().isCachePenetrationProtect()) {
                loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater);
            } else {
                loadedValue = newLoader.apply(key);
                cacheUpdater.accept(loadedValue);
            }

            return loadedValue;
        }
    }

 先判断是否为 RefreshCache,如果是,需要把其加入 定时更新的逻辑里面取。 从 cache 里面取值如果成功 直接返回,如果不成功,需要走loader 逻辑,并且把 cache里面的值更新。

可以看到,config 里面有个开关,isCachePenetrationProtect ,这个意思是 是否进行 缓存穿透保护,如果开启了这个,那么对同一个key的 loader 取值, 同一时刻只会有一个 线程去取,不会给数据库造成太大压力。

我们可以看看其 对 key 区分线程数去取的逻辑

 

 

static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache,
                                     K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) {
        ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap();
        Object lockKey = buildLoaderLockKey(abstractCache, key);
        while (true) {
            boolean create[] = new boolean[1];
            LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
                create[0] = true;
                LoaderLock loaderLock = new LoaderLock();
                loaderLock.signal = new CountDownLatch(1);
                loaderLock.loaderThread = Thread.currentThread();
                return loaderLock;
            });
            if (create[0] || ll.loaderThread == Thread.currentThread()) {
                try {
                    V loadedValue = newLoader.apply(key);
                    ll.success = true;
                    ll.value = loadedValue;
                    cacheUpdater.accept(loadedValue);
                    return loadedValue;
                } finally {
                    if (create[0]) {
                        ll.signal.countDown();
                        loaderMap.remove(lockKey);
                    }
                }
            } else {
                try {
                    Duration timeout = config.getPenetrationProtectTimeout();
                    if (timeout == null) {
                        ll.signal.await();
                    } else {
                        boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
                        if(!ok) {
                            logger.info("loader wait timeout:" + timeout);
                            return newLoader.apply(key);
                        }
                    }
                } catch (InterruptedException e) {
                    logger.warn("loader wait interrupted");
                    return newLoader.apply(key);
                }
                if (ll.success) {
                    return (V) ll.value;
                } else {
                    continue;
                }

            }
        }
    }

 

这里通过比较巧妙的方式,把不同key 做了单独的线程同步控制,并没有采用 queue空间存储的方式存了很多线程。  本质上是以栈的方式来存 等待的线程信息的,过程可以细品一下。

 

 

RefreshCache

 

到这里,我们可以转而再看看 RefreshCache里面做了什么, RefreshCache 需要用loader 进行周期性的Cache更新。RefreshCache 继承自LoadingCache

RefreshCache里面的一个关键函数是,把 key 和 loader 存进一个任务列表 Map里面,然后进行利用 SchedledExecutorPool 进行周期性调度

看其中一个关键函数

 

    protected void addOrUpdateRefreshTask(K key, CacheLoader<K,V> loader) {
        RefreshPolicy refreshPolicy = config.getRefreshPolicy();
        if (refreshPolicy == null) {
            return;
        }
        long refreshMillis = refreshPolicy.getRefreshMillis();
        if (refreshMillis > 0) {
            Object taskId = getTaskId(key);
            RefreshTask refreshTask = taskMap.computeIfAbsent(taskId, tid -> {
                logger.debug("add refresh task. interval={},  key={}", refreshMillis , key);
                RefreshTask task = new RefreshTask(taskId, key, loader);
                task.lastAccessTime = System.currentTimeMillis();
                ScheduledFuture<?> future = JetCacheExecutor.heavyIOExecutor().scheduleWithFixedDelay(
                        task, refreshMillis, refreshMillis, TimeUnit.MILLISECONDS);
                task.future = future;
                return task;
            });
            refreshTask.lastAccessTime = System.currentTimeMillis();
        }
    }

 

如果有 refreshPolicy ,则计算 taskId, 然后放进 任务Map 里面

RefreshTask 里面会去计算上次loader时间,然后判断是否需要重新loader,从而实现了 Refresh 的功能

 

    class RefreshTask implements Runnable {


  @Override
        public void run() {
            try {
                if (config.getRefreshPolicy() == null || (loader == null && !hasLoader())) {
                    cancel();
                    return;
                }
                long now = System.currentTimeMillis();
                long stopRefreshAfterLastAccessMillis = config.getRefreshPolicy().getStopRefreshAfterLastAccessMillis();
                if (stopRefreshAfterLastAccessMillis > 0) {
                    if (lastAccessTime + stopRefreshAfterLastAccessMillis < now) {
                        logger.debug("cancel refresh: {}", key);
                        cancel();
                        return;
                    }
                }
                logger.debug("refresh key: {}", key);
                Cache concreteCache = concreteCache();
                if (concreteCache instanceof AbstractExternalCache) {
                    externalLoad(concreteCache, now);
                } else {
                    load();
                }
            } catch (Throwable e) {
                logger.error("refresh error: key=" + key, e);
            }
        }

}

 // 省略其他
}

 

 

结尾

 

本文简单讲述了 LoadingCache 和 RefreshCache 的相关具体功能点和 源码实现,下节我们再看看 Moniter 的相关逻辑实现

 


版权声明:本文为dongjijiaoxiangqu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。