序言
程序的灵魂是数据结构和算法
我们从数据结构出发了解,jetCache 的整体设计, 我们知道 Cache 简单来说就是 一个 Map<K,V> 承载的 一个 hash结构, 但是这样的话,我们通过 key 来拿到 value ,就不能拿到其他很多的信息。而且往往通过 map.get(key) 拿到的value 往往隐含的一个事实,就是这个是一个同步操作,
不能利用到异步的优点,从get 方式我们看不出异步的好处,但是从put 的方式,我们很容易看到异步的好处,要是 有一个cache ,它能用 cache.putAsync(key,value)方法,那么这样就能利用到异步的优点,使得程序不阻塞主线程的时候,放置一些非必要的缓存放置阻塞操作,(有人可能会说我用个线程不一样嘛,我干嘛需要异步接口),但是你平时写程序时候,总是没有有意识地用到异步放置缓存,有个异步接口能方便使用,总之利大于弊。
回到主题,我们知道 redis 的letture 最为人称道的就是它的异步特性,底层是利用了nio的方式来实现的, 而jedis 是用了bio方式实现的,nio方式的性能一般来说要优于bio实现的程序。所以为了利用到异步的特性,我们这个cache 返回的value 结果存储,应该也要能是利用到异步特性的,
正文
CacheGetResult 数据结构
我们简单来看下,cache中这个value的数据结构。
从 Cache 接口中,我们看到取的接口为
default V get(K key) throws CacheInvokeException {
CacheGetResult<V> result = GET(key);
if (result.isSuccess()) {
return result.getValue();
} else {
return null;
}
}
|
我们看到取的value的数据结构是 CacheGetResult ,我们来研究下,它的设计,见微知著,一叶知秋(这也是希望能做到的状态,菩提曾说过,一花一世界,一叶一春秋)
public class CacheGetResult<V> extends CacheResult {
private V value;
private CacheValueHolder<V> holder;
public static final CacheGetResult NOT_EXISTS_WITHOUT_MSG = new CacheGetResult(CacheResultCode.NOT_EXISTS, null, null);
public static final CacheGetResult EXPIRED_WITHOUT_MSG = new CacheGetResult(CacheResultCode.EXPIRED, null ,null);
// 省略其他不重要部分
} |
篇幅原因,我们会省略其他不重要部分代码,好,我们接下来看 CacheResult
package com.alicp.jetcache;
import com.alicp.jetcache.anno.CacheConsts;
import java.time.Duration;
import java.util.concurrent.*;
/**
* Created on 2016/9/28.
*
* @author <a href="mailto:areyouok@gmail.com">huangli</a>
*/
public class CacheResult {
public static final String MSG_ILLEGAL_ARGUMENT = "illegal argument";
private static Duration DEFAULT_TIMEOUT = CacheConsts.ASYNC_RESULT_TIMEOUT;
public static final CacheResult SUCCESS_WITHOUT_MSG = new CacheResult(CacheResultCode.SUCCESS, null);
public static final CacheResult PART_SUCCESS_WITHOUT_MSG = new CacheResult(CacheResultCode.PART_SUCCESS, null);
public static final CacheResult FAIL_WITHOUT_MSG = new CacheResult(CacheResultCode.FAIL, null);
public static final CacheResult FAIL_ILLEGAL_ARGUMENT = new CacheResult(CacheResultCode.FAIL, MSG_ILLEGAL_ARGUMENT);
public static final CacheResult EXISTS_WITHOUT_MSG = new CacheResult(CacheResultCode.EXISTS, null);
private CacheResultCode resultCode;
private String message;
private CompletionStage<ResultData> future;
private Duration timeout = DEFAULT_TIMEOUT;
public CacheResult(CompletionStage<ResultData> future) {
this.future = future;
}
public CacheResult(CacheResultCode resultCode, String message) {
this(CompletableFuture.completedFuture(new ResultData(resultCode, message, null)));
}
public CacheResult(Throwable ex) {
future = CompletableFuture.completedFuture(new ResultData(ex));
}
public boolean isSuccess() {
return getResultCode() == CacheResultCode.SUCCESS;
}
protected void waitForResult() {
waitForResult(timeout);
}
public void waitForResult(Duration timeout) {
if (resultCode != null) {
return;
}
try {
ResultData resultData = future.toCompletableFuture().get(
timeout.toMillis(), TimeUnit.MILLISECONDS);
fetchResultSuccess(resultData);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
fetchResultFail(e);
}
}
protected void fetchResultSuccess(ResultData resultData) {
resultCode = resultData.getResultCode();
message = resultData.getMessage();
}
protected void fetchResultFail(Throwable e) {
resultCode = CacheResultCode.FAIL;
message = e.getClass() + ":" + e.getMessage();
}
public CacheResultCode getResultCode() {
waitForResult();
return resultCode;
}
public String getMessage() {
waitForResult();
return message;
}
public CompletionStage<ResultData> future() {
return future;
}
public static void setDefaultTimeout(Duration defaultTimeout) {
DEFAULT_TIMEOUT = defaultTimeout;
}
public void setTimeout(Duration timeout) {
this.timeout = timeout;
}
}
|
在说这个具体CacheResult 之前,我们看看其继承结构,
CacheGetResult<V> 也就是前面提到的数据结构
还有一个是MultiGetResult<K,V> ,看看其基础结构为
public class MultiGetResult<K, V> extends CacheResult {
private Map<K, CacheGetResult<V>> values;
// 省略其他
} |
很容易看到 它是前面CacheGetResult 的一个封装
回到 CacheResult 这个基类,我们能从中看到怎么样的设计思路。
其中
private CacheResultCode resultCode;
private String message;
public enum CacheResultCode {
SUCCESS, PART_SUCCESS, FAIL, NOT_EXISTS, EXISTS, EXPIRED
}
|
一个是CacheResultCode, 一个是是否出错,以及其出错信息,这两部分都还算是常规设计。
下面有意思的就来了,真正的value值,放在这么一个结构里面
private CompletionStage<ResultData> future; |
我们先看看ResultData里面承载了什么样的内容
public class ResultData {
private CacheResultCode resultCode;
private String message;
private Object data;
// 省略
} |
CompletionStage
比较常规,我们再来了解这个CompletionStage 是个怎样的接口呢,这个接口是juc包下的一个表示 异步状态的一个接口,从名字我们也可以发现,是否是完全状态的接口,应该是表示一个异步调用,结果是否已经完善。跟future 应该有一定关系,从接口的继承关系,看看java这个接口的设计

