bilibili-Java并发学习笔记14 AQS 之 ReentrantLock
基于 java 1.8.0
P46_可重入锁对于AQS的实现源码分析
ReentrantLock 使用案例
Lock lock = new ReentrantLock();
lock.lock();
try {
// do something
} finally {
lock.unlock();
}
- 公平锁和非公平锁
// 默认非公平锁
Lock lock = new ReentrantLock();
// 公平锁
Lock lock = new ReentrantLock(true);
// 非公平锁
Lock lock = new ReentrantLock(false);
/**
* 底层实现
*/
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {...}
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {...}
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {...}
- 非公平锁实现 lock() 上锁实现
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// cas 操作,若 state 是 0,则将其改为 1,并将独占锁的线程标记指向自己
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// 若互斥锁已被别的线程占有,则
else
// AQS 的 acquire(int) 方法
acquire(1);
}
/**
* 以独占模式获取,忽略中断。通过调用至少一次 tryAcquire 在成功时返回来实现。
* 否则线程将排队,可能会反复阻塞和取消阻塞,调用 tryAcquire 直到成功。
* 此方法可用于实现方法 Lock.lock()
*
* @param arg the acquire argument. 这个值被传递到tryAcquire,但在其他方面是不令人惊讶的,可以表示任何您喜欢的。
*/
public final void acquire(int arg) {
// tryAcquire 尝试获取锁
// addWaiter 将线程加入等待队列以独占模式
// acquireQueued 查看当前线程是否被中断
if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) )
selfInterrupt();
}
// NonfairSync in ReentrantLock 实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock.
* tryAcquire是在子类中实现的,但这两个类都需要trylock方法的非空尝试。
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// state 为 0 表示当前无线程占有锁
// 通过 CAS 操作去获取锁
if (compareAndSetState(0, acquires)) {
// 当前占有锁的线程标记为自己
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程已被占有,且占有的线程正是自己
// 重新进入这个锁
// 所以说 ReentrantLock 是可重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 不过 state 值 +1
setState(nextc);
return true;
}
return false;
}
/**
* 为当前线程和给定模式创建并排队节点。
*
* @param mode Node 模式 : Node.EXCLUSIVE 独占锁, Node.SHARED 共享锁
* @return the new node
*/
private Node addWaiter(Node mode) {
// 创建队列节点
Node node = new Node(Thread.currentThread(), mode);
// 尝试enq的快速路径;失败时备份到完整enq
// 将新节点加入队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 以独占不间断模式获取队列中已存在的线程。由条件等待方法和获取使用。
*
* @param node the node
* @param arg the acquire argument
* @return 如果在等待时中断返回 true
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
- 公平锁实现 lock() 上锁实现
final void lock() {
// 请求锁
acquire(1);
}
public final void acquire(int arg) {
// 尝试获取独占锁
// 若获取失败加入等待队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 公平锁版本的 tryAcquire.
* 除非递归调用或没有等待者或是第一个,否则不要授予访问权限。
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 无线程占有锁
if (c == 0) {
// hasQueuedPredecessors 判断等待队列中有没有其他等待线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* 查询是否有任何线程等待获取的时间比当前线程长。
*
* 调用此方法相当于(但可能比)更有效:
* getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()}
*
* 请注意,由于中断和超时可能会在任何时候发生取消,
* 所以一个真正的返回并不保证其他线程会在当前线程之前获取。
* 同样,由于队列为空,因此在该方法返回false之后,另一个线程也有可能赢得排队竞争。
*
* 这种方法被设计用于公平同步器,以避免碰撞。
* 这种同步器的 tryAcquire 方法应该返回 false,如果这个方法返回 true(除非这是可重入的获取),
* 那么它的 tryAcquireShared 方法应该返回一个负值。
* 例如,公平、可重入、独占模式同步器的 tryAcquire 方法可能如下所示:
*
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }}
*
* @return 如果在当前线程之前有一个排队线程,则为true;如果当前线程位于队列的头部或队列为空,则为 false
*
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
// 它的正确性取决于head在tail之前被初始化头。下一个如果当前线程是队列中的第一个线程,则为精确的。
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- 释放锁
public void unlock() {
sync.release(1);
}
/**
* 以独占模式释放。如果 tryRelease 返回true,则通过取消阻止一个或多个线程来实现。此方法可用于实现方法 Lock#unlock
*
*
*
* @param arg 释放参数。这个值被传递到tryRelease,但在其他方面是不令人惊讶的,可以表示任何您喜欢的。
* @return 从tryRelease返回的值
*/
public final boolean release(int arg) {
// 若锁完全释放(state为0)
if (tryRelease(arg)) {
// 唤醒节点的后续节点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 只能由占有锁的线程释放锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 完全释放锁
free = true;
setExclusiveOwnerThread(null);
}
// 重入锁层次 -1
setState(c);
return free;
}
/**
* 唤醒节点的后续节点(如果存在)。
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* 如果状态为阴性(即可能需要信号),则尝试清除,以等待信号。如果失败或状态被等待线程更改,则正常。
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* unpark的线程被保存在后续节点中,后者通常只是下一个节点。但如果取消或明显为空,则从尾部向后遍历,以找到实际未取消的后继项。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
版权声明:本文为u013837825原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。