Redis详细笔记,基本操作、集群、应用问题等,值得收藏

???创作不易,各位看官点赞收藏.

Redis 笔记

redis是一个NoSql(Not Only Sql)非关系型数据库,不依赖业务逻辑方式储存,以简单的key-value方式进行存储。可以配合关系型数据库进行缓存数据。

1、安装及环境搭建

redis官网:https://redis.io/,redis不支持windows系统,必须安装到Linux系统下,下载好redis的压缩包上传到服务器上。

c语言环境:redis编译需要c语言环境。

yum install gcc # 安装gcc环境
gcc --version # 检查gcc的版本

解压编译redis

tar -zxvf redis-7.0.4.tar.gz  # 解压
# 解压好进入redis目录
make # 编译
make install # 安装

安装成功后,会在/usr/local/bin安装redis的相关内容:

  • redis-benchmark:性能测试工具。
  • redis-check-aof:修复有问题的AOF文件。
  • redis-check-dump:修复有问题的dump.rdb文件。
  • redis-sentinel:redis集群搭建。
  • redis-server:redis服务启动命令。
  • redis-cli:客户端,操作入口。

进入目录/usr/local/bin,执行redis-server启动redis:

image-20220811105023518

这种启动方式只能在窗口开启是进行访问,如果关闭启动窗口redis也就关闭。

修改redis配置文件:(在redis解压目录下)

  • 注释掉 bind 127.0.0.1 这一行(解决只能特定网段连接的限制)
  • 将 protected-mode 属性改为 no (关闭保护模式,不然会阻止远程访问)
  • 将 daemonize 属性改为 yes (这样启动时就在后台启动)
  • 添加 requirepass 123456即可 当然这步完全看心情
  • 将port XXX 修改成自己的端口(默认是6379)

启动redis:

redis-server /opt/redis/redis-7.0.4/redis.conf

image-20220811111209447

如果修改了端口,如要防火墙放行对应的端口。关闭redis直接killredis的进程号即可。

firewall-cmd --zone=public --add-port=6379/tcp --permanent  # 开放端口
firewall-cmd --reload # 重启防火墙
firewall-cmd --list-ports  # 查看开放的端口号

远程连接redis:配置好上面的信息然后,可以下载一个Another Redis Desktop Manager可视化的redis管理工具。

image-20220811220135196

redis相关信息:redis默认16个数据库,下标从0开始(默认初始使用0号数据库)。

select dbid # 切换数据库,例如select 10,这些库的密码都是同一个密码
dbsize # 查询当前数据库的key数量
flushdb # 清空当前库
flushall # 清空所有库

2、五大数据类型

key操作

keys 正则表达	# 查看当前库中满足正则表达式的所有key
exists key	# 判断某个key是否存在
type key # 查看可以的类型
del key # 删除key
unlink key # 非阻塞删除,现将key从keyspace中删除,后续异步再执行真正的删除
expire key time(单位秒)	# 设置key的过期时间
ttl key # 查看可以会有多少秒过期,-1表示永久不会过期,-2表示已过期
get key # 获取某个key的值

2.1、String

String是redis最基本的数据类型,一个key对应一个value。String是二进制安全的,相当于所有的数据都可以通过String进行储存。例如图片或者是序列化的对象。但是一个key的value值最大是512M。

set key value # 设置string类型的key,如果key不存在就新增一个key,如果存在就会覆盖掉之前的key
get key # 获取key对应的值
append key value # 如果key存在,就向key的value追加,如果key不存在就新添加一个key
strlen key # 获取value的长度
setnx key value # 如果key不存在就新增一个key,如果存在就不会覆盖也不会新增key

# 对于纯数字类型的自增和自减
incr key # 自增1
decr key # 自减1
increby key step # 自增step
decrby key step # 自减step

redis中的自增和自减:

​ 在redis中,自增和自减是一个原子操作,不会被线程调度打断。redis是单线程,所以能够在单指令完成的操作就是原子操作,多个线程操作同一条数据相互之间互不影响。

mset k1 v1 k2 v2 k3 v3 # 设置多个key
mget k1 k2 k3 # 获取多个key的值
msetnx k1 v1 k2 v2 k3 v3 # 设置多个key,和setnx类似,所有的key都不存在是才会设置成功,有一个key存在所有的key都会设置失败
getrange key start end # 截取value的值,start、end是下标,类似substring。前后都包含
setrange key index value # 设置key从index下标使用value开始覆盖
setex key time() value # 在设置key的时候设置过期时间
getset key value # 设置key新值的同时返回旧值

redisd字符窜底层数据结构:

​ 是一个动态的字符串,可以修改的字符串。类似Java的ArrayList,采用分配冗余空间减少内存的频繁分配。有一个len的阈值,当操作这个阈值就会进行扩容。容量小于1M时会双倍扩容,大于1M之后每次扩容只会增加1M,最大容量是512M。

2.2、List

​ redis列表是简单的字符串列表,按照插入顺序进行排序,可以在头和尾部添加数据。它底层就是一个循环列表,在两端操作数据性能较好,但是通过下标检索元素性能较差。

常见命令:

lpush/rpush key v1 v2 ...	# 向key的左边/右边添加多个值,如果key不存在就新增key
lpop/rpop key [count] # 从左边/右边去除元素,没有指定count就是一个,指定count是几个,就移除几个,值在key就在
lrange key start end # 获取列表中的元素,从start开始,到end结束,0 -1表示获取全部元素
rpoplpush key1 key2 # 从key1的右边去除一个元素,然后加入到key2的左边
lindex key index # 获取key对应下标的元素
llen key # 获取list的长度
linsert key before/after v1 newvalue # 在元素v1的前面/后面新增一个newvalue元素
lrem key n value # 从左边删除n个value的元素
lset key index value # 将下标index的元素设置为新的元素

底层数据结构:

​ 底层使用的是快速链表。在元素比较少的情况下,会使用一段连续空间存储,ziplist(压缩链表)。在元素较多的情况下才会改成快速链表,普通链表需要一部分内存空间去存放指针,redis将压缩链表和快速链表结合起来了,将多个压缩链表使用快速链表的方式链连接起来,这样就不会出来空间冗余。

2.3、Set

​ set是一个元素不可重复的列表,是一个string类型的无序集合,底层是一个value为null的hash表。查询、添加、删除的复杂度都是O(1)。

常见命令:

sadd key v1 v2 v3 。。。 # 添加一个或多个元素,元素不能重复
smembers key # 查看集合的所有元素
sismember key v # 判断v元素是否存在,1表示存在,0表示不存在
scard key # 获取集合元素的个数
srem key v1 v2 ... # 删除集合中的元素
spop key n # 随机从集合中移除n个元素并返回移除的元素
srandmember key n # 从集合中随机取出n个值,但是不会从集合中删除
smove key1 key2 v1 # 将集合key1中的v1元素移到key2集合中
sinter key1 [key2 key3..] # 返回key1集合与后面集合的 交集
sunion key1 [key2 key3..] # key1与其他集合的 并集
sdiff key2 [key2 key3..] # key1与其他集合的 差集 (在key1中有,在其他集合中没有)

2.4、Hash

​ redis hash是一个键值对集合,一个string类型的filed和value的映射表,适合存储对象类似java中的map。用户的id作为key,存储的value就是用户的信息。

常见命令:

hset key filed value [filed value] # 添加hash集合,filed - value 一一对应
hget key filed # 获取filed对应的value
hexists key filed # 判断key的filed是否存在,1存在,0不存在
hkeys key # 查看key中所有的filed
hvals key # 查看key中filed对应的所有value
hincrby key filed value # 给filed值加上value(都要为数字)
hsetnx key filed value # 添加一个key中不存在的filed,如果存在就添加不成功

2.5、Zset

​ 一个没有重复元素的set集合,每一个元素都关联了一个score(评分)。集合根据评分来排序集合,集合中成员是唯一的评分可以不是唯一的。

常见命令:

zadd key score1 value1 score2 value2... # 新增集合中元素,并携带score
zrange key start end [withscores] # 获取集合中start到end下标的元素,如果后面添加withscores会将元素的score也会返回到集合中
zrangebyscore key min max # 获取到评分在min和max之间的元素
zrevrangebyscore key min max # 将评分在这个区间的元素,按照从大到小进行排序
zincrby key incr value # 把value值对应的评分增加incr
zrem key v1 v2 ... # 从集合中删除元素
zcount key min max # 统计在这个区间元素的个数
zrank key value # 返回value在集合中排名,从0开始

2.6、发布与订阅

​ redis的发布与订阅是一种消息的通讯模式,发送者发送消息,订阅者接收消息。redis客户端可以订阅任意数量的频道。

image-20220817160327813

subscribe 频道名1 频道名2 。。。。 # 订阅者订阅多个频道
publish 频道名 message # 发布者通过频道发布消息

如果发布者发布的频道被其他客户端订阅了频道,那么在发布的时候订阅的客户端就会接收到消息。

2.7、新数据类型

Bitmaps:可以实现对位的操作,实际上也是字符串,但是可以对字符串进行位炒作。能有效提高内存使用率和开发效率。可以看做成一个以位为单位的数组,只能存储0和1,数组的下标称为偏移量。

setbit key offset 0|1	# 设置一个key的bitmap,offset是偏移量,值只能是0或1(如果偏移量过大,整个初始化过程会比较慢,所以一般在初始化之前会减去某一个数字)
getbit key offset # 获取位对应偏移量的值,没有设置对应偏移量的值就返回0
bitcount key [s,e] # 统计key中值为1的个数,可以指定一个偏移量范围,0 -1表示全部
bitop and|or|not|xor dest key1 key2 .... # 将这些集合进行求 和|或|非|异或 操作,然后将结果放在dest集合中返回记录条数

HyperLogLog:用来做基数统计(集合中不重复元素个数)的算法,再输入元素数量很大时,计算基数时所需要的的空间总是固定的并且很小。

pfadd key e1 e2 ....  # 添加元素,如果基数发生变化返回1,否则返回0
pfcount key # 统计基数的个数
pfmerge newkey k1 k2 ... # 将k1 k2 ... 的基数合并到一个新key中

Geospatial:提供经纬度设置、查询、范围查询、距离查询,经纬度hash等操作。

geoadd key 经度1 纬度1 name1 经度2 纬度2 name1 。。。 # 添加地理位置,name是地理名称
# 经度范围-180到180,纬度范围-85到85,如果超出给定范围会返回一个错误,重复名称不能添加
geopos key name # 返回name对应的经纬度信息
geodist key name1 name2 [m|km|ft|mi] # 返回这两个地理位置的直线距离,默认单位是米,也可以指定单位千米、英尺、英里
georadius key 经度 纬度 radius [m|km|ft|mi] # 查询在给定经纬度为中心,radius为半径中的元素

