如何使用分布式缓存?
一、引入
为了使系统提升性能,我们一般会将部分数据放入缓存,加速系统业务的访问,而DB只承担数据的落盘工作
1、哪些数据需要放入缓存
- 即时性、数据一致性要求不高的数据
- 访问量大,且更新频率不高的数据
二、如何使用缓存中间件Redis?
(1) 引入pom依赖
<!-- 因为lettuce导致堆外内存溢出 这里暂时排除他 使用jedis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
(2)查看redis如何配置属性
引入redis的pom依赖后,在spring boot 项目会有redis相应的配置## 合理的创建标题,有助于目录的生成
三、redis为什么会产生堆外内存溢出?
在压力测试时,我们会发现代码没有问题,但是会产生OutDirceMemoryError(堆外内存泄漏)错误。
(1)springboot2.0以后默认使用lettuce作为操作redis的客户端。它使用netty进行网络通信
(2)lettuce的bug导致netty的堆内存溢出 (如果没有指定堆内存, 默认使用 -Xmx300) -Dio.netty.maxDirectMemory进行设置
解决方案:不能使用-Dio.netty.maxDirectMemory进行设置调大堆外内存
- 升级lettuce客户端
- 切换使用jedis客户端
四、高并发缓存失效的三个问题
(1)缓存穿透
缓存穿透: 指查询一个一定不存在的数据,由于缓存是不命中,则本次操作将要去查询数据库,但数据库中也无法查到记录,我们也没有将本次查询的null写入缓存,这导致这个不存在的数据,每次请求都要到数据库去查询,失去了缓存的意义
风险:利用不存在的数据进行攻击,数据库瞬间压力增大,最终导致崩溃、
如何解决:把null结果也写入缓存,并加入短暂的过期时间(2)缓存雪崩
缓存雪崩:指的是在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时间同时失效,请求全部转发到DB,DB瞬间压力过大雪崩 解决:
给缓存的失效时间增加一个随机值,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体的失效
(3)缓存击穿
缓存击穿: 对于一些设置过期时间的key,这些key可能在某一时间点被高频地访问,是一种非常“热点”的数据。
如果这些key在大量请求同时进来前正好过期了,那么所有对这个key的数据查询都将落到DB,称之为缓存击穿
解决: “加锁”大量并发的请求只让一个去查,其他的请求等待,通过的那个请求查到数据以后释放锁,其他人获取锁,先去缓存查询数据,就不会大量的并发请求去查询DB
六、本地锁和分布式锁
1、本地锁
// 本地锁 synchronized 进程锁 锁不住分布式的服务
// 只要是同一把锁,就可以锁住需要这个锁的所有线程
// 1、synchronized (this) springboot所有的组件在容器中都是单例的
synchronized (this) {
// 业务代码
}
2、分布式锁
/**
*分布式锁 – 原子加锁、原子解锁
*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {
// 抢占分布式锁 setIfAbsent --> NX 不存在才占坑 EX 自动过期时间
// 设置redis锁的自动过期时间 - 防止出现异常、服务崩塌等各种情况,没有执行删除锁操作导致的死锁问题
// !!! 设置过期时间和加锁必须是同步的、原子的
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
if (lock) {
System.out.println("获取分布式锁成功...");
// 加锁成功...执行业务
Map<String, List<Catelog2Vo>> dataFromDB;
try {
// 访问数据库
dataFromDB = getCatalogJSONDataFromDB();
} finally {
// 获取对比值和对比成功删除锁也是要同步的、原子的执行 参照官方使用lua脚本解锁
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
// 删除成功返回1 删除失败返回0
Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
Arrays.asList("lock"), uuid);
}
return dataFromDB;
} else {
// 加锁失败休眠一段时间...重试获取锁
System.out.println("获取分布式锁失败...等待重试");
// 重试的频率太快会导致内存溢出
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
return getCatalogJsonFromDBWithRedisLock(); // 自旋的方式
}
}
3、如何使用Redisson的方式实现分布式锁?
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data
Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap,SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque,Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe,
Bloom filter, Remote service, Spring cache, Executor service, Live
Object service, Scheduler service)
Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
1)引入依赖
<!-- 以后使用redisson作为所有分布式锁,分布式对象等功能 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
2)配置Redisson
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
/**
* Redisson配置类
* @Author mashanghaoyun
* @Date 2020/4/13 11:12
* @Version 1.0
**/
@Configuration
public class MyRedissonConfig {
/**
* 对Redisson的使用都是通过RedissonClient
* @throws IOException
*/
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
// 单Redis节点模式
// 1、创建配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.56.10:6379");
// 2、根据config创建RedissonClient示例
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
3)使用Redisson基础锁
Redsson官网文档,这里最基础的用法,还需要参看官网 使用!!!
/**
* 查询前台需要显示的分类数据 - redisson框架实现分布式锁
* <p>
* 缓存中的数据如何和数据库的保持一致
* ① 双写模式
* ② 失效模式
*
* @return
*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedissonLock() {
// 1、占分布式锁,去reids占坑
RLock lock = redisson.getLock("catalogJson-lock");
lock.lock(); // 阻塞等待
// 加锁成功...执行业务
Map<String, List<Catelog2Vo>> dataFromDB;
try {
// 访问数据库
dataFromDB = getCatalogJSONDataFromDB();
} finally {
lock.unlock(); // 解锁
}
return dataFromDB;
}
4)读写锁(ReadWriteLock)
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
/**
* 写锁
* //保证一定能读到最新数据,修改期间,写锁是-个排他锁 (互斥锁、独享锁)。读锁是一个共事锁
* //写锁没释放读就必须等待
* //读+读:相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
* //写+读:等待写锁释放
* //写+写:阻塞方式
* //读+写,有读锁。写也需要等待。
* //只要有写的存在,都必须等待
* @Author mashanghaoyun
* @Date 2020/9/13 01:31
* @return java.lang.String
**/
@GetMapping("/write")
@ResponseBody
public String wirteLockTest() {
String uuid = UUID.randomUUID().toString();
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
RLock rLock = lock.writeLock();
rLock.lock();
try {
Thread.sleep(6000);
stringRedisTemplate.opsForValue().set("write-value", uuid);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return uuid;
}
/**
* 读锁
* @Author mashanghaoyun
* @Date 2020/9/13 01:31
* @return java.lang.String
**/
@GetMapping("/read")
@ResponseBody
public String readLockTest() {
String s = "";
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
// 获取读锁,如果有其他的线程在写操作, 则需要等到写锁释放锁以后才可进行读的操作
RLock rLock = lock.readLock();
rLock.lock();
try {
s = stringRedisTemplate.opsForValue().get("write-value");
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
5)信号量(Semaphore)
基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
/**
* 获取信号量
* @Author mashanghaoyun
* @Date 2020/9/13 20:00
* @return java.lang.String
**/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore semaphore = redisson.getSemaphore("park");
// semaphore.acquire(); // 阻塞等待
boolean flag = semaphore.tryAcquire();
if (flag) {
return "停车了!";
} else {
return "车库满了!";
}
}
/**
* 释放信号量
* @Author mashanghaoyun
* @Date 2020/9/13 20:00
* @return java.lang.String
**/
@GetMapping("/go")
@ResponseBody
public String go() {
RSemaphore semaphore = redisson.getSemaphore("park");
semaphore.release();
return "车开走了!";
}
6)闭锁(CountDownLatch)
基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
七、缓存数据一致性的问题
八、SpringCache缓存简化分布式缓存的开发
1、引入 pom依赖
<!-- 因为lettuce导致堆外内存溢出 这里暂时排除他 使用jedis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<!-- 分布式缓存 需要和redis配合使用 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
2、编写配置文件
# 配置使用redis作为缓存
spring.cache.type=redis
# 以毫秒为单位 1小时
spring.cache.redis.time-to-live=3600000
# 配置缓存名的前缀 如果没配置则使用缓存名作为前缀
# spring.cache.redis.key-prefix=CACHE_
# 配置前缀是否生效 默认为ture
#spring.cache.redis.use-key-prefix=false
# 是否缓存空值 默认为true
spring.cache.redis.cache-null-values=true
#spring.cache.cache-names=
3、开启缓存
@EnableCaching
4、使用缓存
1)@Cacheable: 触发将数据写入缓存
/**
* 1、每一个需要缓存的数据我们都要来指定放到哪个名字的缓存;【按照业务类型来划分取名】
* 当前方法的结果需要缓存 如果缓存中有,方法不调用;
* 如果缓存中没有,会调用该方法,最后将方法的结果放入缓存
* 2、默认行为
* 1)默认缓存不过期
* 3、自定义
* 1)指定缓存生成指定的key
* 2)指定缓存的过期时间 配置文件修改ttl
* 3)将缓存的value保存为json格式
* 4、缓存的三大问题
* 1)缓存击穿: springcache 默认是不加锁的,需要设置sync = true
* 2)
*/
@Cacheable(value = {"category"}, key = "#root.method.name", sync = true)
public List<CategoryEntity> getLeve1Categorys()
2)@CacheEvict: 触发将数据从缓存中删除
// @Caching(evict = {
// @CacheEvict(value = "category", key = "'getLeve1Categorys'"),
// @CacheEvict(value = "category", key = "'getCatalogJson'")
// })
// allEntries = true 删除category 分区的所有缓存 批量清除
@CacheEvict(value = {"category"}, allEntries = true) // 双写模式
public void updateCascade(CategoryEntity category)
3)@CachePut: 不影响方法执行更新缓存
4)@Caching: 组合以上缓存的操作
@Caching(evict = {
@CacheEvict(value = "category", key = "'getLeve1Categorys'"),
@CacheEvict(value = "category", key = "'getCatalogJson'")
})
5)@CacheConfig: 在类级别共享相同的缓存配置
5、修改缓存存储的序列化器
import org.springframework.boot.autoconfigure.cache.CacheProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* 缓存配置类
* @Author mashanghaoyun
* @Date 2020/4/14 03:57
* @Version 1.0
**/
@EnableConfigurationProperties(CacheProperties.class)
@EnableCaching
@Configuration
public class MyCacheConfig {
@Bean
public RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
// 修改默认的序列化器
config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
if (redisProperties.getTimeToLive() != null) {
config = config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config = config.prefixKeysWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config = config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config = config.disableKeyPrefix();
}
return config;
}
}