阿里巴巴面向对象缓存框架-JetCache

 

 

                                  阿里巴巴面向对象缓存框架-JetCache 

JetCache

JetCache是由阿里巴巴开源的通用缓存访问框架,相较于其他缓存框架该框架最大的特点就是将面向对象的思想应用到了缓存。

JetCache提供的核心能力包括:

  • 提供统一的,类似jsr-107风格的API访问Cache,并可通过注解创建并配置Cache实例
  • 通过注解实现声明式的方法缓存,支持TTL和两级缓存
  • 分布式缓存自动刷新,分布式锁 (2.2+)
  • 支持异步Cache API
  • Spring Boot支持
  • Key的生成策略和Value的序列化策略是可以定制的
  • 针对所有Cache实例和方法缓存的自动统计

连接方式

  • 基于jedis支持的客户端连接

redis有多种java版本的客户端,JetCache2.2以前使用jedis客户端访问redis。从JetCache2.2版本开始,增加了对luttece客户端的支持,jetcache的luttece支持提供了异步操作和redis集群支持。

如果选用jedis访问redis,对应的maven artifact是jetcache-redis和jetcache-starter-redis(spring boot)。

jetcache: 
  areaInCacheName: false
  remote:
    default:
      type: redis
      keyConvertor: fastjson
      poolConfig:
        minIdle: 5
        maxIdle: 20
        maxTotal: 50
      #单机redis配置
      host: ${redis.host}
      port: ${redis.port}
      #哨兵模式redis配置
      #sentinels: 127.0.0.1:26379 , 127.0.0.1:26380, 127.0.0.1:26381
      #masterName: mymaster
      #该模式下不支持集群配置

如果需要直接操作JedisPool,可以通过以下方式获取

@Bean(name = "defaultPool")
@DependsOn(RedisAutoConfiguration.AUTO_INIT_BEAN_NAME)//jetcache2.2+
//@DependsOn("redisAutoInit")//jetcache2.1
public JedisPoolFactory defaultPool() {
    return new JedisPoolFactory("remote.default", JedisPool.class);
}

然后可以直接使用

@Autowired
private Pool<Jedis> defaultPool;
  • 基于lettuce支持的客户端连接

 

redis有多种java版本的客户端,JetCache2.2以前使用jedis客户端访问redis。从JetCache2.2版本开始,增加了对lettuce客户端的支持,JetCache的lettuce支持提供了异步操作和redis集群支持。

使用lettuce访问redis,对应的maven artifact是jetcache-redis-lettuce和jetcache-starter-redis-lettuce。lettuce使用Netty建立单个连接连redis,所以不需要配置连接池。

注意:新发布的lettuce5更换了groupId和包名,2.3版本的JetCache同时支持lettuce4和5,jetcache-redis-lettuce,jetcache-starter-redis-lettuce提供lettuce5支持,jetcache-redis-lettuce4和jetcache-starter-redis-lettuce4提供lettuce4支持。

注意:JetCache2.2版本中,lettuce单词存在错误的拼写,错写为“luttece”,该错误存在于包名、类名和配置中,2.3已经改正。

jetcache: 
  areaInCacheName: false
  remote:
    default:
      type: redis.lettuce
      keyConvertor: fastjson
      #单机redis
      uri: redis://127.0.0.1:6379/
      #哨兵模式reids
      #uri: redis-sentinel://127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381/? 
      sentinelMasterId=mymaster
      #readFrom: slavePreferred
      #集群redis配置
      uri: 
        - redis://testpass@127.0.0.1:8000
        - redis://testpass@127.0.0.1:8001
        - redis://testpass@127.0.0.1:8002
        - redis://testpass@127.0.0.1:8003
     
      
       

如果需要直接使用lettuce的RedisClient:

@Bean(name = "defaultClient")
@DependsOn(RedisLettuceAutoConfiguration.AUTO_INIT_BEAN_NAME)
public LettuceFactory defaultClient() {
    return new LettuceFactory("remote.default", RedisClient.class);
}

然后可以直接使用

@Autowired
private RedisClient defaultClient;

框架缓存api