3、jedis

jedis:java连接开发工具!使用Java操作Redis 中间件!(就是一个jar包)如果你要使用java操作redis,那么一定要对Jedis十分的熟悉!

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.2.3</version>
</dependency>
// 进行连接
public static void main(String[] args) {
    String host = "127.0.0.1";
    int port = 6001;
    Jedis jedis = new Jedis(host,port);
    // 有密码需要输入密码
    jedis.auth("liujixing");
    // 测试是否连接成功,输出pong就是连接成功
    String ping = jedis.ping();
    System.out.println(ping);
}

使用jedis可以进行对redis的所有命令操作:

// 常见命令都可以进行实现
@Test
public void test1(){
    Jedis jedis = new Jedis("101.43.29.221",6001);
    jedis.auth("liujixing");

    jedis.set("user:101","张三");
    jedis.set("user:102","李四");
    jedis.set("user:103","王五");
    jedis.mset("user:104","104","user:105","105");

    Set<String> keys = jedis.keys("*");
    for (String key : keys) {
        System.out.println(key);
    }
}

4、 Spring Boot 整合redis

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.6.8</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
</dependency>

配置redis相关内容:

spring:
  redis:
    host: 101.43.29.221
    port: 6001
    password: XXXX
    database: 0 # 库下标,0-15
    timeout: 1800 # 连接超时时间
    lettuce:
      pool:
        max-active: 20 # 最大连接数
        max-wait: -1 # 最大阻塞时间,-1没有限制
        max-idle: 5 # 最大空闲连接
        min-idle: 0 # 最小空闲连接

创建redis配置类:

@Configuration
@EnableCaching  // 开启缓存
public class RedisConfig extends CachingConfigurerSupport {

    @Bean
    public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        RedisSerializer<String> stringRedisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        redisTemplate.setConnectionFactory(factory);
        // key序列化
        redisTemplate.setKeySerializer(stringRedisSerializer);
        // value序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        // hashmap序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        return redisTemplate;
    }


    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory){
        RedisSerializer<String> stringRedisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        // 解决缓存转换异常
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        // 配置序列化,解决乱码问题,过期时间600秒
        RedisCacheConfiguration cacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(stringRedisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
        return RedisCacheManager.builder(factory)
                .cacheDefaults(cacheConfiguration)
                .build();
    }
}

测试:通过RedisTemplate就可以对redis进行操作

image-20220818214342299

@RestController
public class RedisController {

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @GetMapping("/")
    public String test1(){
        redisTemplate.opsForValue().set("user:10001","10001");
        String login = (String) redisTemplate.opsForValue().get("user:10001");
        System.out.println(login);
        return login;
    }
}

5、Redis 事务和锁

5.1、事务

Redis事务是一个单独的隔离操作,事务中的所有命令都会序列化、按照顺序执行,事务执行过程中不会被其他客户端发来的命令打断。串联多个命令防止别的命令插队。在Redis中,有三个基本命令,multi、exec、discard。

  • multi:开始事务,进行命令组队,接下来的所有命令都会按照顺序进行排队。
  • exec:执行命令,将排队的命令按照排队顺序执行。
  • discard:回滚操作,将刚才执行的命令取消并恢复原来的数据,在exec执行前执行。

image-20220822103100881

错误处理:

  • 组队的时候,任何一个命令失败了,执行时整个队列都会取消
  • 在执行过程中,只有出现错误的命令取消,其它成功的命令依然会执行

5.2、锁

​ 在事务中,如果几个操作去操作同一条数据可能会发生数据不同步的问题,这就事务冲突,我们可以采取锁机制来防止事务事务宠物问题。

悲观锁:在每次取数据的时候都会认为其它操作会修改数据,都会给数据加上锁,加上锁的数据其它操作就不能获取只有当操作务执行完毕并释放锁才能由其它操作获取数据。

image-20220822111252142

乐观锁:每次取数据时都认为其它操作不会修改数据,就不会加上锁但是会有一个数据版本号。去执行操作的时候,会将数据版本号和数据的版本号判断时候相等,如果相等就执行并更新版本号,如果不相等这个操作就不会执行成功。这种使用于多读操作,提高吞吐量。Redis事务的底层用的就是check-and-set机制实现的。

image-20220822112006436

Redis中乐观锁操作:

​ 在执行multi命令之前,先执行watch key1 key2 ... ,如果在事务执行前这些key的值被其它操作修改,那么这个事务就会被打断。

image-20220822115509609

image-20220822115415003

unwatch命令取消对所有key的监视。

Redis事务三大特性:

  • 单独隔离操作:事务中的所有命令都会按顺序序列化,在事务执行过程中不会被其它客户端发送的命令插队,会按照顺序执行。
  • 不存在隔离级别:事务中的命令在exec之前都不会被实际执行的,只有exec后才会实际执行。
  • 不存在原子性:在执行时,成功的命令会提交,不成功的命令不会提交。不存在要么成功,要么失败。

5.3、秒杀案例

基础环境搭建:

@Service
public class SecKillService {

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    public String secKill(String pid,String uid) {
        String pKey = "pro:"+pid;
        String uKey = "sec:user";

        Boolean key = redisTemplate.hasKey(pKey);
        if (Boolean.FALSE.equals(key)){
            return "秒杀活动尚未开始,请等待~~~";
        }
        Boolean flag = redisTemplate.opsForSet().isMember(uKey, uid);
        if (Boolean.TRUE.equals(flag)){
            return "秒杀活动一人只能参加一次,不能重复参加~~~";
        }
        Optional<Integer> count = Optional.ofNullable((Integer) redisTemplate.opsForValue().get(pKey));

        if (count.orElse(0)<=0){
            return "手慢了,已经抢光了~~~";
        }

        // 进行秒杀
        redisTemplate.opsForValue().decrement(pKey);
        redisTemplate.opsForSet().add(uKey,uid);
        return "秒杀成功";
    }
}

安装并发执行工具:(使用云服务器,可以下载windows版的ab)

yum install httpd-tools # 安装
ab -n 1000 -c 100 -p 参数文件保存路径 -T "application/x-www-form-urlencoded" url

# 参数 
# -n 请求次数
# -c 并发次数
# -T 如果是POST、PUT请求需要携带请求参数
# -p 参数保存文件路径
# url 请求地址

使用上面代码进行秒杀活动,会出现数据错误,库存可能会为负数出现超卖的情况。

RedisTemplate乐观锁解决:

public String secKill(String pid,String uid) {
    String pKey = "pro:"+pid;
    String uKey = "sec:user";
    Boolean key = redisTemplate.hasKey(pKey);
    if (Boolean.FALSE.equals(key)){
        return "秒杀活动尚未开始,请等待~~~";
    }
    Boolean flag = redisTemplate.opsForSet().isMember(uKey, uid);
    if (Boolean.TRUE.equals(flag)){
        return "秒杀活动一人只能参加一次,不能重复参加~~~";
    }
    Optional<Integer> count = Optional.ofNullable((Integer)redisTemplate.opsForValue().get(pKey));
    if (count.orElse(0)<=0){
        System.out.println("手慢了,已经抢光了~~~");
        return "手慢了,已经抢光了~~~";
    }

    // 开启事务
    redisTemplate.setEnableTransactionSupport(true);
    List<Object> results = redisTemplate.execute(new SessionCallback<List<Object>>() {
        @Override
        public List<Object> execute(RedisOperations operations) throws DataAccessException {
            // 监视key
            operations.watch(pKey);
            operations.multi();
            operations.opsForValue().decrement(pKey);
            operations.opsForSet().add(uKey,uid);
            return operations.exec();
        }
    });
    if (ObjectUtil.isEmpty(results)){
        System.out.println("秒杀失败");
        return "秒杀失败,请重试~~~";
    }
    System.out.println("秒杀成功");
    return "秒杀成功";
}

库存遗留:使用乐观锁时,如果多个请求获取的数据版本号一致,那么其它请求就会操作失败,在一定数量的请求时,就可能存在库存遗留的问题。可以采取Lua脚本去解决库存遗留问题,将复杂或多部的redis操作写成一个脚本,一次提交给redis执行减少连接次数。Lua脚本类似redis中的事务,有原子性不会被其它命令插队。Lua脚本解决redis争抢问题,实际上利用的是redis的单线程,用任务队列的方式解决任务并发问题。

local uid = KEYS[1];
local pid = KEYS[2];
local pKey = "pro:"..pid.."";
local uKey = "sec:user";

local isSecKill = redis.call("sismember",uKey,uid);
if tonumber(isSecKill) == 1 then
    return 2;
end

local count = redis.call("get",pKey);
if tonumber(count) <= 0 then
    return 0;
else
    redis.call("decr",pKey);
    redis.call("sadd",uKey,uid);
    return 1;
end
@Service
public class SecKillService {

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    // lua脚本的返回值类型为Long
    private DefaultRedisScript<Long> luaScript;

    // 初始化脚本
    @PostConstruct
    public void init(){
        luaScript = new DefaultRedisScript<>();
        luaScript.setResultType(Long.class);
        luaScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("secKill.lua")));
    }

    public String secKill(String pid,String uid) {
        String pKey = "pro:"+pid;
        Boolean key = redisTemplate.hasKey(pKey);
        if (Boolean.FALSE.equals(key)){
            return "秒杀活动尚未开始,请等待~~~";
        }

        Long result = redisTemplate.execute(luaScript, Arrays.asList(uid, pid));
        Optional<Long> optional = Optional.ofNullable(result);
        if (optional.orElse(0L)==2){
            System.out.println("你已经秒杀一次了,一人只能秒杀一次~~~");
            return "你已经秒杀一次了,一人只能秒杀一次~~~";
        }else if (optional.orElse(0L) == 1){
            System.out.println("秒杀成功");
            return "秒杀成功";
        }else {
            System.out.println("你手慢了,秒杀活动已结束");
            return "你手慢了,秒杀活动已结束";
        }
    }
}

6、持久化

​ Redis的数据是保存在内存中,一旦服务器宕机数据就会全部丢失,这就需要将数据进行持久化。在Redis中有两种持久化方式,RDB(Redis DataBase)、AOF(Append Of File)。

6.1、RDB

