Redis实现分布式锁
- Redis做分布式锁的时候需要注意的问题?(原子性,互斥)
- 如果是Redis是单点部署的,会带来什么问题?那你怎么解决?
- 集群模式下。比如主从模式,有没有什么问题?
- 简单介绍一下RedLock
- Redis分布式锁如何续期?看门狗知道吗?
使用场景
多个服务间保证同一时刻同一时间段内同一用户只能有一个请求(防止关键业务出现并发攻击)
搭建
1、建Module
我们搭建一个简单的购买商品的案例。商品保存到redis,模拟两个业务
去购买商品,两个业务相同,只是端口不一样。搭建springboot项目
boot_redis01
pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-aop -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
yml
server:
port: 1111
spring:
redis:
database: 0
host: localhost
port: 6379
lettuce:
pool:
max-active: 8 # 连接池最大连接数 默认8
max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制) 默认-1
max-idle: 8 # 连接池中最大空闲连接 默认8
min-idle: 0 # 连接池最小的空闲连接 默认0
主启动类BootRedis01Application
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class BootRedis01Application {
public static void main(String[] args) {
SpringApplication.run(BootRedis01Application.class,args);
}
}
RedisConfig
@Configuration//相当于spring中applicationContext.xml
public class RedisConfig {
@Bean//相当于<bean id="" class="">交给spring管理
public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory) {
RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return redisTemplate;
}
}
GoodController
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate redisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods() {
String result = redisTemplate.opsForValue().get("goods:001");//get key==看看库存的数量够不够
int goodNumber = result == null ? 0 : Integer.parseInt(result);
if (goodNumber > 0) {
int realNumber = goodNumber - 1;
redisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
System.out.println("成功买到商品,库存还剩下" + realNumber + "件" + "\t 服务提供端口:" + serverPort);
return "成功买到商品,库存还剩下" + realNumber + "件" + "\t 服务提供端口:" + serverPort;
} else {
System.out.println("商品已经告罄/活动结束/调用超时,欢迎下次光临" + "\t 服务提供端口:" + serverPort);
}
return "商品已经告罄/活动结束/调用超时,欢迎下次光临" + "\t 服务提供端口:" + serverPort;
}
}
搭建工程如下图:
以上搭建完成,我们测试,首先在redis里面存入100个商品:
启动,发送请求http://localhost:1111/buy_goods测试

boot_redis02与boot_redis01相同,只需要改端口2222 http://localhost:2222/buy_goods

以上代码有什么问题??
1、单机版加synchronized/加ReentrantLock
synchronized实现
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods(){
synchronized (this) {//是java关键字层面的,不够灵活,可能会造成线程积压
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0){
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort;
}else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort;
}
}
//我觉得时间太长了,我想放弃等待锁
//给我一个规定的时间内,拿不到我就放弃
//synchronized ()此时就不行了,不见不散;使用lock()更灵活,tryLock,过时不候
}
ReentrantLock实现
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
private final Lock lock = new ReentrantLock();
@GetMapping("/buy_goods")
public String buy_Goods(){
if (lock.tryLock()){//是java类层面的,可以指定等待时间,lock锁更灵活
try {
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if (goodsNumber > 0){
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort;
}
}finally {
lock.unlock();
}
}else {
System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort);
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort;
}
}
在单机环境下,可以使用synchronized或Lock来实现。
但是在分布式系统中,因为竞争的线程可能 不在同一个节点上(同一个jvm中),而synchronized是JVM层面的;
所以需要一个让所有进程都能访问到的锁来实现,比如redis或者zookeeper来构建;
不同进程jvm层面的锁就不管用了,那么可以利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程
nginx分布式微服务架构
看部署图:不同的服务,不同的JVM

Nginx配置负载均衡
以下在windows下的nginx进行演示,我想以后使用<www.ms.com>来访问我们的项目
(1)更改在本机的hosts文件
(2)使用nginx实现 反向代理 以及 负载均衡
修改nginx的配置文件,nginx.conf
upstream mynginx{
server 127.0.0.1:1111;
server 127.0.0.1:2222;
}
server {
listen 80;
server_name www.ms.com;
location / {
proxy_pass http://mynginx;
}
}
(3)验证成功,实现反向代理 以及 负载均衡
解释:我们访问www.ms.com,因为我们在hosts配置文件配置了本地域名解析,解析成了127.0.0.1,默认80端口
也即127.0.0.1:80,启动nginx之后,nginx监听到了80端口,捕获请求;nginx配置了反向代理规则,将www.ms.com代理到http://mynginx,然后通过负载均衡访问
(4)使用Jmeter进行测试,线程锁synchronized