cache缓存通用api

  •      V get(K key)
  •      void put(K key, V value);
  •      boolean putIfAbsent(K key, V value); //多级缓存MultiLevelCache不支持此方法
  •      boolean remove(K key);
  •      <T> T unwrap(Class<T> clazz);//2.2版本前,多级缓存MultiLevelCache不支持此方法
  •      Map<K,V> getAll(Set<? extends K> keys);
  •      void putAll(Map<? extends K,? extends V> map);
  •      void removeAll(Set<? extends K> keys);
  •      V computeIfAbsent(K key, Function<K, V> loader)
  •      V computeIfAbsent(K key, Function<K, V> loader, boolean
  •      cacheNullWhenLoaderReturnNull)
  •      V computeIfAbsent(K key, Function<K, V> loader, boolean
  •      cacheNullWhenLoaderReturnNull, long expire, TimeUnit timeUnit)
// 以下是Cache接口中和JSR107的javax.cache.Cache接口一致的方法,除了不会抛出异常,这些方法的签名和行为和JSR107都是一样的。
	
	@Test
	public void testAPI01() {
		userCache.put(1l, "jack");
		System.out.println("cache-name=" + userCache.get(1L));
		System.out.println(userCache.putIfAbsent(1l, "tony"));
		System.out.println("cache-name=" + userCache.get(1L));
		System.out.println(userCache.putIfAbsent(2l, "tony"));
		System.out.println("cache-name=" + userCache.get(2L));
		System.out.println(userCache.remove(2L));
		System.out.println(userCache.remove(3L));
		System.out.println("cache-name=" + userCache.get(2L));
		Map<Long, String> map = new HashMap<Long, String>();
		map.put(3L, "aa");
		map.put(4L, "bb");
		userCache.putAll(map);
		Set<Long> set = new HashSet<Long>();
		set.add(1L);
		set.add(2L);
		set.add(3L);
		System.out.println("cache-name=" + userCache.getAll(set));
		userCache.removeAll(set);
		System.out.println("cache-name=" + userCache.getAll(set));
		// /当key对应的缓存不存在时,使用loader加载。通过这种方式,loader的加载时间可以被统计到。
		userCache.computeIfAbsent(1L, k -> {
			return "u1";
		});
		System.out.println("cache-name=" + userCache.get(1L));
		// 当key对应的缓存不存在时,使用loader加载。cacheNullWhenLoaderReturnNull参数指定了当loader加载出来时null值的时候,是否要进行缓存(有时候即使是null值也是通过很繁重的查询才得到的,需要缓存)
		userCache.computeIfAbsent(2L, k -> {
			return null;
		}, true);
		System.out.println("cache-name=" + userCache.get(2L));
		// AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit)
		// boolean tryLockAndRun(K key, long expire, TimeUnit timeUnit, Runnable action)
		boolean hasRun = userCache.tryLockAndRun(2L, 100, TimeUnit.SECONDS, () -> {
			System.out.println("wait....");
		});
		System.out.println("hasRun=" + hasRun);

	}

jetcache特有api

  •     CacheGetResult<L> GET(K key);
  •      MultiGetResult<K, V> GET_ALL(Set<? extends K> keys);
  •      CacheResult PUT(K key, V value);
  •      CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
  •      CacheResult PUT_ALL(Map<? extends K, ? extends V> map);
  •      CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite,
  •      TimeUnit timeUnit);
  •      CacheResult REMOVE(K key);
  •      CacheResult REMOVE_ALL(Set<? extends K> keys);
  •      CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit
  •      timeUnit);
// 这些方法的特征是方法名为大写,与小写的普通方法对应,提供了完整的返回值,用起来也稍微繁琐一些。例如:
	// CacheGetResult<L> GET(K key);
	// MultiGetResult<K, V> GET_ALL(Set<? extends K> keys);
	// CacheResult PUT(K key, V value);
	// CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
	// CacheResult PUT_ALL(Map<? extends K, ? extends V> map);
	// CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite,
	// TimeUnit timeUnit);
	// CacheResult REMOVE(K key);
	// CacheResult REMOVE_ALL(Set<? extends K> keys);
	// CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit
	// timeUnit);
	public void testAPI02() {

		CacheGetResult<String> r = userCache.GET(1L);
		String name=null;
		if (r.isSuccess()) {
			 name = r.getValue();
		} else if (r.getResultCode() == CacheResultCode.NOT_EXISTS) {
			System.out.println("cache miss:" + name);
		} else if (r.getResultCode() == CacheResultCode.EXPIRED) {
			System.out.println("cache expired:" + name);
		} else {
			System.out.println("cache get error:" + name);
		}
		
		
		// 异步API
		// 从JetCache2.2版本开始,所有的大写API返回的CacheResult都支持异步。当底层的缓存实现支持异步的时候,大写API返回的结果都是异步的。当前支持异步的实现只有jetcache的redis-luttece实现,其他的缓存实现(内存中的、Tair、Jedis等),所有的异步接口都会同步堵塞,这样API仍然是兼容的。
		//
		// 以下的例子假设使用redis-luttece访问cache,例如:
		CacheGetResult<String> r1 = userCache.GET(1L);
		// 这一行代码执行完以后,缓存操作可能还没有完成,如果此时调用r.isSuccess()或者r.getValue()或者r.getMessage()将会堵塞直到缓存操作完成。如果不想被堵塞,并且需要在缓存操作完成以后执行后续操作,可以这样做:
		CompletionStage<ResultData> future = r1.future();
		future.thenRun(() -> {
			if (r.isSuccess()) {
				System.out.println(r1.getValue());
			}
		});

	}

