Redis分布式锁之锁续期的源码透析!针针见血!

时间为何无法评估?

你是否一定会有这样的疑问:一个同步代码块,逻辑很简单,完全可以预估一个时间。如果预估不准,通过测试取一个平均时间然后稍微做些预留即可,就可以满足我们对锁设置过期时间的评估。

可是对时间的评估并不仅限于简单的业务逻辑,比如像这样一个简单逻辑:判断某条数据是否在数据库中已经存在,如果存在将某个字段更新。逻辑很简单,但是你是否考虑过网络延时导致数据库的链接建立时间较长、连接池中的链接数量不足需要阻塞等待、数据更新的时候需要等待一把较长时间才能释放的写锁等等一系列不可抗拒的因素引发的代码执行时长过大。

当然除了这些,可能代码逻辑本身也可能存在死锁或者远程接口调用超时等。列举的这些操作都有可能会产生代码的执行时间超出我们的预期。

总之,我们不能够对于同步代码块的执行时间过于理想化地评估,毕竟掺杂着网络、存储、锁 等因素,一切的一切并不是那么的靠谱。

顺便提一嘴,是不是可以将锁时间设置长一点,比如一个小时。如果一切进展顺利,不会有问题。可是您是否还记得如果锁释放之前,进程异常终止的这种情况?也就意味着,当进程恢复后必须等到锁过期释放,同步代码才能继续,业务才能继续。所以,合理设置一个过期时间非常重要!

什么是锁续期?

如果您是一个Java开发者,那么你很幸运。基于Java代码实现的Redission框架,它实现concurrent的Lock接口,友好地封装锁的实现,并且对锁过期的问题得到了很好的解决。采用的解决方案是锁续期。

图1:基于Java的Lock接口实现

既然实现了Lock接口,我们耳熟能详的以下接口肯定得到了实现,满足我们需要的各种场景,比如:

  • lock: 阻塞等待直到锁可用
  • tryLock:等待到waitTime 直到锁可用,并返回是否获取到锁
  • isLocked: 检查锁是否被其他线程占用
  • isHeldByThread:检查锁是否被某个线程持有
    ...

顾名思义,锁续期就是,当锁快要过期的时候,给锁延续过期时间,保证同步代码没有执行完,锁并不会因为过期而释放。Redission采用WatchDog的方式实现。WatchDog不是一只狗,而是一个定时执行线程。

为了探究锁续期,我们顺路一起看下Redission的分布式锁是如何实现的。

Redission加锁过程

tryAcquire的确是一个熟悉的方法,试图获取锁,我们可以作为入口跟踪到这段代码:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

通过执行Lua脚本来加锁,核心内容就是这段脚本。所以,理解这段脚本的含义有助于我们理解加锁的过程,即使我们不熟悉Lua脚本的语法,Lua脚本中熟悉的Redis命令就是我们的着手点。

通过Lua脚本执行引擎的代码入口可以得知,KEYS[1]代表Redis的key,ARGV[1]代表过期时间,ARGV[2]是生成的一个唯一标识,就是之前提过的token。

通过Lua脚本提交给Redis,使命令本身具有原子性,所以这里我们把精力集中在每一行的含义:

  1. 使用exits命令判断key是否已经存在
    redis.call('exists', KEYS[1]) == 0
  2. 使用hincrby命令,在对应key下设置了map,而map的key为唯一token,值自增,第一次值为1.
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
  3. 你在2步肯定有这样的疑问,为什么采用map的接口而不是只存储一个单值?相信这一步你就有了答案。Redission需要增加计数器,而实现重入锁。如果你看过Synchronize这个关键字的实现原理。你肯定能明白重入锁的实现了。

因此,下面这语句就是判断是否为同一个线程进入这个逻辑。也许你会你怎么知道是同一个线程?因为我们会给每一个线程设定一个唯一token,而ARGV[2]就是这个唯一标识。

redis.call('hexists', KEYS[1], ARGV[2]) == 1

WatchDog实现锁续期

WatchDog是在线程获取到锁后的一个定时任务线程,该线程进行锁续期,保证同步代码块没有执行完而锁也不会被释放。

顺着tryAcquire继续走,我们会找到成功获取锁后会执行scheduleExpirationRenewal——定时过期续期。

Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                //...略
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                //...略
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

为了能够更清晰的看到重点,我省略了相关的代码,定时任务执行线程,每隔一段时间(过期时间/3)执行一次续期。renewExpirationAsync里面的核心又是一段Lua脚本,如下:

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
   redis.call('pexpire', KEYS[1], ARGV[1]); 
   return 1; 
end; 

return 0;

脚本的作用是重新给key设置一个过期时间。那这个看门狗线程不终止,岂不是会有很多看门狗?当然,你肯定想到了,锁释放也就是触发终止看门狗线程的最佳时机。

Redission锁释放过程

挑出了释放锁的核心Lua脚本是这样的:

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
  return nil;
  end; 
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    return 0; 
else
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1; 
end; 
return nil;

如果该线程的key已经被释放,那就直接返回。

否则,将重入计数器-1,当然如果计数器没有释放到0,设置过期时间。如果发现该线程已经是最后一个释放锁的,删除对应key,并发布释放锁通知到频道,完成锁释放的过程。

在解锁的代码里,我发现了这个:cancelExpirationRenewal——取消续期方法:

protected void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (task == null) {
            return;
        }
        
        if (threadId != null) {
            task.removeThreadId(threadId);
        }

        if (threadId == null || task.hasNoThreads()) {
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                timeout.cancel();
            }
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
    }

timeout.cancel();这一句完成了看门狗线程的释放。


版权声明:本文为m0_58554082原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。