很明显,出现超卖现象,
1、setnx
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate redisTemplate;
@Value("${server.port}")
private String serverPort;
private static final String REDIS_LOCK = "atguiguLock";
@GetMapping("/buy_goods")
public String buy_Goods() {
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
//不存在,就加锁,返回一个Boolean,true 加锁成功
Boolean flag = redisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value);
if (!flag) {
return "抢锁失败,请再次重试!!";
}
String result = redisTemplate.opsForValue().get("goods:001");//get key==看看库存的数量够不够
int goodNumber = result == null ? 0 : Integer.parseInt(result);
if (goodNumber > 0) {
int realNumber = goodNumber - 1;
redisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
//redisTemplate.delete(REDIS_LOCK);//解锁,不能再这里解锁,需要在finally代码块解锁
System.out.println("成功买到商品,库存还剩下" + realNumber + "件" + "\t 服务提供端口:" + serverPort);
return "成功买到商品,库存还剩下" + realNumber + "件" + "\t 服务提供端口:" + serverPort;
} else {
System.out.println("商品已经告罄/活动结束/调用超时,欢迎下次光临" + "\t 服务提供端口:" + serverPort);
}
return "商品已经告罄/活动结束/调用超时,欢迎下次光临" + "\t 服务提供端口:" + serverPort;
} finally {
redisTemplate.delete(REDIS_LOCK);//解锁
}
}
}

部署了微服务jar包的机器挂了,代码层面根本没有走到finally这块, 没办法保证解锁,这个key没有被删除,需要加入一个过期时间限定key
`解决:需要对lockKey有过期时间的设定`
2、setnxex
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate redisTemplate;
@Value("${server.port}")
private String serverPort;
private static final String REDIS_LOCK = "atguiguLock";
@GetMapping("/buy_goods")
public String buy_Goods() {
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
//不存在,就加锁,返回一个Boolean,true 加锁成功
Boolean flag = redisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value,10L, TimeUnit.SECONDS);//为了防止死锁,必须加上锁的过期时间
if (!flag) {
return "抢锁失败,请再次重试!!";
}
String result = redisTemplate.opsForValue().get("goods:001");//get key==看看库存的数量够不够
int goodNumber = result == null ? 0 : Integer.parseInt(result);
if (goodNumber > 0) {
int realNumber = goodNumber - 1;
redisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
//redisTemplate.delete(REDIS_LOCK);//解锁
System.out.println("成功买到商品,库存还剩下" + realNumber + "件" + "\t 服务提供端口:" + serverPort);
return "成功买到商品,库存还剩下" + realNumber + "件" + "\t 服务提供端口:" + serverPort;
} else {
System.out.println("商品已经告罄/活动结束/调用超时,欢迎下次光临" + "\t 服务提供端口:" + serverPort);
}
return "商品已经告罄/活动结束/调用超时,欢迎下次光临" + "\t 服务提供端口:" + serverPort;
} finally {
redisTemplate.delete(REDIS_LOCK);//解锁
}
}
}

3、删除别人的锁
线程A进来获取了锁,此时到了10秒,锁被释放,但是A的业务没有处理完成;线程B进来,获取到了锁,A的业务处理完成,delete了B的锁
`解决:只能自己删除自己的,不许动别人的`

finally {
if (redisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)) {
redisTemplate.delete(REDIS_LOCK);//解锁
}

4、Lua脚本/redis事务解决删锁问题