可以看到java中异步接口如果需要拿到结果是否已经准备好了,基本会用这个接口来拓展,特别是一些completableFutrue ,RedisFutrue 等都已经继承这个接口了,意味着我们可以通过这个接口拿到异步的结果,也是为redis letture包做准备。
我们简单看下这个接口里面的方法

接口比较众多一点,不一一解释了,基本就是利用异步的特性,方便我们使用的一些异步接口。
回到CacheResult
看到CacheResult 的构造函数
public CacheResult(CompletionStage<ResultData> future) {
this.future = future;
}
public CacheResult(CacheResultCode resultCode, String message) {
this(CompletableFuture.completedFuture(new ResultData(resultCode, message, null)));
}
public CacheResult(Throwable ex) {
future = CompletableFuture.completedFuture(new ResultData(ex));
}
|
可以看到其中一个就是要传一个future进来,为了获取其异步结果,另一个就是同步的value传进来构造CacheResult,我们猜测这个异步接口,跟redis letture 异步接口是大概率有关系的,我们查看其调用处
这个调用处位于
RedisLettuceCache
@Override
protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
try {
CacheValueHolder<V> holder = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite));
byte[] newKey = buildKey(key);
RedisFuture<String> future = stringAsyncCommands.psetex(newKey, timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
CacheResult result = new CacheResult(future.handle((rt, ex) -> {
if (ex != null) {
JetCacheExecutor.defaultExecutor().execute(() -> logError("PUT", key, ex));
return new ResultData(ex);
} else {
if ("OK".equals(rt)) {
return new ResultData(CacheResultCode.SUCCESS, null, null);
} else {
return new ResultData(CacheResultCode.FAIL, rt, null);
}
}
}));
setTimeout(result);
return result;
} catch (Exception ex) {
logError("PUT", key, ex);
return new CacheResult(ex);
}
} |
从而证明我们的猜测没有错, 通过 letture 传过来的异步结果,然后传进这个 future 处理的 CompleteStage 这个接口传进 CacheValueResult里面方便其调用,
而CacheResult 里面的关键利用到这个CompleteStage 的地方也就是等待 处理结果的地方,例如
public void waitForResult(Duration timeout) {
if (resultCode != null) {
return;
}
try {
ResultData resultData = future.toCompletableFuture().get(
timeout.toMillis(), TimeUnit.MILLISECONDS);
fetchResultSuccess(resultData);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
fetchResultFail(e);
}
}
|
这里还有一点没有提到的是DataResult 里面的 Object value ,它的实际承载数据结构其实是
public final class CacheValueHolder<V> implements Serializable {
private static final long serialVersionUID = -7973743507831565203L;
private V value;
private long expireTime;
private long accessTime;
// 省略其他
} |
除了必要的 value 信息,还附带了 一些 accessTime,expireTime等附带信息
总结
本文比较简单,简单介绍了下,jetcache 里面如何把异步和同步统一起来,利用的是CompleteStage 这个接口,把异步的 redis letture 客户端 返回的结果,结合到CacheResult 里面。这种异步结果获取,糅合进我们自己的数据结构的方式值得学习和思考