Vget(K key)这样的方法虽然用起来方便,但有功能上的缺陷,当get返回null的时候,无法断定是对应的key不存在,还是访问缓存发生了异常,所以JetCache针对部分操作提供了另外一套API,提供了完整的返回值。

面向对象缓存创建

通过@CreateCache注解创建一个缓存实例,默认超时时间是60秒


	/**
	 * 创建一个两级(内存+远程)的缓存,内存中的元素个数限制在50个。
	 */
	@CreateCache(name = "UserService.userCache", expire = 50000, cacheType = CacheType.BOTH)
	public Cache<Long, Student> userCache;

 

通过注解实现方法缓存
JetCache方法缓存和SpringCache比较类似,它原生提供了TTL支持,以保证最终一致,并且支持二级缓存。JetCache2.4以后支持基于注解的缓存更新和删除。

在spring环境下,使用@Cached注解可以为一个方法添加缓存,@CacheUpdate用于更新缓存,@CacheInvalidate用于移除缓存元素。注解可以加在接口上也可以加在类上,加注解的类必须是一个spring bean,例如:


 

package com.cn.src.service;
 import org.springframework.stereotype.Service;

import com.alicp.jetcache.anno.CacheInvalidate;
import com.alicp.jetcache.anno.CacheRefresh;
import com.alicp.jetcache.anno.CacheUpdate;
import com.alicp.jetcache.anno.Cached;
import com.cn.src.bean.Student;

/**
 * 方法缓存,可以蒋整个方法的结果作为缓存,减少与数据库的交互,不侵入任何业务代码
 * 
 * @author hua
 *
 */
@Service
public class MethodCacheService {


	/**
	 * refresh 刷新间隔
	 * stopRefreshAfterLastAccess
	 * 指定该key多长时间没有访问就停止刷新,如果不指定会一直刷新
	 * refreshLockTimeout
	 * 类型为BOTH/REMOTE的缓存刷新时,同时只会有一台服务器在刷新,这台服务器会在远程缓存放置一个分布式锁,此配置指定该锁的超时时间
	 * (该模式的应用场景,变化频率不高,实时请求需要耗费大量资源和时间的操作。如沃阅读的我的书架信息)
	 * @param userId
	 * @return
	 */
	@Cached(name="cache-", key = "#userId", expire = 36000)
	@CacheRefresh(refresh = 10,stopRefreshAfterLastAccess=36000,refreshLockTimeout=100)
	public Student getUserById(long userId) {
		Student s = new Student();
		s.setId(userId);
		s.setName("lyh"+System.currentTimeMillis());
		System.out.println("Cache became invalid,value:" + ",key:" + userId);
		return s;
	}
	
	
	
	@CacheUpdate(name="cache-",key = "#user.id", value = "#user")
	public void updateUser(Student user) {
		System.out.println("模拟其它更新操作......");
	}

	
	@CacheInvalidate(name="cache-", key = "#userId")
	public void deleteUser(long userId) {
		System.out.println("模拟进行数据库交互操作......");
	}


}

高级Cache API
CacheBuilder
CacheBuilder提供使用代码直接构造Cache实例的方式,使用说明看这里。如果没有使用Spring,可以使用CacheBuilder,否则没有必要直接使用CacheBuilder。