RDB:在指定的时间间隔内,将内存数据的快照写到磁盘中,恢复时将磁盘中的文件读取到内存。Redis会单独创建一个子线程(fork)进行持久化,先将数据写到一个临时文件等持久化完成以后用这个临时文件去替换上次持久化完成的文件。在这个过程中,主线程不进行任何的IO操作,如果需要大量数据的持久化并且数据完整性要求不高可以使用RDB方式,因为REB在最后一次持久化时,如果还没到达持久化时间服务器宕机就会出现数据丢失

image-20220823161933778

dump.rdb:这个文件在redis的启动目录下,默认名称是dump.rdb。redis默认是开启了RDB持久化操作,下面是redis.conf的相关配置。

dbfilename dump.rdb # 持久化的文件名称
dir ./ # 保存路径,默认是启动的当前路径,每次启动redis都会在你指定路径下创建持久化文件
stop-writes-on-basave-error yes # 当redis无法写入硬盘(满了),就会关闭redis
rdbcompression yes # 对于持久化的文件是否进行使用LZF算法进行压缩
rdbchecksun yes # 使用CRC64算法进行数据检查
save 时间 count # 如果在这个时间以内,key发生变化数量至少count个,就会进行持久化
# save 20 3 在20秒内如果至少有三个key发生变化,就会进行持久化操作

备份:可以将redis中持久化的文件拷贝到其它路径或者修改一个名称,如果需要使用备份就将备份文件移入启动目录或者修改名称为dump.rdb。

cp dump.rdb d.rdb # 拷贝一份文件

总结:RDB方式适合大量数据的持久化并且对数据完整性要求不严格,但是RDB方式节省磁盘空间并且恢复数据快,在进行持久化时,内存的数据会被拷贝一份,需要2倍数据膨胀。

6.2、AOF

AOF:以日志的形式记录每一个操作,将Redis执行过程中的所有指令记录下来,只许追加到文件上不能修改文件,redis重启会读取文件重新构建数据,就是将日志中的执行从前到后执行一次来恢复数据工作。

# aof默认是不开启的,需要手动开启,而RDB默认开启的
appendonly yes # 开启aof持久化
appendfilename "appendonly.aof"	# 持久化保存文件,保存路劲在启动目录下

如果RDB和AOP同时开启,redis默认使用AOF持久化,启动redis就会默认使用appendonly.aof进行数据初始化。

备份:与RDB原理相同,将appendonly.aof换一个路径或换名称,需要加载就改成appendonly.aof名称。

AOF文件异常修复:在进行AOF持久化时可能存在AOF文件损坏,这样备份文件在初始化数据时就会出错。通过redis提供的redis-check-aof --fix进行文件修复。

redis-check-aof --fix aof文件保存路径 # 重启redis就可以修复文件了

AOF同步频率:在redis的配置文件中可以设置AOF每隔多久进行一次持久化。

appendfsync  [always|everysec|no]

# always:redis的每一次操作都会写到aof文件中,性能差,但是数据完整
# everysec:每秒记录日志一次,如果宕机就可能当前秒的数据丢失
# no:redis不主动同步,把同步时机交给操作系统

压缩重写:AOF采用文件追加方式,文件可能会越来越大,这时就增加了压缩重写,当文件大小达到一个阈值时就会将文件进行重写。

# 重写就是将之前多个操作重写成一个操作,但是最后的结果是一样的
set k1 v1
set k2 v2

# 就会将上面的重写成一个操作
set k1 v1 k2 v2 
no-appendfsync-on-rewrite yes # 不写入aof文件而是写入缓存,相当于不重写请求不会阻塞,但是在这期间服务器宕机缓存数据会丢失
no-appendfsync-on-rewrite no # 进行重写,重写时请求可能出现阻塞

重写触发机制:当aof文件达到某一个大小时就会进行重写操作。

auto-aof-rewrite-min-size 64mb # 阈值为64mb
auto-aof-rewrite-percentage 100 # 当操作阈值的100%就会进行重写,相当于是文件大小操作128mb就会重写

流程:

  • 客户端请求命令追加到AOF缓冲区。
  • AOF根据持久化策略将缓冲区的命令持久化到appendonly.aof文件中。
  • 当持久化文件超过重写大小就进行重写操作。
  • redis重启会加载持久化中文件命令进行数据恢复。

7、主从复制

主从复制:主机更新数据后会根据配置和策略,自动同步到备机上(master/slaver)。master以写为主,slaver以读为主。(只能一主多从,不能多主,但是可以配置集群)

image-20220824123244137

  • 读写分离:这样应用可以将读和写操作分开,减少redis服务压力提高性能。
  • 容灾快速恢复:如果某一个从机发生故障,可以根据策略从其它从机上读取数据

7.1、配置主从复制

​ 可以在一台服务器上通过不同端口号去启动不同的redis服务,需要将redis的配置文件设置成几份然后用不同的配置文件去启动redis服务。也可以用几台虚拟机去创建独立redis服务,也可以购买云服务器创建独立的redis服务。

info replication # 查看redis服务信息

image-20220824134349008

配置从机:在配置文件中配置

slaveof 主机ip port # 从机配置文件上连接主机
masterauth xxxx	# 主机的密码

image-20220824140951574

image-20220824141147804

主机上可以进行读写操作,但是从机上只能读取不能写入,写入就会报错。当从机宕机后,主机会感知到会移除这个从机。如果主机宕机后,从机会感知但是不会移除主机只是主机状态变成下线状态,当主机重新上线从机依然会连接到主机。

