序言
程序的灵魂是数据结构和算法
我们从数据结构出发了解,jetCache 的整体设计, 我们知道 Cache 简单来说就是 一个 Map<K,V> 承载的 一个 hash结构, 但是这样的话,我们通过 key 来拿到 value ,就不能拿到其他很多的信息。而且往往通过 map.get(key) 拿到的value 往往隐含的一个事实,就是这个是一个同步操作,
不能利用到异步的优点,从get 方式我们看不出异步的好处,但是从put 的方式,我们很容易看到异步的好处,要是 有一个cache ,它能用 cache.putAsync(key,value)方法,那么这样就能利用到异步的优点,使得程序不阻塞主线程的时候,放置一些非必要的缓存放置阻塞操作,(有人可能会说我用个线程不一样嘛,我干嘛需要异步接口),但是你平时写程序时候,总是没有有意识地用到异步放置缓存,有个异步接口能方便使用,总之利大于弊。
回到主题,我们知道 redis 的letture 最为人称道的就是它的异步特性,底层是利用了nio的方式来实现的, 而jedis 是用了bio方式实现的,nio方式的性能一般来说要优于bio实现的程序。所以为了利用到异步的特性,我们这个cache 返回的value 结果存储,应该也要能是利用到异步特性的,
正文
CacheGetResult 数据结构
我们简单来看下,cache中这个value的数据结构。
从 Cache 接口中,我们看到取的接口为
default V get(K key) throws CacheInvokeException { CacheGetResult<V> result = GET(key); if (result.isSuccess()) { return result.getValue(); } else { return null; } } |
我们看到取的value的数据结构是 CacheGetResult ,我们来研究下,它的设计,见微知著,一叶知秋(这也是希望能做到的状态,菩提曾说过,一花一世界,一叶一春秋)
public class CacheGetResult<V> extends CacheResult { private V value; private CacheValueHolder<V> holder; public static final CacheGetResult NOT_EXISTS_WITHOUT_MSG = new CacheGetResult(CacheResultCode.NOT_EXISTS, null, null); public static final CacheGetResult EXPIRED_WITHOUT_MSG = new CacheGetResult(CacheResultCode.EXPIRED, null ,null); // 省略其他不重要部分 } |
篇幅原因,我们会省略其他不重要部分代码,好,我们接下来看 CacheResult
package com.alicp.jetcache; import com.alicp.jetcache.anno.CacheConsts; import java.time.Duration; import java.util.concurrent.*; /** * Created on 2016/9/28. * * @author <a href="mailto:areyouok@gmail.com">huangli</a> */ public class CacheResult { public static final String MSG_ILLEGAL_ARGUMENT = "illegal argument"; private static Duration DEFAULT_TIMEOUT = CacheConsts.ASYNC_RESULT_TIMEOUT; public static final CacheResult SUCCESS_WITHOUT_MSG = new CacheResult(CacheResultCode.SUCCESS, null); public static final CacheResult PART_SUCCESS_WITHOUT_MSG = new CacheResult(CacheResultCode.PART_SUCCESS, null); public static final CacheResult FAIL_WITHOUT_MSG = new CacheResult(CacheResultCode.FAIL, null); public static final CacheResult FAIL_ILLEGAL_ARGUMENT = new CacheResult(CacheResultCode.FAIL, MSG_ILLEGAL_ARGUMENT); public static final CacheResult EXISTS_WITHOUT_MSG = new CacheResult(CacheResultCode.EXISTS, null); private CacheResultCode resultCode; private String message; private CompletionStage<ResultData> future; private Duration timeout = DEFAULT_TIMEOUT; public CacheResult(CompletionStage<ResultData> future) { this.future = future; } public CacheResult(CacheResultCode resultCode, String message) { this(CompletableFuture.completedFuture(new ResultData(resultCode, message, null))); } public CacheResult(Throwable ex) { future = CompletableFuture.completedFuture(new ResultData(ex)); } public boolean isSuccess() { return getResultCode() == CacheResultCode.SUCCESS; } protected void waitForResult() { waitForResult(timeout); } public void waitForResult(Duration timeout) { if (resultCode != null) { return; } try { ResultData resultData = future.toCompletableFuture().get( timeout.toMillis(), TimeUnit.MILLISECONDS); fetchResultSuccess(resultData); } catch (TimeoutException | InterruptedException | ExecutionException e) { fetchResultFail(e); } } protected void fetchResultSuccess(ResultData resultData) { resultCode = resultData.getResultCode(); message = resultData.getMessage(); } protected void fetchResultFail(Throwable e) { resultCode = CacheResultCode.FAIL; message = e.getClass() + ":" + e.getMessage(); } public CacheResultCode getResultCode() { waitForResult(); return resultCode; } public String getMessage() { waitForResult(); return message; } public CompletionStage<ResultData> future() { return future; } public static void setDefaultTimeout(Duration defaultTimeout) { DEFAULT_TIMEOUT = defaultTimeout; } public void setTimeout(Duration timeout) { this.timeout = timeout; } } |
在说这个具体CacheResult 之前,我们看看其继承结构,
CacheGetResult<V> 也就是前面提到的数据结构
还有一个是MultiGetResult<K,V> ,看看其基础结构为
public class MultiGetResult<K, V> extends CacheResult { private Map<K, CacheGetResult<V>> values; // 省略其他 } |
很容易看到 它是前面CacheGetResult 的一个封装
回到 CacheResult 这个基类,我们能从中看到怎么样的设计思路。
其中
private CacheResultCode resultCode; private String message; public enum CacheResultCode { SUCCESS, PART_SUCCESS, FAIL, NOT_EXISTS, EXISTS, EXPIRED } |
一个是CacheResultCode, 一个是是否出错,以及其出错信息,这两部分都还算是常规设计。
下面有意思的就来了,真正的value值,放在这么一个结构里面
private CompletionStage<ResultData> future; |
我们先看看ResultData里面承载了什么样的内容
public class ResultData { private CacheResultCode resultCode; private String message; private Object data; // 省略 } |
CompletionStage
比较常规,我们再来了解这个CompletionStage 是个怎样的接口呢,这个接口是juc包下的一个表示 异步状态的一个接口,从名字我们也可以发现,是否是完全状态的接口,应该是表示一个异步调用,结果是否已经完善。跟future 应该有一定关系,从接口的继承关系,看看java这个接口的设计
可以看到java中异步接口如果需要拿到结果是否已经准备好了,基本会用这个接口来拓展,特别是一些completableFutrue ,RedisFutrue 等都已经继承这个接口了,意味着我们可以通过这个接口拿到异步的结果,也是为redis letture包做准备。
我们简单看下这个接口里面的方法
接口比较众多一点,不一一解释了,基本就是利用异步的特性,方便我们使用的一些异步接口。
回到CacheResult
看到CacheResult 的构造函数
public CacheResult(CompletionStage<ResultData> future) { this.future = future; } public CacheResult(CacheResultCode resultCode, String message) { this(CompletableFuture.completedFuture(new ResultData(resultCode, message, null))); } public CacheResult(Throwable ex) { future = CompletableFuture.completedFuture(new ResultData(ex)); } |
可以看到其中一个就是要传一个future进来,为了获取其异步结果,另一个就是同步的value传进来构造CacheResult,我们猜测这个异步接口,跟redis letture 异步接口是大概率有关系的,我们查看其调用处
这个调用处位于
RedisLettuceCache
@Override protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) { try { CacheValueHolder<V> holder = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite)); byte[] newKey = buildKey(key); RedisFuture<String> future = stringAsyncCommands.psetex(newKey, timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder)); CacheResult result = new CacheResult(future.handle((rt, ex) -> { if (ex != null) { JetCacheExecutor.defaultExecutor().execute(() -> logError("PUT", key, ex)); return new ResultData(ex); } else { if ("OK".equals(rt)) { return new ResultData(CacheResultCode.SUCCESS, null, null); } else { return new ResultData(CacheResultCode.FAIL, rt, null); } } })); setTimeout(result); return result; } catch (Exception ex) { logError("PUT", key, ex); return new CacheResult(ex); } } |
从而证明我们的猜测没有错, 通过 letture 传过来的异步结果,然后传进这个 future 处理的 CompleteStage 这个接口传进 CacheValueResult里面方便其调用,
而CacheResult 里面的关键利用到这个CompleteStage 的地方也就是等待 处理结果的地方,例如
public void waitForResult(Duration timeout) { if (resultCode != null) { return; } try { ResultData resultData = future.toCompletableFuture().get( timeout.toMillis(), TimeUnit.MILLISECONDS); fetchResultSuccess(resultData); } catch (TimeoutException | InterruptedException | ExecutionException e) { fetchResultFail(e); } } |
这里还有一点没有提到的是DataResult 里面的 Object value ,它的实际承载数据结构其实是
public final class CacheValueHolder<V> implements Serializable { private static final long serialVersionUID = -7973743507831565203L; private V value; private long expireTime; private long accessTime; // 省略其他 } |
除了必要的 value 信息,还附带了 一些 accessTime,expireTime等附带信息
总结
本文比较简单,简单介绍了下,jetcache 里面如何把异步和同步统一起来,利用的是CompleteStage 这个接口,把异步的 redis letture 客户端 返回的结果,结合到CacheResult 里面。这种异步结果获取,糅合进我们自己的数据结构的方式值得学习和思考