分布式锁Redision

目录

1.ab工具(压测工具)的安装

2.前置

 3.优化

3.1synchronized修饰代码方法/代码块

3.2分布式锁事务的解决方案

3.3Redis实现锁问题

3.3.1 set ex方式

3.3.2 set ex方式+设置过期时间

3.3.3单redis结点的解决UUID和LUA脚本

3.3.4redission解决分布式锁

4.Redission解决分布式锁

4.1添加依赖

4.2相关配置类

4.3注解实现分布式锁


前置补充:

1.并发和事务区别:

并发的理解:Java 并发问题、产生的原因及解决方法 - 掘金 (juejin.cn)

在系统接受请求,先做并发处理,再事务处理。

每个人对资源的获取都相当于在一线程中,如果大量请求同时发生会导致磁盘资源的过度抢占,做不了别的事而导致宕机或变慢。然后在数据库的多表操作要考虑事务。

补充:修改mysql默认隔离级别

SELECT @@transaction_isolation; #8.0查看数据库事务
SET SESSION TRANSACTION ISOLATION LEVEL <isolation_level>; #设置事务隔离级别
isolation_level:
#读取未提交 
   READ UNCOMMITTED         
#允许读取已提交的数据  
   READ COMMITTED           
#可重复读        8.0默认数据库事务
   REPEATABLE READ   
#可串行化 最严格的
   SERIALIZABLE

2.事务的锁和并发的锁区别:

事务的锁,在事务内部进行,保障事务的原子性、一致性、隔离性、持久性。当事务提交或回滚就会释放。

并发的锁:防止cpu切换时候指令重排,保障多个并发操作同时进行数据的一致性完整性。并发加的锁在整个变更发操作期间都有效,直到手动释放或添加事务结束。

1.ab工具(压测工具)的安装

yum install -y httpd-tools

语法:  ab        n(一次发送的请求数量)        -c(请求的并发数)        访问路径

例子:5000个请求,100的并发 ps:注意关闭本地windows防火墙

ab  -n 5000 -c 100 http://192.168.200.1:8206/admin/product/test/testLock

set key value nx|xx ex|px

nx:表示不存在则进行操作

xx:存在再进行操作

ex:表示过期时间,秒

px:过期时间 毫秒

2.前置

提前再redis设置num=0;

ab  -n 5000 -c 100 http://192.168.34.93:8206/admin/product/test/testLock

 代码片段:

 3.优化

3.1synchronized修饰代码方法/代码块

此方法只有在单个服务的时候有效,如果分布式事务时,会导致失效

3.2分布式锁事务的解决方案

分布式锁的关键是:多线程共享内存标记(锁)

三种:

        1.基于数据库的分布式锁

        2.基于缓存(redis等)        性能最高

        3.基于zookeeper实现        最可靠

补充:数据库实现分布式锁过程(从性能上不考虑)

3.3Redis实现锁问题

        使用需注意点:

1.多线程可见: 多线程可见

2.死锁的情况:要保障锁的释放

3.排他:同一时刻,只能由一个进程获取锁

4.高可用:避免服务宕机(redis集群搭建:1.主从复制 2.哨兵3.cluster集群)