复制原理:主机上每次的操作都会将数据同步到从机上。

  • 从机连接到主机上,会向主机发送数据同步的消息。
  • 主机接到同步消息,主机就会进行持久化操作rdb文件,然后将rdb文件交给从机,从机就会将rdb文件进行全量复制(所有数据都会按照rdb文件中的数据,从机之前的数据就会删除)
  • 然后每次主机进行写操作的时候,就会进行增量复制(将操作的数据进行同步)
  • 只要是重新连接都会进行全量复制

薪火相传:一个主机下可以有多个从机,一个从机下也可以有多个从机,但是从机还是从机。主机每次同步数据时只会同步给主机下的从机,然后再由从机同步给它下面的从机。

# 直接在配置文件中将slaveof xxxx port 修改为上一个从机就可以

反客为主:当主机服务宕机以后,在这个主机下的所有从机都会升级为主机,而从机下的从机不会发生变化。

# 当主机宕机,需要手动使用命令将从机变成主机
slaveof no one

7.2、哨兵模式

哨兵模式:反客为主的自动版,能够后天监控主机是否故障,如果故障了就会根据投票数将从机转换成主机。

# 配置哨兵,在配置sentinel.conf文件中配置哨兵
daemonize yes # 后台启动
port 26379 # 端口
sentinel monitor mymaster 监视的主机ip 端口 x # mymaster是给监控对象起的名称,x是至少有几个哨兵同意迁移才能迁移(一个主机可以有多个哨兵)
sentinel auth-pass 监视名称 xxxx(监视对象的密码)

redis-sentinel 配置文件路径 # 启动哨兵

image-20220824193940912

新主机选择规则:

  1. 配置从机优先级,在从机的redis.conf文件中。
replica-priority 100 # 设置从机优先级,数字越小优先级越高,默认是100
# 7版本以上是slave-priority
  1. 选择偏移量大的从机,数据上如果和主机同步率越高,越先被选择。
  2. 在每次启动redis服务,都会产生一个ruuid(40位),ruuid越小越先被选择。

主机宕机:

​ 当主机宕机后,哨兵会根据新主机选择规则选出新主机,然后在将旧主机下的从机转移到新主机下,自己下面的从机不变。当故障解除后,重启旧主机的服务,旧主机就会作为从机在新主机下。

8、Redis集群

​ 可以在实际开发中Redis可能出现内存容量不够以及在并发操作下性能提高的需求,另外在薪火相传、反客为主、主机宕机哨兵选择新主机情况下导致ip地址的变化。在Redis3.0之前是通过代理主机的方式解决,3.0之后通过无中心化的集群配置来解决提到的问题。

代理主机:所有的应用请求都请求到代理主机上,然后代理主机来分发请求到各个redis主机上,然后得到的数据返回给应用。

image-20220825193602244

无中心化集群:每一个主机都可以作为请求的入口,当请求来了后会到请求主机上查询数据,如果有这条数据就返回,没有就会去其他主机上查询,主机与主机之间是相通的。

image-20220825194504770

​ redis集群实现了redis的水平扩容,启动N个redis节点,那么整个数据都会分布存储存在这N个节点中,每一个节点储存总数据的1/N。即使某个redis节点失效并无法进行通讯,集群中的其它节点依然可以处理请求。

8.1、搭建集群

配置文件:修改配置文件(一台服务器使用不同的端口号)。

# 开启集群
cluster-enabled yes 
# 节点配置名
cluster-config-file nodes-6379.conf 
# 节点失联时间,操过这个时间自动主从切换
cluster-node-timeout 15000	

启动所有redis服务,进行redis安装路径下的src目录下使用命令:

# 确保redis的nodes-6379.conf 文件都生成了
# 将所有的redis合成一个集群,只需要使用下面命令
redis-cli --cluster create --cluster-replicas 1 各个主机的真实ip地址(使用空格隔开127.0.0.1:6379)
# 1表示一台主机有一台从机,它会自动分配

image-20220825205211082

搭建集群前一定要把之前的dump.rdb文件全部删除,并且需要打开所有redis服务的集群总线端口,服务端口+10000

image-20220825211116375

image-20220825213657560

测试集群:

# 因为是集群,所以可以通过任何一个主机连接到服务上
redis-cli -c -h ip -p port -a password  # -c是集群连接,其它配置是正常配置
# 连接后使用命令
cluster nodes # 查看集群中所有节点信息,有myself字样的服务是当前连接

8.2、集群操作

​ 在Redis集群中应该至少有三个节点,并且按照每一个主机在不同的服务器(不同ip)上,每个从机不和自己主机在同一服务器上,这样就保证了当主机宕机以后从机能快速接替主机继续进行工作。

slot(插槽):在集群搭建成功后集群中会有16384个插槽,集群中每一个节点会负责一部分插槽。在每次向集群中插入key时,会根据CRC16(key)%16384去计算key对应的插槽,然后找到对应处理主机进行处理。

image-20220826102200886

image-20220826102618502

集群常用操作:

# 在集群中不能使用一次添加多个key的命令,例如mset k1 v1 k2 v2....,需要使用分组方式来进行批量添加
mset k1{name} v1 k2{name} v2 # name是分组名,会根据name计算slot值,然后进行处理

