秒杀案例测试
1、不加锁测试
/**
* 秒杀案例测试
*/
@GetMapping("index/testlock")
public void testLock() {
//-----------业务方法
//获取redis中num的值
int num =Integer.parseInt(
redisTemplate.opsForValue().get("num").toString()
);
num++;
//设置到redis中
redisTemplate.opsForValue().set("num",num);
}
浏览器访问测试(先配置RedisCacheConfig不然会有些莫名其妙的问题) http://www.gmall.com/index/testlock
在redis数据库可以看到
====================
安装ab
yum install -y httpd-tools
虚拟机配置hosts映射:
vim /etc/hosts
添加 : 127.0.0.1 www.gmall.com
使用ab并发访问测试:
ab -n(一次发送的请求数) -c(请求的并发数) 访问路径 ab -n 5000 -c 100 http://www.gmall.com/index/testlock
在redis数据库可以看到,理论上应该是5001才对。。。差的有点多
为什么差这么多呢
tomcat会给每一个请求使用一个线程处理,线程1执行途中,cpu时间片用完了,,,就凉
2、使用本地锁,加synchronized
@GetMapping("index/testlock")
public synchronized void testLock() {
//-----------业务方法
//获取redis中num的值
int num =Integer.parseInt(
redisTemplate.opsForValue().get("num").toString()
);
num++;
//设置到redis中
redisTemplate.opsForValue().set("num",num);
}
重新使用ab测试
ab -n(一次发送的请求数) -c(请求的并发数) 访问路径 ab -n 5000 -c 100 http://www.gmall.com/index/testlock
redis数据库num变为5761,没毛病synchronized保证了单实例的线程并发安全问题。
能解决的原因是
单应用并发安全问题解决,使用jvm锁解决
3、本地锁测试多实例情况
我们把num的值改回0, 启动多实例,重新测试,服务以集群方式启动:
访问秒杀接口时,请求最终交给网关项目,网关可以负载均衡将请求轮询交给不同的实例处理
在弹出的配置界面中,
启动后,在nacos控制台中可以看到如下效果:
使用ab重新测试
ab -n(一次发送的请求数) -c(请求的并发数) 访问路径 ab -n 5000 -c 100 http://www.gmall.com/index/testlock
这时候咱们加的synchronized本地锁就不起作用了
每一个应用实例是线程安全的,但是多个应用实例之间又构成了并发。。jvm锁解决不了
解决办法:扩大竞争资源(如state值)的范围,使每一个应用实例都可以访问操作的到
使用redis实现分布式锁
1、分布式锁雏形
分布式锁主流的实现方案:
- 基于数据库(mysql的悲观锁)实现分布式锁
- 基于缓存(Redis等)
- 基于Zookeeper
每一种分布式锁解决方案都有各自的优缺点:
性能:redis最高
可靠性:zookeeper最高
=======================================================
使用redis实现分布式锁的大概思路
1.每个线程执行前都setnv k v访问redis 成功的获取到锁,
2.不成功的休眠短时间后,再次setnv kv直到成功获取到锁
3.使用锁的线程执行结束时一定要删除kv
/**
* 秒杀案例测试
*/
@GetMapping("index/testlock")
public synchronized void testLock() {
String uuid = UUID.randomUUID().toString();
//设置lock锁成功,则说明获取锁成功,可以执行业务方法。这里加过期时间是为了防止死锁问题
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 2l, TimeUnit.SECONDS);
if (ifAbsent) {
//------------------获取锁成功的线程
****执行业务方法
//获取redis中num的值
int num =Integer.parseInt(
redisTemplate.opsForValue().get("num").toString()
);
num++;
//设置到redis中
redisTemplate.opsForValue().set("num",num);
***业务执行完毕
// 释放锁,即删除key="lock' 的键值对
redisTemplate.delete("lock");
}else{
//-----------------获取锁失败的线程
//每隔50ms去重新获取琐
try {
//设置这个时间是为了方式栈溢出
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
//递归调用,重新获取琐
testLock();
}
}
上述代码顺带解决了2个问题。。
问题1:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放
解决:设置过期时间,自动释放锁。
问题2:误删除,可能会释放其他服务器的锁。
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁
但是其实还有不少问题,比如:
- 问题3:删除锁的原子性问题
- 问题4:可重入问题,
- 问题5:锁自动续期问题
2、LUA脚本
redis内置了LUA脚本解释器,redis客户端中通过EVAL可以执行LUA脚本。
eval script numkeys key [key ...] arg [arg ...]
首先登录redis客户端。。。
1.helloworld和字符串拼接
eval "return 'hello'..'world!'" 0
return 返回执行结果 ,最后的0代表没有参数,看的出来使用..来拼接字符串
======
2.获取参数
eval "return KEYS[1]..','..KEYS[2]..','..ARGV[1]" 2 K1 K2 ARG1
参数的使用 2代表传入的keys(通过KEYS[i]使用)有两个,其余的参数通过ARGV[i]获取
=====
3.定义变量
eval "local a=KEYS[1] return a" 1 zezeze
局部变量的使用 local用来定义局部变量
=====
4.判断
eval "local age=tonumber(KEYS[1]) if age>=18 then return 1 else return 0 end " 1 19
条件判断 语句结束需要使用end, ~=不等于 ==等于 要加if...then...end
=====
5.执行redis命令
eval "return redis.call('set',KEYS[1],KEYS[2])" 2 X Y
LUA脚本执行redis命令
3、SpringBoot整合 LUA脚本
首先在redis客户端执行一遍,保证咱这个命令的正确性
eval "if redis.call('hexists',KEYS[1],KEYS[2])==0 then
redis.call('hset',KEYS[1],KEYS[2],ARGV[1]) return 1 else return 0 end" 2 user:1 user:1
zhangsan
接着在java代码里去写,一定要注意空格。。
String script = "if redis.call('hexists' ,KEYS[1] , KEYS[2])==0 " +
" then " + //==0说明不存在,可以设置
" redis.call('hset',KEYS[1],KEYS[2],ARGV[1]) return 1 " +
" else " + //说明已经存在,设置失败返回0
" return 0 end";
System.out.println(script);
//通过execute可以执行LUA脚本,参数1:脚本字符串,参数2:脚本返回值类型,参数3:keys列表,参数4:argv列表
redisTemplate.execute(new DefaultRedisScript<>(script , Boolean.class), Arrays.asList("user:1","user:2"),"lisi");
在Redis数据库中可以看得到,数据已经保存成功了
4、LUA脚本解决原子性问题
如下代码存在原子性问题:
// 释放锁,即删除key="lock' 的键值对,uuid保证只能释放自己的锁 if(redisTemplate.opsForValue().get("lock")!=null && StringUtils.equals(redisTemplate.opsForValue().get("lock").toString(),uuid)) redisTemplate.delete("lock");
如果线程1 获取到锁,执行到删除锁的代码时(获取了redis缓存的锁的val),锁过期了
cpu时间片切换给线程2,线程2此时可以获取到锁,cpu时间片切换给线程1,线程1此时删除锁时 使用的是key进行的删除,将2的锁删除了
使用LUA脚本解决原子性问题:
1、判断有没有lock锁
如果没有 不删除:锁过期且没有其他线程使用锁
2、如果有
获取lock的value和uuid进行比较
如果不一样 不删除:因为不是自己的锁
如果一样, 删除自己的锁
String script = "if redis.call('exists' , KEYS[1])==0 " + " then return 0 " +//==0表示不存在,锁空闲,不删除锁 " elseif redis.call('get' , KEYS[1])== ARGV[1] " +//有锁且是自己的锁(是不是自己的通过uuid) " then return redis.call('del' , KEYS[1]) " + //删除自己的锁 " else return 0 end";//锁不是自己的,不删除 redisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList("lock"), uuid);
注意的点,这时候我们相当于是setnx lock uuid,使用的是字符串,下面开始会转为hash
5、保证分布式锁的可重入性
如果一个线程获取到分布式锁成功后调用了另一个业务方法也需要使用分布式锁setnv k v就出现死锁。我们希望一个线程获取到分布式锁以后 执行其他的需要该分布式锁的业务方法可以直接执行
ReetrantLock锁重入原理
1、线程获取锁 如果state值为0,通过原子性的CAS将state设置为1代表获取锁成功
2、线程调用其他的方法也需要加锁时,判断锁如果被使用 而且使用锁的线程对象就是当前的线程对象,对state的值+1
3、释放锁时,如果锁是自己使用的,state值-1后 如果state的值为0 代表锁完全释放
否则释放失败
结论:可重入锁最大特性就是计数,计算加锁的次数
实现可重入
Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的锁的重入次数,然后利用 lua 脚本判断逻辑
获取锁的时候
如果锁不存在,初始化锁 代表获取锁成功,设置锁使用的次数为1
hset lock uuid 1
如果锁存在,判断是不是自己的锁
使用redis中lock的值 和uuid进行比较
如果一样:代表是自己的锁,还需要设置获取锁的次数(释放锁使用)
如果不一样:不是自己的锁 获取锁失败public Boolean tryLock(String lock , String uuid , Long expireTime) { //获取锁逻辑 String script = "if redis.call('exists' , KEYS[1])==0 " + " then " +//==0表示锁不存在 " redis.call('hset' , KEYS[1] , KEYS[2] , 1) " + //直接获取锁,且设置重入次数count=1 " redis.call('expire' , KEYS[1] ,ARGV[1]) " + //设置锁的有效期方式 " return 1 " + " elseif redis.call('hexists' , KEYS[1] , KEYS[2])==1 " + " then " +//锁存在,而且是自己的(是不是自己的用uuid判断) " redis.call('hincrby' , KEYS[1] , KEYS[2] , 1) return 2 " + " else return 0 end ";//锁不是自己的,获取锁失败 Long flag = (Long) redisTemplate.execute(new DefaultRedisScript(script,Long.class), Arrays.asList(lock , uuid),expireTime); //如果第一次获取锁成功:创建看门狗任务为锁自动续期 if(flag==1){ renewKey(expireTime,lock,uuid); } return flag!=0; }
---------------------------------------------------------------------------------------
释放锁时,先判断是不是自己的锁
如果是自己的锁
再判断次数是否为1,是1 删除锁 释放成功
次数不为1,基于之前的次数-1 释放未成功
如果不是自己的锁
释放失败public void releaseLock(String lock , String uuid) { //释放锁 String script = "if redis.call('hexists' ,KEYS[1] , KEYS[2])==0 " + " then " +//==0说明不是自己的锁,释放锁失败 " return 0 " + " elseif tonumber(redis.call('hget' , KEYS[1] , KEYS[2])) == 1 " + //获取锁使用的次数 判断是否是1次 " then " +//如果可重入次数是1次,就直接释放锁,删除key " redis.call('del' ,KEYS[1]) return 1 " + " else " + //如果不是1次,重入锁重入的次数-1 " redis.call('hincrby' , KEYS[1] , KEYS[2], -1) return 2 end"; System.out.println(script); redisTemplate.execute(new DefaultRedisScript(script,Long.class) , Arrays.asList(lock,uuid)); }
如果返回值使用 Boolean,Spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断,所以返回类型只好使用 Long。
6、锁的自动续期(看门狗)
A线程超时时间设为10s(为了解决死锁问题),但代码执行时间可能需要30s,然后redis服务端10s后将锁删除。 此时,B线程恰好申请锁,redis服务端不存在该锁,可以申请,也执行了代码。那么问题来了, A、B线程都同时获取到锁并执行业务逻辑,
这与分布式锁最基本的性质相违背:在任意一个时刻,只有一个客户端持有锁(即独享排他)。
自动续期思路
可以在锁即将过期时,对锁进行续期
每过 过期时间的2/3,对锁进行续期(重新设置过期时间)
看门狗需要额外的一个线程,锁延期方法:开启子线程执行延期
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
//看门狗:自动续期方法
public void renewKey(Long expireTime,String lock , String uuid) {//expireTime 是lock锁的过期时间 单位秒
pool.schedule(()->{
//给分布式锁的键设置过期时间: 给调用此方法的任务的锁续期
//如果 key 和uuid的 锁一直存在 一直续期 ,如果不存在 任务停止
String script = "if redis.call('hexists' , KEYS[1] , KEYS[2])==1 " +
" then redis.call('expire' ,KEYS[1] ,ARGV[1]) return 1 " + //续期成功
" else return 0 end"; //续期失败
while((Boolean)redisTemplate.execute(new DefaultRedisScript(script,Boolean.class) ,
Arrays.asList(lock,uuid) , expireTime)){
try {
// 到达过期时间的2/3时间,自动续期
Thread.sleep(expireTime*2000/3);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
},expireTime*2000/3 ,TimeUnit.MILLISECONDS);//初始化时 任务只会执行一次
}
整合完整代码与总结
1、整合的完整代码
/**
* 秒杀案例测试
*/
@GetMapping("index/testlock")
public synchronized void testLock() {
String uuid = UUID.randomUUID().toString();
Boolean flag = tryLock("lock" , uuid , 30L);
if (flag) {
//------------------获取锁成功的线程
//****执行业务方法
//测试锁的可重入性
check("lock" ,uuid);
//获取redis中num的值
int num = Integer.parseInt(
redisTemplate.opsForValue().get("num").toString()
);
num++;
//设置到redis中
redisTemplate.opsForValue().set("num", num);
//****业务执行完毕
releaseLock("lock",uuid);
} else {
//-----------------获取锁失败的线程
//每隔50ms去重新获取琐
try {
//设置这个时间是为了方式栈溢出
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
//递归调用,重新获取琐
testLock();
}
}
public void check(String lockKey, String uuid) {
Boolean lock = tryLock(lockKey, uuid, 30L);
if (lock) {
System.out.println("环境检查.....");
releaseLock(lockKey, uuid);
} else {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
check(lockKey, uuid);
}
}
public Boolean tryLock(String lock, String uuid, Long expireTime) {
//获取锁逻辑
String script = "if redis.call('exists' , KEYS[1])==0 " +
" then " +//==0表示锁不存在
" redis.call('hset' , KEYS[1] , KEYS[2] , 1) " + //直接获取锁,且设置重入次数count=1
" redis.call('expire' , KEYS[1] ,ARGV[1]) " + //设置锁的有效期方式
" return 1 " +
" elseif redis.call('hexists' , KEYS[1] , KEYS[2])==1 " +
" then " +//锁存在,而且是自己的(是不是自己的用uuid判断)
" redis.call('hincrby' , KEYS[1] , KEYS[2] , 1) return 2 " +
" else return 0 end ";//锁不是自己的,获取锁失败
Long flag = (Long) redisTemplate.execute(new DefaultRedisScript(script, Long.class),
Arrays.asList(lock, uuid), expireTime);
//如果第一次获取锁成功:创建看门狗任务为锁自动续期
if (flag == 1) {
renewKey(expireTime, lock, uuid);
}
return flag != 0;
}
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
//看门狗:自动续期方法
public void renewKey(Long expireTime, String lock, String uuid) {//expireTime 是lock锁的过期时间 单位秒
pool.schedule(() -> {
//给分布式锁的键设置过期时间: 给调用此方法的任务的锁续期
//如果 key 和uuid的 锁一直存在 一直续期 ,如果不存在 任务停止
String script = "if redis.call('hexists' , KEYS[1] , KEYS[2])==1 " +
" then redis.call('expire' ,KEYS[1] ,ARGV[1]) return 1 " + //续期成功
" else return 0 end"; //续期失败
while ((Boolean) redisTemplate.execute(new DefaultRedisScript(script, Boolean.class),
Arrays.asList(lock, uuid), expireTime)) {
try {
Thread.sleep(expireTime * 2000 / 3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
;
}, expireTime * 2000 / 3, TimeUnit.MILLISECONDS);//初始化时 任务只会执行一次
}
public void releaseLock(String lock, String uuid) {
//释放锁
String script = "if redis.call('hexists' ,KEYS[1] , KEYS[2])==0 " +
" then " +//==0说明不是自己的锁,释放锁失败
" return 0 " +
" elseif tonumber(redis.call('hget' , KEYS[1] , KEYS[2])) == 1 " + //获取锁使用的次数 判断是否是1次
" then " +//如果可重入次数是1次,就直接释放锁,删除key
" redis.call('del' ,KEYS[1]) return 1 " +
" else " + //如果不是1次,重入锁重入的次数-1
" redis.call('hincrby' , KEYS[1] , KEYS[2], -1) return 2 end";
System.out.println(script);
redisTemplate.execute(new DefaultRedisScript(script, Long.class), Arrays.asList(lock, uuid));
}
使用ab测试
ab -n 5000 -c 100 http://www.gmall.com/index/testlock
测试结果:没毛病老铁
2、redis实现分布式锁总结
1、redis实现分布式锁基于redis的单线程线程安全
setnv k v :多线程并发执行该命名时,相同的key只有一个线程可以操作成功 代表获取到锁
删除key时代表释放锁
2、分布式锁key需要设置过期时间 解决死锁问题(程序宕机崩溃异常导致的锁未能正常删除释放)
3、分布式锁key的值每个线程使用自己生成uuid作为值: 过期时间 业务时长超过过期时间时 可能导致误删除
4、LUA脚本将多个redis的操作+逻辑判断写成一个脚本交给redis一次性执行: 命令不会被插队 删除锁的时候 需要判断是不是自己的锁 ,java代码中分为多步执行 会有原子性问题
5、LUA脚本为锁统计当前线程使用的次数保证重入获取锁和释放锁: 方法之间的调用 如果都需要使用分布式锁,也会导致死锁(一个线程已经获取到锁了 调用另一个方法时 又要获取相同的锁)
使用了hash结构: hset lock uuid count
6、看门狗机制:通过额外的线程在过期时间的2/3时重新设置键的过期时间 分布式锁业务执行时间长可能锁会过期,可以为锁自动续期