JetCache -- CacheResult 介绍

 

 

序言

程序的灵魂是数据结构和算法 

我们从数据结构出发了解,jetCache 的整体设计, 我们知道 Cache 简单来说就是 一个 Map<K,V> 承载的 一个 hash结构, 但是这样的话,我们通过 key 来拿到 value ,就不能拿到其他很多的信息。而且往往通过    map.get(key) 拿到的value 往往隐含的一个事实,就是这个是一个同步操作,

不能利用到异步的优点,从get 方式我们看不出异步的好处,但是从put 的方式,我们很容易看到异步的好处,要是 有一个cache ,它能用 cache.putAsync(key,value)方法,那么这样就能利用到异步的优点,使得程序不阻塞主线程的时候,放置一些非必要的缓存放置阻塞操作,(有人可能会说我用个线程不一样嘛,我干嘛需要异步接口),但是你平时写程序时候,总是没有有意识地用到异步放置缓存,有个异步接口能方便使用,总之利大于弊。

回到主题,我们知道 redis 的letture 最为人称道的就是它的异步特性,底层是利用了nio的方式来实现的, 而jedis 是用了bio方式实现的,nio方式的性能一般来说要优于bio实现的程序。所以为了利用到异步的特性,我们这个cache 返回的value 结果存储,应该也要能是利用到异步特性的,

 

 

正文

CacheGetResult 数据结构

 

我们简单来看下,cache中这个value的数据结构。

从 Cache 接口中,我们看到取的接口为

 

  default V get(K key) throws CacheInvokeException {
        CacheGetResult<V> result = GET(key);
        if (result.isSuccess()) {
            return result.getValue();
        } else {
            return null;
        }
    }


我们看到取的value的数据结构是 CacheGetResult ,我们来研究下,它的设计,见微知著,一叶知秋(这也是希望能做到的状态,菩提曾说过,一花一世界,一叶一春秋)

 

 

public class CacheGetResult<V> extends CacheResult {
    private V value;
    private CacheValueHolder<V> holder;

    public static final CacheGetResult NOT_EXISTS_WITHOUT_MSG = new CacheGetResult(CacheResultCode.NOT_EXISTS, null, null);
    public static final CacheGetResult EXPIRED_WITHOUT_MSG = new CacheGetResult(CacheResultCode.EXPIRED, null ,null);

 // 省略其他不重要部分
}

 

篇幅原因,我们会省略其他不重要部分代码,好,我们接下来看 CacheResult

 

package com.alicp.jetcache;

import com.alicp.jetcache.anno.CacheConsts;

import java.time.Duration;
import java.util.concurrent.*;

/**
 * Created on 2016/9/28.
 *
 * @author <a href="mailto:areyouok@gmail.com">huangli</a>
 */
public class CacheResult {

    public static final String MSG_ILLEGAL_ARGUMENT = "illegal argument";

    private static Duration DEFAULT_TIMEOUT = CacheConsts.ASYNC_RESULT_TIMEOUT;

    public static final CacheResult SUCCESS_WITHOUT_MSG = new CacheResult(CacheResultCode.SUCCESS, null);
    public static final CacheResult PART_SUCCESS_WITHOUT_MSG = new CacheResult(CacheResultCode.PART_SUCCESS, null);
    public static final CacheResult FAIL_WITHOUT_MSG = new CacheResult(CacheResultCode.FAIL, null);
    public static final CacheResult FAIL_ILLEGAL_ARGUMENT = new CacheResult(CacheResultCode.FAIL, MSG_ILLEGAL_ARGUMENT);
    public static final CacheResult EXISTS_WITHOUT_MSG = new CacheResult(CacheResultCode.EXISTS, null);

    private CacheResultCode resultCode;
    private String message;
    private CompletionStage<ResultData> future;

    private Duration timeout = DEFAULT_TIMEOUT;

    public CacheResult(CompletionStage<ResultData> future) {
        this.future = future;
    }

    public CacheResult(CacheResultCode resultCode, String message) {
        this(CompletableFuture.completedFuture(new ResultData(resultCode, message, null)));
    }

    public CacheResult(Throwable ex) {
        future = CompletableFuture.completedFuture(new ResultData(ex));
    }

    public boolean isSuccess() {
        return getResultCode() == CacheResultCode.SUCCESS;
    }

    protected void waitForResult() {
        waitForResult(timeout);
    }

    public void waitForResult(Duration timeout) {
        if (resultCode != null) {
            return;
        }
        try {
            ResultData resultData = future.toCompletableFuture().get(
                    timeout.toMillis(), TimeUnit.MILLISECONDS);
            fetchResultSuccess(resultData);
        } catch (TimeoutException | InterruptedException | ExecutionException e) {
            fetchResultFail(e);
        }
    }

    protected void fetchResultSuccess(ResultData resultData) {
        resultCode = resultData.getResultCode();
        message = resultData.getMessage();
    }

    protected void fetchResultFail(Throwable e) {
        resultCode = CacheResultCode.FAIL;
        message = e.getClass() + ":" + e.getMessage();
    }

    public CacheResultCode getResultCode() {
        waitForResult();
        return resultCode;
    }

    public String getMessage() {
        waitForResult();
        return message;
    }