Lua脚本进行解决删锁问题(redis官网推荐)
添加jedis的工具类 RedisUtils
public class RedisUtils {
private static JedisPool jedisPool;
static {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(20);
jedisPoolConfig.setMaxIdle(10);
jedisPool = new JedisPool(jedisPoolConfig,"localhost",6379,100000);
}
public static Jedis getJedis() throws Exception{
if (null!=jedisPool){
return jedisPool.getResource();
}
throw new Exception("Jedispool is not ok");
}
}
使用Lua脚本,修改代码
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate redisTemplate;
@Value("${server.port}")
private String serverPort;
private static final String REDIS_LOCK = "atguiguLock";
@GetMapping("/buy_goods")
public String buy_Goods() throws Exception {
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
//不存在,就加锁,返回一个Boolean,true 加锁成功
Boolean flag = redisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, value, 10L, TimeUnit.SECONDS);//为了防止死锁,必须加上锁的过期时间
if (!flag) {
return "抢锁失败,请再次重试!!";
}
String result = redisTemplate.opsForValue().get("goods:001");//get key==看看库存的数量够不够
int goodNumber = result == null ? 0 : Integer.parseInt(result);
if (goodNumber > 0) {
int realNumber = goodNumber - 1;
redisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
//redisTemplate.delete(REDIS_LOCK);//解锁
System.out.println("成功买到商品,库存还剩下" + realNumber + "件" + "\t 服务提供端口:" + serverPort);
return "成功买到商品,库存还剩下" + realNumber + "件" + "\t 服务提供端口:" + serverPort;
} else {
System.out.println("商品已经告罄/活动结束/调用超时,欢迎下次光临" + "\t 服务提供端口:" + serverPort);
}
return "商品已经告罄/活动结束/调用超时,欢迎下次光临" + "\t 服务提供端口:" + serverPort;
} finally {
Jedis jedis = RedisUtils.getJedis();
String script = "if redis.call('get', KEYS[1]) == ARGV[1]" + "then "
+ "return redis.call('del', KEYS[1])" + "else " + " return 0 " + "end";
try {
Object o = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));//传入redis的key和value
if ("1".equalsIgnoreCase(o.toString())) {//删除成功就是1
System.out.println("-----del redis lock ok");
} else {
System.out.println("----del redis lock error");
}
} finally {
if (null != jedis) {
jedis.close();//释放jedis
}
}
}
}
}
使用Jmeter测试,此时redis里面存10个商品

发现不会出现超卖,完美解决!!!!
Redis的事务解决删锁问题
首先复习一下redis的事务:
正常情况下
异常情况下

所以 判断和删锁 的代码利用redis事务修改如下:
finally {
while (true) {
redisTemplate.watch(REDIS_LOCK);//开启监控
if (redisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)) {
//先进行事务控制
redisTemplate.setEnableTransactionSupport(true);//开启事务支持
redisTemplate.multi();//开启事务
redisTemplate.delete(REDIS_LOCK);//进行删锁
List<Object> list = redisTemplate.exec();//执行事务
if (list == null) {//删锁不成功,该锁被其他线程动过
continue;//进行下一轮判断
}
redisTemplate.unwatch();//删锁成功,解除监控
break;//跳出死循环
}
}
}