cluster keyslot key # 进行这个key的插槽值,不存在的key也可以计算
cluster countkeysinslot 插槽值 # 查看当前主机范围slot下key的数量
cluster getkeysinslot 插槽值 n # 返回插槽值下n个key,也只能返回自己主机上的keys

故障恢复:集群中某个主机节点宕机,那么它的从机马上替换主机成为新的主机,继续处理请求,就主机修复后启动会作为新主机的从机。(15秒超时自动断开连接)

​ 如果集群中某一段插槽节点的主机从机都宕机了, 可以通过在redis.conf配置cluster-require-full-coverage来设置,如果值是yes表示某一节点宕机,那么整个集群将不可用,如果值为no,表示宕机的节点的插槽不可用不可写入,但是其它插槽依然可以继续使用。

8.3、Spring Boot整合Redis集群

导入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.6.8</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.6.8</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
</dependency>

配置文件:

spring:
  cache:
    redis:
      time-to-live: 10000
  redis:
    timeout: 5000
    database: 0
    cluster:
      nodes: 各个节点ip:port (127.0.0.1::6379),多个主机使用英文逗号隔开
      max-redirects: 3
    lettuce:
      pool:
        max-active: 20 # 最大连接数
        max-wait: -1 # 最大阻塞时间,-1没有限制
        max-idle: 5 # 最大空闲连接
        min-idle: 0 # 最小空闲连接
    password: XXXX

配置RedisTemplate:

@Configuration
@EnableCaching  // 开启缓存
public class RedisConfig extends CachingConfigurerSupport {

    @Bean
    public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        RedisSerializer<String> stringRedisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        redisTemplate.setConnectionFactory(factory);
        // key序列化
        redisTemplate.setKeySerializer(stringRedisSerializer);
        // value序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        // hashmap序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        return redisTemplate;
    }


    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory){
        RedisSerializer<String> stringRedisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        // 解决缓存转换异常
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        // 配置序列化,解决乱码问题,过期时间600秒
        RedisCacheConfiguration cacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(stringRedisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
        return RedisCacheManager.builder(factory)
                .cacheDefaults(cacheConfiguration)
                .build();
    }
}

封装redisTemplateService:

@Service
@Slf4j
public class RedisTemplateService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 指定缓存失效时间
     *
     * @param key 键
     * @param time 时间(秒)
     */
    public boolean expire(@NonNull String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            log.error("exception when expire key {}. ", key, e);
            return false;
        }
    }

    /**
     * 根据key获取过期时间
     *
     * @param key 键 不能为null
     * @return 时间(秒) -1 代表为永久有效 -2 代表已失效
     */
    public long getExpire(@NonNull String key) {
        Long expire = redisTemplate.getExpire(key, TimeUnit.SECONDS);
        Optional<Long> optional = Optional.ofNullable(expire);
        return optional.orElse(-2L);
    }

    /**
     * 判断key是否存在
     *
     * @param key  键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }

    /**
     * 删除缓存
     *
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
            }
        }
    }

    /**
     * 普通缓存获取
     *
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     *
     * @param key 键
     * @param value 值
     * @return true成功 false失败
     */
    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            log.error("exception when set key {}. ", key, e);
            return false;
        }

    }

    /**
     * 普通缓存放入并设置时间
     *
     * @param key 键
     * @param value 值
     * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */
    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            log.error("exception when set key {}. ", key, e);
            return false;
        }
    }

    /**
     * 递增
     *
     * @param key 键
     * @param delta 要增加几(大于0)
     */
    public long incr(String key, long delta) {
        if (delta <= 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * 递减
     *
     * @param key 键
     * @param delta 要减少几(小于0)
     */
    public long decr(String key, long delta) {
        if (delta <= 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    /**
     * HashGet
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return 值
     */
    public Object hGet(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     *
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hMGet(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     *
     * @param key 键
     * @param map 对应多个键值
     * @return true 成功 false 失败
     */
    public boolean hMSet(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            log.error("exception when hash set key {}. ", key, e);
            return false;
        }
    }

    /**
     * HashSet 并设置时间
     *
     * @param key 键
     * @param map 对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hMSet(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            log.error("exception when hash set key {}. ", key, e);
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key 键
     * @param item 项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hSet(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            log.error("exception when hash set key {}, item {} ", key, item, e);
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key 键
     * @param item 项
     * @param value 值
     * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hSet(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            log.error("exception when hash set key {}, item {} ", key, item, e);
            return false;
        }
    }

    /**
     * 删除hash表中的值
     *
     * @param key 键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hDel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }

    /**
     * 判断hash表中是否有该项的值
     *
     * @param key 键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }

    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key 键
     * @param item 项
     * @param by 要增加几(大于0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }

    /**
     * hash递减
     *
     * @param key 键
     * @param item 项
     * @param by 要减少记(小于0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }

    /**
     * 根据key获取Set中的所有值
     *
     * @param key 键
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key  键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 将数据放入set缓存
     *
     * @param key  键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            return 0;
        }
    }

    /**
     * 将set数据放入缓存
     *
     * @param key   键
     * @param time  时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            return 0;
        }
    }

    /**
     * 获取set缓存的长度
     *
     * @param key   键
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            return 0;
        }
    }

    /**
     * 移除值为value的
     *
     * @param key   键
     * @param values   值 可以是多个
     * @return 移除的个数
     */
    public long setRemove(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().remove(key, values);
        } catch (Exception e) {
            return 0;
        }
    }

    /**
     * 获取list缓存的内容
     *
     * @param key  键
     * @param start   开始
     * @param end   结束 0 到 -1代表所有值
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * 获取list缓存的长度
     *
     * @param key  键
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            return 0;
        }
    }

    /**
     * 通过索引 获取list中的值
     *
     * @param key  键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value  值
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value  值
     * @param time  时间(秒)
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key  键
     * @param value  值
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key  键
     * @param value 值
     * @param time  时间(秒)
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 根据索引修改list中的某条数据
     *
     * @param key  键
     * @param index 索引
     * @param value 值
     */
    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 移除N个值为value
     *
     * @param key 键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */
    public long lRemove(String key, long count, Object value) {
        try {
            return redisTemplate.opsForList().remove(key, count, value);
        } catch (Exception e) {
            return 0;
        }
    }
}

测试:

@GetMapping("/set")
public Object test2(){
    return redisTemplateService.set("user:"+ UUID.fastUUID().toString(),"10001");
}

总结:

  • 集群好处:实现扩容、分摊服务压力、无中心化配置简单。
  • 集群缺点:不支持多键操作、不支持事务和lua脚本。

9、应用问题

缓存穿透:是用户查询一条在redis中不存在的数据,这是就会向数据库中去查询,然而也没有查询到就会一直请求一直请求导致数据库服务压力变大。缓存穿透一般是遭受黑客攻击,黑客会一直去查询一条不存在的数据,导致我们的服务不可用。

解决方案:

  • 对空值进行缓存:对于返回空值的数据也进行缓存,将空值的过期时间设置很短。
  • 设置课访问的白名单:使用bitmap类型定义一个白名单,名单id作为bitmap的偏移量,先查询是否在bitmap中,如果在就将请求放行,如果不在就将请求进行拦截。
  • 布隆过滤器:它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。

缓存击穿:一个经常被请求的key,在缓存时间过期时,后端数据库没有及时将缓存更新,这时突然大量的请求去请求这个key,导致在redis中无法获取到数据,会直接去请求数据库,数据库压力增大性能下降。

解决方案:

  • 使用锁:在缓存失效时(取出来的值为空),不要立即去数据库中查询。

image-20220826213231178

缓存雪崩:redis中的大量key集中过期或者redis宕机,导致大量请求直接从数据库中请求数据,数据库服务压力增大。

解决方案:

  • 给key设置过期时间设置成间隔过期。
  • 构建多级缓存结构。
  • 搭建redis集群,保证redis的高可用。
  • 在请求层面,对缓存业务添加限流和服务降级。
  • 使用锁和队列,保证没有大量的线程对数据库进行读写,同时避免大量请求落在底层的存储系统上。

10、分布式锁

​ 随着业务的发展,从单一的服务架构演变成分布式的服务架构。由于分布式的多线程、多进程分布在不同的服务器上,这使单机下的锁策略失效。为了解决这个问题就需要一个夸JVM的互斥锁来控制资源的访问。

  • 基于数据库实现分布式锁。
  • 基于缓存实现分布式锁。
  • 基于Zookeeper实现分布式锁。

基于Redis缓存实现分布式锁:

# 分布式锁基于下面的命令
setnx key_lock value # setnx增加key时,如果key存在就不会添加成功,不存在才能添加成功,key_lock是锁的名称

image-20220826222812262

# 在这个过程中可能存在业务逻辑中一直没有把锁删除,所以需要给锁加一个过期时间
set key_lock value nx ex time # nx 表示不能重复设置,ex设置过期时间time,单位秒

Java实现分布式锁:

@GetMapping("/lock")
public String lock(){
    // 获取锁
    boolean lock = redisTemplateService.setNx("lock", "1", 10);

    // 如果是true就获取了锁
    if (lock){
        // 键num+1,
        redisTemplateService.incr("num",1);
        // 删除缓存中的锁
        redisTemplateService.del("lock");
        System.out.println("增加成功");
        return "增加成功";
    }else {
        // 没有获取到锁,等待并重新请求锁
        this.lock();
    }
    return "增加成功";
}

UUID防止lock误删:在获取到锁以后,但是在业务逻辑代码中如果处理的时间超过了lock设置的过期时间,那么lock就会自动过期,其他请求就会重新设置锁,并进入请求。当之前的业务逻辑代码处理完后就会去删除lock,就会把其他请求的lock删除,导致另外的请求也会进入,这样就可以使用UUID作为key值,再删除之前判断是否是自己的key,如果是自己的key就删除不是就不删除。

@GetMapping("/lock")
public String lock(){
    String uuid = UUID.randomUUID().toString();
    // 获取锁
    boolean lock = redisTemplateService.setNx("lock", uuid, 10);

    // 如果是true就获取了锁
    if (lock){
        // 键num+1,
        redisTemplateService.incr("num",1);
        String value = (String) redisTemplateService.get("lock");
        if (uuid.equals(value)){
            // 删除缓存中的锁
            redisTemplateService.del("lock");
        }
        return "增加成功";
    }else {
        // 没有获取到锁,等待并重新请求锁
        this.lock();
    }
    return "增加成功";
}

版权声明:本文为L__HH原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。