ReentrantLock加锁原理与源码分析
在实际代码开发过程中,只要用到了多线程就会存在数据的安全性问题,实现锁的方式很多;例如:synchronized关键字、Lock实现类、Redis锁,分布式锁等。本文主要讲ReentrantLock(可重入锁)的实现原理。
另外,本文只对公平锁说明,非公平锁与公平锁的区别仅在于在放入队列之前尝试通过改变state的值,如果改变成功则加锁成功,否则加锁失败。
一、ReentrantLock的加锁原理
先来一个逻辑图,ReentrantLock加锁的过程就是如此简单:
1、通过lock()方法加锁,通过unlock()方法释放锁;
2、调用lock()之后,内部会判断当前锁的状态,只有在state == 0
成立的情况下锁才是空闲的。
3、在空闲状态下,并不能立刻加锁成功,还要判断等待队列中有没有等待的线程。如果没有则直接加锁成功,如果有则优先从队列中获取线程加锁。
4、如果state > 0
则说明当前锁被其他线程持有,持有该锁的线程在exclusiveOwnerThread
属性中关联。
5、判断exclusiveOwnerThread == currentThread
是否成立;如果成立说明当前线程持有该锁,则对state累加1(通过这种方式实现锁重入);
6、如果该锁不是当前线程持有,则把该线程加入到队列中等待。
ReentrantLock加锁逻辑简单明了,下面则对实现源码分析。
二、源码解析
1、ReentrantLock构造函数和lock方法:
// 引入Sync对象,实际实现加锁的逻辑时sync
private final Sync sync;
// 默认使用的是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 可通过参数是否使用公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
通过构造函数可知默认使用非公平锁,而实际加锁逻辑时Sync
实现的。
2、Sync分析
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 加锁逻辑由Sync的子类 FairSync/NoFairSync实现公平或非公平锁
abstract void lock();
// 非公平锁实现加锁逻辑
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 释放锁:通过下面的逻辑可以看出,同一个线程加了几次锁就要释放几次锁,否则这个线程还将持有这把锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 判断当前线程是否持有该锁
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 触发队列中的线程获取锁
final ConditionObject newCondition() {
return new ConditionObject();
}
// 获取持有该锁的线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 获取该锁持有的数量
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 获取锁状态
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
}
}
Sync中实现了除加锁之外的所有功能,因为加锁有公平和非公平,所以在FairSync中实现了公平锁,NonFairSync中实现了非公平锁。Sync的功能我都在上面的代码中做了注释,每一个都很清楚。
3、FairSync中的lock
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
// 加锁逻辑
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
从上面代码中可以看出,lock方法调用了acquire(1)
,acquire是AbstractQueuedSynchronizer的逻辑,如下所示:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
以上代码可以看出,实际加锁逻辑时由tryAcquire
实现,在tryAcquire
加锁失败之后,acquireQueued
将失败的线程存放到队列中。所以主要的逻辑有两块,一个是加锁,一个是将线程加入队列。
4、FairSync的tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread(); // @1
int c = getState(); // @2
if (c == 0) {
if (!hasQueuedPredecessors() && // @3
compareAndSetState(0, acquires)) { // @4
setExclusiveOwnerThread(current); // @5
return true;
}
} else if (current == getExclusiveOwnerThread()) { // @6
int nextc = c + acquires; // @7
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); // @8
return true;
}
return false;
}
(1)获取当前线程;
(2)获取该锁的状态;
(3)如果该锁未被持有,hasQueuedPredecessors()判断队列中是否有等待线程;如果没有等待线程,compareAndSetState进行CAS加锁操作;compareAndSetState加锁成功,setExclusiveOwnerThread设置当前线程为持有线程,返回true;
(4)如果锁被持有,判断持有该锁的线程是否是当前线程;如果是当前线程,对state执行加一操作;更改锁状态,返回true。
(5)否则返回false。
5、NonfairSync源码
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
非公平锁的lock可以看出,在执行acquire之前,先执行compareAndSetState,该方法就是将state数据尝试更改,如果能更改成功则加锁成功;所以公平锁与非公平锁之间的区别就在于此。
下面主要看NonfairSync的加锁实现tryAcquire,从上面代码中可以看出,tryAcquire调用了nonfairTryAcquire方法,而该方法在Sync中实现,我们再返回来分析nonfairTryAcquire。
6、NonfairSync的nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
(1)先获取当前线程;
(2)获取锁状态;
(3)如果锁未被持有,则先尝试获取锁(公平锁中是先判断队列中有没有数据),如果获取成功则设置exclusiveOwnerThread并返回true;
(4)如果已被持有,则判断是否是当前线程持有,如果是则直接更改state的值,返回true。
(5)否则返回false。
7、AbstractQueuedSynchronizer加入队列逻辑
// tryAcquire加锁失败之后,acquireQueued放入到队列中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 创建一个Node对象,用来标记线程队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 将addWaiter创建的Node对象加入到队列中
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Node就是一个双向链表,新创建的Node对象会被加入到队列的尾部。