问题?
1、如何确保 锁的过期时间 大于 业务执行时间?
2、Redis分布式锁如何自动续期?
3、Redis分布式锁在集群模式下如何解决?
如果我们加锁加在主机master,redis在分布式是AP模式,满足可用性,我们一加锁成功,立马给出客户端响应,加锁成功;
然后进行同步,同步到slave从机,如果没有同步完成,master宕机,此时别的slave变为主机,由于刚才没有同步完成,异常;
redis异步复制造成的锁丢失, 比如:主节点没来的及把刚刚set进来这条锁数据给从节点,就挂了。
4、zookeeper是CP,它会先保证一致性,通知其他节点同步,然后再给出客户端ok回复; 理论上zookeeper更好,但是高可用下降
此时如果集群模式下,就得上Redisson来解决
Redisson实现分布式锁
1、写Redisson的配置类,在配置类中指定模式
单机模式
//此为单机版redis
@Bean
public Redisson redisson(){
Config config = new Config();
config.useSingleServer().setAddress("redis://"+redisHost+":6379").setDatabase(0);//redishost是你yml中redis.host,可以使用@value注解注入
return (Redisson) Redisson.create(config);
}
哨兵模式
//此为哨兵模式
Config config = new Config();
config.useSentinelServers().addSentinelAddress(
"redis://172.29.3.245:26378","redis://172.29.3.245:26379", "redis://172.29.3.245:26380")
.setDatabase(0);
集群模式
//此为redis集群
Config config = new Config();
config.useClusterServers().addNodeAddress(
"redis://172.29.3.245:6375","redis://172.29.3.245:6376", "redis://172.29.3.245:6377",
"redis://172.29.3.245:6378","redis://172.29.3.245:6379", "redis://172.29.3.245:6380")
.setPassword("a123456").setScanInterval(5000);
修改controller
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate redisTemplate;
@Value("${server.port}")
private String serverPort;
private static final String REDIS_LOCK = "atguiguLock";
@Autowired
private Redisson redisson;
@GetMapping("/buy_goods")
public String buy_Goods() throws Exception {
RLock redissonLock = redisson.getLock(REDIS_LOCK);//只要锁名一样,就是同一把锁
redissonLock.lock();//阻塞式等待(获取不到锁,自旋),不建议加上锁过期时间
// redissonLock.lock(10,TimeUnit.SECONDS);//不建议加上锁的过期时间
//redissonLock.tryLock(10,TimeUnit.SECONDS);//10秒没获取到锁,就不等待了
try {
String result = redisTemplate.opsForValue().get("goods:001");//get key==看看库存的数量够不够
int goodNumber = result == null ? 0 : Integer.parseInt(result);
if (goodNumber > 0) {
int realNumber = goodNumber - 1;
redisTemplate.opsForValue().set("goods:001", String.valueOf(realNumber));
//redisTemplate.delete(REDIS_LOCK);//解锁
System.out.println("成功买到商品,库存还剩下" + realNumber + "件" + "\t 服务提供端口:" + serverPort);
return "成功买到商品,库存还剩下" + realNumber + "件" + "\t 服务提供端口:" + serverPort;
} else {
System.out.println("商品已经告罄/活动结束/调用超时,欢迎下次光临" + "\t 服务提供端口:" + serverPort);
}
return "商品已经告罄/活动结束/调用超时,欢迎下次光临" + "\t 服务提供端口:" + serverPort;
} finally {
/*
* 假设解锁代码没运行,Redisson会不会出现死锁??
* 不会出现死锁,即使没有手动解锁,它也会帮我们解锁
* 因为Redisson里面有看门狗,会帮我们自动续期
*
* */
if (redissonLock.isLocked()){//有可能 解锁线程 和 持有锁的线程 不是同一个
if (redissonLock.isHeldByCurrentThread()){//为了避免一些麻烦,加上这一步
redissonLock.unlock();//解锁
}
}
}
}
}
```bash
redissonLock.lock(10,TimeUnit.SECONDS);//不建议加上锁的过期时间
原因:
1、加上锁的过期时间之后,业务没有执行完成,到了时间就删锁了,它不会给我们自动续期了
2、不加过期时间,不会像redis分布式锁一样出现死锁。因为底层有看门狗机制,会到30秒自动解锁
可以看出使用RedLock一切都简单了!!!!解决了集群模式(异步复制锁丢失)问题 和 锁的自动续期(看门狗原理)
使用Jmeter测试,此时redis商品100个

看门狗原理
1、如果我们指定了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们制定的时间,不会自动续期;(这样与手写版本的redis分布式锁一样)
2、如果我们未指定锁的超时时间,就使用 lockWatchdogTimeout = 30 * 1000 【看门狗默认时间】
只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10秒都会再次续成30秒;
自动续期时间:internalLockLeaseTime 【看门狗时间 30s】 / 3, 10s

Redisson分布式锁帮我们解决的问题:
1、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30秒;
不用担心业务时间长,锁自动过期被删掉(默认加的锁时间都是30S)
2、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30秒之后自动删除。
Redis分布式锁总结
`Redis实现分布式锁`:
1、需要注意互斥性(setnx),
2、防止死锁和原子性(setnxex),
3、加锁线程和解锁线程是同一个(redisValue.equals(value))
4、第3步的 判断和删锁 保证原子性(Lua或者redis事务+watch监控)
`Redisson实现分布式锁`
1、解决了集群模式下(重写Redisson构造),异步复制锁丢失的问题
2、解决了锁的过期时间小于业务执行时间(看门狗原理)
JUC是本地锁,分布式模式下不可用