    public CompletionStage<ResultData> future() {
        return future;
    }

    public static void setDefaultTimeout(Duration defaultTimeout) {
        DEFAULT_TIMEOUT = defaultTimeout;
    }

    public void setTimeout(Duration timeout) {
        this.timeout = timeout;
    }
}

 

在说这个具体CacheResult 之前,我们看看其继承结构,

CacheGetResult<V>  也就是前面提到的数据结构

还有一个是MultiGetResult<K,V> ,看看其基础结构为

 

public class MultiGetResult<K, V> extends CacheResult {
    private Map<K, CacheGetResult<V>> values;
 // 省略其他
}

很容易看到 它是前面CacheGetResult 的一个封装

 

回到 CacheResult 这个基类,我们能从中看到怎么样的设计思路。

其中 

  private CacheResultCode resultCode;
    private String message;

public enum CacheResultCode {
    SUCCESS, PART_SUCCESS, FAIL, NOT_EXISTS, EXISTS, EXPIRED
}

一个是CacheResultCode, 一个是是否出错,以及其出错信息,这两部分都还算是常规设计。

下面有意思的就来了,真正的value值,放在这么一个结构里面

 

 private CompletionStage<ResultData> future;

我们先看看ResultData里面承载了什么样的内容

 

public class ResultData {
    private CacheResultCode resultCode;
    private String message;
    private Object data;

// 省略
}

 

CompletionStage

 

比较常规,我们再来了解这个CompletionStage 是个怎样的接口呢,这个接口是juc包下的一个表示 异步状态的一个接口,从名字我们也可以发现,是否是完全状态的接口,应该是表示一个异步调用,结果是否已经完善。跟future 应该有一定关系,从接口的继承关系,看看java这个接口的设计

 

可以看到java中异步接口如果需要拿到结果是否已经准备好了,基本会用这个接口来拓展,特别是一些completableFutrue ,RedisFutrue 等都已经继承这个接口了,意味着我们可以通过这个接口拿到异步的结果,也是为redis letture包做准备。

我们简单看下这个接口里面的方法

 

接口比较众多一点,不一一解释了,基本就是利用异步的特性,方便我们使用的一些异步接口。

回到CacheResult 

看到CacheResult 的构造函数

 

    public CacheResult(CompletionStage<ResultData> future) {
        this.future = future;
    }

    public CacheResult(CacheResultCode resultCode, String message) {
        this(CompletableFuture.completedFuture(new ResultData(resultCode, message, null)));
    }

    public CacheResult(Throwable ex) {
        future = CompletableFuture.completedFuture(new ResultData(ex));
    }

 

可以看到其中一个就是要传一个future进来,为了获取其异步结果,另一个就是同步的value传进来构造CacheResult,我们猜测这个异步接口,跟redis letture 异步接口是大概率有关系的,我们查看其调用处

这个调用处位于  

 

 

RedisLettuceCache

 

  @Override
    protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
        try {
            CacheValueHolder<V> holder = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite));
            byte[] newKey = buildKey(key);
            RedisFuture<String> future = stringAsyncCommands.psetex(newKey, timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
            CacheResult result = new CacheResult(future.handle((rt, ex) -> {
                if (ex != null) {
                    JetCacheExecutor.defaultExecutor().execute(() -> logError("PUT", key, ex));
                    return new ResultData(ex);
                } else {
                    if ("OK".equals(rt)) {
                        return new ResultData(CacheResultCode.SUCCESS, null, null);
                    } else {
                        return new ResultData(CacheResultCode.FAIL, rt, null);
                    }
                }
            }));
            setTimeout(result);
            return result;
        } catch (Exception ex) {
            logError("PUT", key, ex);
            return new CacheResult(ex);
        }
    }

从而证明我们的猜测没有错, 通过 letture 传过来的异步结果,然后传进这个 future 处理的 CompleteStage 这个接口传进 CacheValueResult里面方便其调用,

而CacheResult 里面的关键利用到这个CompleteStage 的地方也就是等待 处理结果的地方,例如

 

    public void waitForResult(Duration timeout) {
        if (resultCode != null) {
            return;
        }
        try {
            ResultData resultData = future.toCompletableFuture().get(
                    timeout.toMillis(), TimeUnit.MILLISECONDS);
            fetchResultSuccess(resultData);
        } catch (TimeoutException | InterruptedException | ExecutionException e) {
            fetchResultFail(e);
        }
    }

 

 

这里还有一点没有提到的是DataResult 里面的 Object value ,它的实际承载数据结构其实是

 

public final class CacheValueHolder<V> implements Serializable {
    private static final long serialVersionUID = -7973743507831565203L;
    private V value;
    private long expireTime;
    private long accessTime;

 // 省略其他   
}

除了必要的 value 信息,还附带了 一些 accessTime,expireTime等附带信息

总结

本文比较简单,简单介绍了下,jetcache 里面如何把异步和同步统一起来,利用的是CompleteStage 这个接口,把异步的 redis letture 客户端 返回的结果,结合到CacheResult 里面。这种异步结果获取,糅合进我们自己的数据结构的方式值得学习和思考

 

 


版权声明:本文为dongjijiaoxiangqu原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。