3.3.1 set ex方式

 通过 set ex key value 如果该字段不存在,判断为真则进行业务代码,如果存在则再次获取,直到获取锁为止。但是这种情况容易出现死锁的问题,如果发生异常,流程被打断了,就会发生死锁问题

    public void testLock() {
        //0.先尝试获取锁 setnx key val
        Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", "lock");
        if(flag){
            //获取锁成功,执行业务代码
            //1.先从redis中通过key num获取值  key提前手动设置 num 初始值:0
            String value = redisTemplate.opsForValue().get("num");
            //2.如果值为空则非法直接返回即可
            if (StringUtils.isBlank(value)) {
                return;
            }
            //3.对num值进行自增加一
            int num = Integer.parseInt(value);
            redisTemplate.opsForValue().set("num", String.valueOf(++num));

            //4.将锁释放
            redisTemplate.delete("lock");

        }else{
            try {
                Thread.sleep(100);
                this.testLock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

3.3.2 set ex方式+设置过期时间

设置key的过期时间有两种方式

1.expire key timeout         此方法设置会在异常之后,永远不执行,

2.setex key timeout value ;开始就对lock设置过期时间                ps:set lock value ex 10 nx

情况:

1.设置锁的过期时间为3秒,但业务的执行实现为7秒。当锁的时间过期剩下4秒则有新的线程执行。

2.当业务执行完了,开始释放锁,但在他的前一秒刚有一个线程进入池子中,加锁进行操作。此时锁也被删掉了。没有拦住。

3.3.3单redis结点的解决UUID和LUA脚本

1.优化UUID防止误删:

  public void testLock() {
        //设置uuid
        String uuid = UUID.randomUUID().toString().replace("-","");
        Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);
        if (flag) {
            String value = this.redisTemplate.opsForValue().get("num");
            if (StringUtils.isEmpty(value)) {
                return;
            }
            int num = Integer.parseInt(value);
            this.redisTemplate.opsForValue().set("num", String.valueOf(++num));
            while (uuid.equals(this.redisTemplate.opsForValue().get("lock"))) {
                redisTemplate.delete("lock");
            }
        } else {
            try {
                Thread.sleep(10);
                this.testLock();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

使用uuid防止误删锁后还有问题:

        1.还是会发生多个线程获取到资源(时间过期释放锁),实现锁的续期。

        守护线程---->expire key timeout; set key value ex timeout nx;

 代码:

            Thread thread = new Thread(() -> {
                this.redisTemplate.expire("lock", 3, TimeUnit.SECONDS);
            });
            thread.setDaemon(true);
            thread.start();

 2.优化LUA脚本保证删除的原子性

SET — Redis 命令参考

    public void testLock() {
        String uuid = UUID.randomUUID().toString().replace("-", "");
        Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
        if (flag) {
            String value = this.redisTemplate.opsForValue().get("num");
            if (StringUtils.isEmpty(value)) {
                return;
            }
            int num = Integer.parseInt(value);
            this.redisTemplate.opsForValue().set("num", String.valueOf(++num));
            DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
            String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
                    "then\n" +
                    "    return redis.call(\"del\",KEYS[1])\n" +
                    "else\n" +
                    "    return 0\n" +
                    "end";

            redisScript.setScriptText(script);
            //设置响应类型
            redisScript.setResultType(Long.class);
            redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
        } else {
            try {
                Thread.sleep(10);
                this.testLock();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

3.3.4redission解决分布式锁

以上方法适合redis的单结点适合解决,但如果redis搭建集群则不能解决

为什么?

        redis集群中,分片和结点之间的复制,使用lua脚本无法确保原子性操作,当使用Lua脚本时候,他将在一个结点上执行,而数据可能被分配在多个结点,在执行脚本的期间,节点通信不一致。所以lua脚本无法在集群中锁住资源

解决方案:

        redisson-redLock:大部分结点加锁成功,我就判断加锁成功[过半机制]

为了满足分布式锁可用:

1.互斥性:任意时刻,只能由一个客户持有锁

2.不会发生死锁,即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保障其他用户加锁

3.加锁和解锁必须同一个用户

4.加锁和解锁必须有原子性

4.Redission解决分布式锁

Redisson是一个在Redis的基础上实现的Java驻内存数据网格,提供多种数据类型,促进使用者对Redis的关注分离。

官网地址:Home · redisson/redisson Wiki · GitHub

github:GitHub - redisson/redisson: Redisson - Easy Redis Java client with features of In-Memory Data Grid. Over 50 Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Publish / Subscribe, Bloom filter, Spring Cache, Tomcat, Scheduler, JCache API, Hibernate, MyBatis, RPC, local cache ...

4.1添加依赖

<!-- Redisson -->
<dependency>
   <groupId>org.Redisson</groupId>
   <artifactId>Redisson</artifactId>
   <version>3.15.3</version>
</dependency>

4.2相关配置类

@Data
@Configuration
@ConfigurationProperties("spring.redis")
public class RedissonConfig {

    private String host;

    private String addresses;

    private String password;

    private String port;

    private int timeout = 3000;
    private int connectionPoolSize = 64;
    private int connectionMinimumIdleSize = 10;
    private int pingConnectionInterval = 60000;
    private static String ADDRESS_PREFIX = "redis://";

    /**
     * 自动装配
     */
    @Bean
    RedissonClient redissonSingle() {
        Config config = new Config();
        if (StringUtils.isEmpty(host)) {
            throw new RuntimeException("host is  empty");
        }
        SingleServerConfig serverConfig = config.useSingleServer()
                //redis://127.0.0.1:7181
                .setAddress(ADDRESS_PREFIX + this.host + ":" + port)
                .setTimeout(this.timeout)
                .setPingConnectionInterval(pingConnectionInterval)
                .setConnectionPoolSize(this.connectionPoolSize)
                .setConnectionMinimumIdleSize(this.connectionMinimumIdleSize);
        if (!StringUtils.isEmpty(this.password)) {
            serverConfig.setPassword(this.password);
        }
        config.useClusterServers().addNodeAddress().addNodeAddress();
        /*
        集群版:
             config.useClusterServers().addNodeAddress().addNodeAddress();*/
        // RedissonClient redisson = Redisson.create(config);
        return Redisson.create(config);
    }
}

4.3注解实现分布式锁

Core Technologies

添加注解

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface GmallCache {
    /*
     * 缓存数据前缀
     * */
    String prefex() default "cache:";
    /*
    缓存数据后缀
     */
    String suffix() default ":info";
}

使用aop的方式环向切入

@Component
@Aspect
public class GmallCacheAspect {
    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private RedissonClient redissonClient;

    /**
     * @param proceedingJoinPoint 能够获取请求之前的参数,请求的方法体,返回值等信息
     * @return
     */

    @Around("@annotation(com.atguigu.gmall.cache.GmallCache)")
    @SneakyThrows
    public Object cacheAspect(ProceedingJoinPoint proceedingJoinPoint) {
//        声明一个对象
        Object obj = new Object();
        /*
         * 1.实现分布式锁的逻辑
         * 获取到缓存的key;注解前缀+参数+注解后缀
         * 获取到方法签名
         * */
        //获取签名
        MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
        GmallCache annotation = signature.getMethod().getAnnotation(GmallCache.class);
        //获取参数
        Object[] args = proceedingJoinPoint.getArgs();
        //获取前缀
        String prefex = annotation.prefex();
        //获取后缀
        String suffix = annotation.suffix();
//        组成缓存的key
        String skuKey = prefex + Arrays.asList(args) + suffix;
        try {
            obj = this.redisTemplate.opsForValue().get(skuKey);
            if (obj == null) {
//                查询数据库,加一把锁
                String lockKey = prefex + ":lock";
                RLock lock = this.redissonClient.getLock(lockKey);
                lock.lock();
                try {
//                    查询数据库
                    obj = proceedingJoinPoint.proceed(args);
                    if (obj == null) {
                        Object o = new Object();
                        //                    放入缓存
                        this.redisTemplate.opsForValue().set(skuKey, o, RedisConst.SKUKEY_TEMPORARY_TIMEOUT, TimeUnit.SECONDS);
                        return o;
                    }
                    this.redisTemplate.opsForValue().set(skuKey, obj, RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS);
                    return obj;
                } catch (Throwable e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            } else {
                return obj;
            }
        } catch (RuntimeException e) {
            throw new RuntimeException(e);
        }
        return proceedingJoinPoint.proceed(args);
    }
}


版权声明:本文为weixin_46549023原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。