异步API
从JetCache2.2版本开始,所有的大写API返回的CacheResult都支持异步。当底层的缓存实现支持异步的时候,大写API返回的结果都是异步的。当前支持异步的实现只有jetcache的redis-luttece实现,其他的缓存实现(内存中的、Tair、Jedis等),所有的异步接口都会同步堵塞,这样API仍然是兼容的。

以下的例子假设使用redis-luttece访问cache,例如:

CacheGetResult<UserDO> r = cache.GET(userId);
这一行代码执行完以后,缓存操作可能还没有完成,如果此时调用r.isSuccess()或者r.getValue()或者r.getMessage()将会堵塞直到缓存操作完成。如果不想被堵塞,并且需要在缓存操作完成以后执行后续操作,可以这样做:

CompletionStage<ResultData> future = r.future();
future.thenRun(() -> {
    if(r.isSuccess()){
        System.out.println(r.getValue());
    }
});
以上代码将会在缓存操作异步完成后,在完成异步操作的线程中调用thenRun中指定的回调。CompletionStage是Java8新增的功能,如果对此不太熟悉可以先查阅相关的文档。需要注意的是,既然已经选择了异步的开发方式,在回调中不能调用堵塞方法,以免堵塞其他的线程(回调方法很可能是在event loop线程中执行的)。

部分小写的api不需要任何修改,就可以直接享受到异步开发的好处。比如put和removeAll方法,由于它们没有返回值,所以此时就直接优化成异步调用,能够减少RT;而get方法由于需要取返回值,所以仍然会堵塞。

自动load(read through)
LoadingCache类提供了自动load的功能,它是一个包装,基于decorator模式,也实现了Cache接口。如果CacheBuilder指定了loader,那么buildCache返回的Cache实例就是经过LoadingCache包装过的。例如:

Cache<Long,UserDO> userCache = LinkedHashMapCacheBuilder.createLinkedHashMapCacheBuilder()
                .loader(key -> loadUserFromDatabase(key))
                .buildCache();
LoadingCache的get和getAll方法,在缓存未命中的情况下,会调用loader,如果loader抛出一场,get和getAll会抛出CacheInvokeException。

需要注意

GET、GET_ALL这类大写API只纯粹访问缓存,不会调用loader。
如果使用多级缓存,loader应该安装在MultiLevelCache上,不要安装在底下的缓存上。
注解的属性只能是常量,所以没有办法在CreateCache注解中指定loader,不过我们可以这样:

@CreateCache
private Cache<Long,UserDO> userCache;
 
@PostConstruct
public void init(){
    userCache.config().setLoader(this::loadUserFromDatabase);
}
@CreateCache总是初始化一个经过LoadingCache包装的Cache,直接在config中设置loader,可以实时生效。

自动刷新缓存
从JetCache2.2版本开始,RefreshCache基于decorator模式提供了自动刷新的缓存的能力,目的是为了防止缓存失效时造成的雪崩效应打爆数据库。同时设置了loader和refreshPolicy的时候,CacheBuilder的buildCache方法返回的Cache实例经过了RefreshCache的包装。

RefreshPolicy policy = RefreshPolicy.newPolicy(1, TimeUnit.MINUTES)
                .stopRefreshAfterLastAccess(30, TimeUnit.MINUTES);
Cache<String, Long> orderSumCache = LinkedHashMapCacheBuilder
                .createLinkedHashMapCacheBuilder()
                .loader(key -> loadOrderSumFromDatabase(key))
                .refreshPolicy(policy)
                .buildCache();
对一些key比较少,实时性要求不高,加载开销非常大的缓存场景,适合使用自动刷新。上面的代码指定每分钟刷新一次,30分钟如果没有访问就停止刷新。如果缓存是redis或者多级缓存最后一级是redis,缓存加载行为是全局唯一的,也就是说不管有多少台服务器,同时只有一个服务器在刷新,这是通过tryLock实现的,目的是为了降低后端的加载负担。

与LoadingCache一样,使用@CreateCache时,我们需要这样来添加自动刷新功能

@CreateCache
private Cache<String, Long> orderSumCache;
 
@PostConstruct
public void init(){
    RefreshPolicy policy = RefreshPolicy.newPolicy(1, TimeUnit.MINUTES)
                          .stopRefreshAfterLastAccess(30, TimeUnit.MINUTES);
    orderSumCache.config().setLoader(this::loadOrderSumFromDatabase);
    orderSumCache.config().setRefreshPolicy(policy);
}
 


版权声明:本文为honger_hua原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。