目录
AQS 概述
AbstractQueuedSynchronizer来自于jdk 1.5,位于juc包中,简称为AQS- 类如其名,抽象的队列式的同步器,
AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock,ReentrantReadWriteLock,CountDownLatch - 在
AQS中,主要有两部分功能:一部分是操作state变量,第二部分是实现排队和阻塞机制
AQS 设计思想
对于使用者来讲,我们无需关心获取资源失败,线程排队,线程阻塞,唤醒等一系列复杂的实现,这些都在 AQS 中为我们处理好了。我们只需要负责处理获取,释放锁资源的状态 state的逻辑即可。这是很经典的模板方法设计模式的应用,AQS 为我们定义好顶级逻辑的骨架,并提取出公用的线程入队列,出队列,阻塞,唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑放到 AQS 的子类中去实现即可
AQS 框架内部探究
AQS 框架内部

AbstractQueuedSynchronizer被设计为一个抽象类,它使用了一个volatile来修饰int类型的成员变量state来表示同步状态,通过内置的FIFO(先进先出)双向队列来完成资源获取线程的排队等待工作。通常AQS的子类通过继承AQS并实现它的抽象方法来管理同步状态AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法,AQS既可以支持独占式地访问同步状态( 如ReentrantLock),也可以支持共享式地访问同步状态(如CountDownLatch),这样就可以方便实现不同类型的同步组件
AQS 访问同步状态 state
重写 AQS 指定的方法时,需要使用 AQS 提供的如下 3 个方法来访问或修改同步状态,不同的锁实现都可以直接调用这 3 个方法
// 同步状态变量,或者代表共享资源
private volatile int state;
// 返回同步状态的当前值。此操作具有 volatile 读的内存语义,因此每次获取的都是最新值
protected final int getState() {
return state;
}
// 设置同步状态的最新值。此操作具有 volatile 写的内存语义,因此每次写数据都是写回主内存
protected final void setState(int newState) {
state = newState;
}
/**
* 如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的要更新的值
* * @param expect 预期值
* @param update 写入值
* @return 如果更新成功返回true,失败则返回false
*/
protected final boolean compareAndSetState(int expect, int update) {
// 内部调用 unsafe 的方法,该方法是一个 CAS 方法
// 这个 unsafe 类,实际上是比 AQS 更加底层的底层框架,或可以认为是 AQS 框架的基石
// CAS 操作在 Java 中的最底层的实现就是 Unsafe 类提供的,它是作为 Java 语言与 Hospot源码(C++)以及底层操作系统沟通的桥梁
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
这三个方法 getState()、setState()、compareAndSetState() 都是 final 方法,是 AQS 提供的通用的访问同步状态的方法,能保证线程安全,我们直接调用即可
AQS 的自定义(子类)同步器的实现
AQS定义了两种资源享用方式:Exclusive(独占,如ReentrantLock)和Share(共享,如CountDownLatch)- 不同的自定义同步器竞争使用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源
state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队,唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法
/**
* 独占式获取锁,该方法需要查询当前状态并判断锁是否符合预期,然后再进行 CAS 设置锁。返回true 则成功,否则失败
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 独占式释放锁,等待获取锁的线程将有机会获取锁。返回 true 则成功,否则失败
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共享式获取锁,返回大于等于 0 的值表示获取成功,否则失败
*
* 如果返回值小于 0,表示当前线程获取共享锁失败
* 如果返回值大于 0,表示当前线程获取共享锁成功,并且接下来其他线程尝试获取共享锁的行为很可能成功
* 如果返回值等于 0,表示当前线程共享锁成功,但是接下来其他线程尝试获取共享锁的行为会失败(实际上也有可能成功,在后面的源码部分会将)
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 共享式释放锁。返回 true 成功,否则失败
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 当前 AQS 是否在独占模式下被线程占用,一般表示是否被前当线程独占;
* 如果同步是以独占方式进行的,则返回 true;其他情况则返回 false
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
AQS 中同步队列
数据结构之队列
- 队列
queue是只允许在一端进行插入操作,而在另一端进行删除操作的线性表。队列的工作原理与现实生活中的队列完全相同。类似火车头进入山洞,先进入山洞的车厢就先出来山洞,后进入山洞的火车头后出来山洞。队列的工作原理与此相同 - 队列是一种先进先出(
First In First Out)的线性表,简称FIFO。允许插入的一端称为队尾,允许删除的一端称为队头。假设队列是q = (a1,a2,…,an),那么a1就是队头元素,而an是队尾元素。这样我们就可以删除时,总是从a1开始,而插入时,列在最后

因为队列属于线性表,因此队列也可以采用顺序存储结构和链式存储结构来实现。Java 中已经提供了很多线程的队列的实现,比如 JUC 中的各种阻塞、非阻塞队列
AQS 中同步队列数据结构(双链表实现的队列)
public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer implements java.io.Serializable {
protected AbstractQueuedSynchronizer() { }
// 队列头结点,实际上是一个哨兵结点,不代表任何线程,head 所指向的 Node 的 thread 属性永远是 null
private transient volatile Node head;
// 队列尾结点,后续的结点都加入到队列尾部
private transient volatile Node tail;
// 同步状态
private volatile int state;
// Node内部类,同步队列的结点类型
static final class Node {
// 共享模式下构造的结点,用来标记该线程是获取共享资源时被阻塞挂起后放入AQS 队列的
static final Node SHARED = new Node();
// 独占模式下构造的结点,用来标记该线程是获取独占资源时被阻塞挂起后放入AQS 队列的
static final Node EXCLUSIVE = null;
/**
* 表示当前结点(线程)需要取消等待
* 由于在同步队列中等待的线程发生等待超时、中断、异常,即放弃获取锁,需要从同步队列中取消等待,就会变成这个状态
* 如果结点进入该状态,那么不会再变成其他状态
*/
static final int CANCELLED = 1;
/**
* 表示当前结点(线程)的后续结点(线程)需要取消等待(被唤醒)
* 如果一个结点状态被设置为SIGNAL,那么后继结点的线程处于挂起或者即将挂起的状态
* 当前结点的线程如果释放了锁或者放弃获取锁并且结点状态为SIGNAL,那么将会尝试唤醒后继结点的线程以运行
* 这个状态通常是由后继结点给前驱结点设置的。一个结点的线程将被挂起时,会尝试设置前驱结点的状态为SIGNAL
*/
static final int SIGNAL = -1;
/**
* 线程在等待队列里面等待,waitStatus值表示线程正在等待条件
* 原本结点在等待队列中,结点线程等待在Condition上,当其他线程对Condition调用了signal()方法之后
* 该结点会从从等待队列中转移到同步队列中,进行同步状态的获取
*/
static final int CONDITION = -2;
// 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点
static final int PROPAGATE = -3;
// 记录当前线程等待状态值,包括以上4中的状态,还有0,表示初始化状态
volatile int waitStatus;
// 前驱节点,当结点加入同步队列将会被设置前驱结点信息
volatile Node prev;
// 后继节点
volatile Node next;
// 当前获取到同步状态的线程
volatile Thread thread;
// 等待队列中的后继结点,如果当前结点是共享模式的,那么这个字段是一个SHARED常量
// 在独占锁模式下永远为null,仅仅起到一个标记作用,没有实际意义
Node nextWaiter;
// 如果是共享模式下等待,那么返回 true(因为上面的 Node nextWaiter 字段在共享模式下是一个SHARED 常量)
final boolean isShared() {
return nextWaiter == SHARED;
}
Node() { }
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
}
可以看到每个节点有两个域:prev 节点和 next节点,其实 AQS 还有两个重要的成员变量 head 和 tail,head 和 tail 管理了同步队列中的节点,也就是说 AQS 实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队,释放锁时对同步队列中的线程进行通知等核心方法

Node节点中的SHARED:用来标记该线程是获取共享资源(锁)时被阻塞挂起后放入AQS的同步队列中的Node节点中的EXCLUSIVE:用来标记该线程是获取独占资源(锁)时被阻塞挂起后放入AQS的同步队列中的Node节点中的waitStatus:表示当前线程等待状态值,包括以下5种状态- 第一种状态
CANCELLED = 1:表示当前线程节点需要取消等待。如果在同步队列中等待的线程发生了等待超时、中断、异常,即会放弃获取锁,需要从同步队列中取消等待,就会变成这个状态。如果线程节点进入该状态,那么不会再变成其他状态 - 第二种状态
SIGNAL = -1:表示后继节点在等待当前结点唤醒。后继节点入队时,会将前继节点的状态更新为SIGNAL。如果一个线程节点状态被设置为SIGNAL,那么它的后继节点的线程会处于等待的状态;如果当前节点的线程释放了锁或者放弃获取锁并且节点状态为SIGNAL,那么将会尝试唤醒它的后继节点的线程;这个状态通常是由后继节点给前驱节点设置的。一个节点的线程将被挂起时,会尝试设置前驱结点的状态为SIGNAL - 第三种状态
CONDITION = -2:表示线程节点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的节点将从等待队列转移到同步队列中,等待获取同步锁 - 第四种状态
PROPAGATE = -3:共享模式下,前继节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继节点 - 第五种状态
0:新节点入队时的默认状态;代表初始化状态
AQS 中锁的获取与释放
AQS 提供的锁的获取和释放分为独占式的和共享式的,其线程同步的关键是对同步状态 state 的操作
获取独占式锁 acquire(int arg) 方法
调用 AQS 的 acquire(int arg) 方法可以获取独占式的锁,该方法不会响应中断,也就是由于线程获取同步状态失败后进入会同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。基于独占式实现的组件有 ReentrantLock
// 独占式的尝试获取锁,获取不成功就进入同步队列等待
public final void acquire(int arg) {
// 内部是由 4 个方法的调用组成的
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 调用
tryAcquire(arg)方法尝试获取锁,如果获取锁成功会返回true;否则获取锁失败返回false,然后进行下一步的操作 addWaiter()方法将该线程加入同步队列的尾部,并标记为独占模式acquireQueued()方法使线程阻塞在同步队列中获取锁,一直到获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false- 然后根据返回值判断是否调用
selfInterrupt()设置中断标志位,但此时线程处于运行态,即使设置中断标志位也不会抛出异常 - 线程获得锁,
acquire()方法结束,从lock()方法中返回
tryAcquire(arg) 尝试获取独占锁
该方法是 AQS 的子类去实现的,用于首次尝试获取独占锁,一般来说就是对 state 的改变、或重入锁的检查、设置当前获得锁的线程等。不同的锁有自己不同的逻辑实现,获取成功该方法就返回 true,失败就返回 false
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
addWaiter(Node mode) 加入到同步队列

addWaiter() 方法是 AQS 提供的,不需要被重写。此方法用于将当前线程加入到同步队列的队尾,并返回当前线程所在的节点
/**
* addWaiter(Node node)方法将获取锁失败的线程构造成结点加入到同步队列的尾部
*
* @param mode 模式。独占模式传入的是一个 Node.EXCLUSIVE,即 null;共享模式传入的是一个 Node.SHARED,即一个静态结点对象(共享的、同一个)
* @return 返回构造的结点
*/
private Node addWaiter(Node mode) {
// 1 首先构造节点
Node node = new Node(Thread.currentThread(), mode);
// 2 尝试将结点直接放在队尾
// 直接获取同步器的 tail 结点,使用 pred 来保存
Node pred = tail;
// 如果 pred 不为 null,实际上就是队列不为 null
if (pred != null) {
node.prev = pred;
// compareAndSetTail 的 CAS 方法来确保结点能够被线程安全的添加,虽然不一定能成功
if (compareAndSetTail(pred, node)) {
// 将新构造的结点置为原队尾结点的后继
pred.next = node;
// 返回新结点
return node;
}
}
/*
* 3 走到这里,可能是:
* (1) 由于可能是并发条件,并且上面的 CAS 操作并没有循环尝试,因此可能添加失败
* (2) 队列可能为 null
* 调用 enq 方法,采用自旋方式保证构造的新结点成功添加到同步队列中
* */
enq(node);
return node;
}
enq(final Node node) 保证节点入队
enq() 方法用在同步队列为 null 或一次 CAS 添加失败的时候,enq() 方法要保证节点最终必定添加成功
private Node enq(final Node node) {
// 死循环操作,直到添加成功
for (; ; ) {
// 获取尾节点 t
Node t = tail;
// 如果队列为 null,则初始化同步队列
if (t == null) {
// 调用 compareAndSetHead 方法,初始化同步队列
// 注意:这里是新建了一个空白结点,这就是传说中的哨兵结点。CAS 成功之后,head 将指向该哨兵结点,返回 true
if (compareAndSetHead(new Node()))
// 尾结点指向头结点(哨兵结点)
tail = head;
} else {
// 首先修改新结点前驱的指向,这一步不是安全的。但是没关系,因为这一步如果发生了冲突,那么下面的CAS操作必然之后有一条线程会成功。其他线程将会重新循环尝试
node.prev = t;
// 调用compareAndSetTail方法通过CAS方式尝试将结点添加到同步队列尾部
// 如果添加成功,那么才能继续下一步,结束这个死循环,否则就会不断循环尝试添加
if (compareAndSetTail(t, node)) {
// 修改原尾结点后继结点的指向
t.next = node;
// 返回新结点,结束死循环
return t;
}
}
}
}
acquireQueued(final Node node, int arg) 节点自旋获取锁
通过了 tryAcquire() 和 addWaiter() 方法,说明该线程获取独占锁失败,已经被放入同步队列尾部了
acquireQueued() 方法表示节点进入同步队列之后的动作,实际上就进入了一个自旋的过程,自旋过程中,当条件满足,获取到了独占锁,就可以从这个自旋中退出并返回,否则可能会阻塞该节点的线程,后续即使阻塞被唤醒,还是会自旋尝试获取锁,直到成功或者而抛出异常
final boolean acquireQueued(final Node node, int arg) {
// failed 表示获取锁是否失败标志
boolean failed = true;
try {
// interrupted 表示是否被中断标志
boolean interrupted = false;
// 死循环
for (; ; ) {
// 获取新结点的前驱结点
final Node p = node.predecessor();
// 只有前驱节点是头节点的时候才能尝试获取锁,同样调用 tryAcquire 方法获取锁
if (p == head && tryAcquire(arg)) {
// 获取到锁之后,就将自己设置为头结点(哨兵结点),线程出队列
setHead(node);
// 前驱结点(原哨兵结点)的链接置空,由JVM 回收
p.next = null;
// 获取锁是否失败改成 false,表示成功获取到了锁
failed = false;
// 返回 interrupted,即返回线程是否被中断
return interrupted;
}
// 前驱结点不是头结点或者获取同步状态失败
// shouldParkAfterFailedAcquire检测线程是否应该被挂起,如果返回true
// 则调用parkAndCheckInterrupt用于将线程挂起,否则重新开始循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 到这一步,说明是当前结点(线程)因为被中断而唤醒,那就改变自己的中断标志位状态信息为true。然后又从新开始循环,直到获取到锁,才能返回
interrupted = true;
}
} finally {
/*如果failed为true,表示获取锁失败,即对应发生异常的情况,
这里发生异常的情况只有在tryAcquire方法和predecessor方法中可能会抛出异常,此时还没有获得锁,failed=true,那么执行cancelAcquire方法,该方法用于取消该线程获取锁的请求,将该结点的线程状态改为CANCELLED,并尝试移除结点(如果是尾结点)
另外,在超时等待获取锁的的方法中,如果超过时间没有获取到锁,也会调用该方法
如果failed为false,表示获取到了锁,那么该方法直接结束,继续往下执行;
*/
if (failed)
// 取消获取锁请求,将当前结点从队列中移除,
cancelAcquire(node);
}
}
- 开启一个死循环,在死循环中进行下面的操作
-如果当前节点的前驱是head节点,那么尝试获取锁,如果获取锁成功,那么当前节点设置为头节点head,当前节点线程出队,表示当前线程已经获取到了锁,然后返回是否被中断标志,结束循环,进入finally - 如果当前节点的前驱不是
head节点或者尝试获取锁失败,那么判断当前线程是否应该被挂起,如果返回true,那么调用parkAndCheckInterrupt()挂起当前节点的线程,此时不再执行后续的步骤 - 如果当前线程不应该被挂起,即返回
false,那本次循环结束,继续下一次循环 - 如果线程被其他线程唤醒,那么判断是否是因为中断而被唤醒并修改标志位,同时继续循环,直到在步骤
2获得锁,才能跳出循环 - 最终,线程获得了锁跳出循环,或者发生异常跳出循环,那么会执行
finally语句块,finally中判断线程是否是因为发生异常而跳出循环,如果是,那么执行cancelAcquire()方法取消该节点获取锁的请求;如果不是,即因为获得锁跳出循环,则finally中什么也不干
selfInterrupt() 安全中断
根据 !tryAcquire() 和 acquireQueued() 返回值判断是否需要设置中断标志位,只有tryAcquire() 尝试失败,并且 acquireQueued() 方法 true 时,才表示该线程是被中断过了的
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
获取独占式锁 acquire(int arg) 方法总结(重点)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 调用
AQS的子类去实现的tryAcquire()方法,尝试直接去获取独占式锁,如果成功则直接返回 - 没成功,则调用方法
addWaiter()将该线程加入等待队列的尾部,并标记为独占模式 acquireQueued()方法使线程在等待队列中休息,有机会时(轮到自己,会调用unpark())会去尝试获取独占式锁。获取到独占式锁后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false- 如果线程在等待过程中被中断过,它是不响应的。只是获取独占式锁后才再进行自我中断
selfInterrupt(),将中断补上

释放独占式锁 release(int arg)
当前线程获取到锁并执行了相应逻辑之后,就需要释放独占式锁,使得后续节点能够继续获取锁。调用 AQS 的 release(int arg) 方法可以释放独占式锁
public final boolean release(int arg) {
// tryRelease() 释放同步状态,该方法是子类重写实现的方法
// 释放成功将返回 true,否则返回 false 或者自己实现的逻辑
if (tryRelease(arg)) {
// 获取头结点
Node h = head;
// 如果头结点不为 null 并且状态不等于 0
if (h != null && h.waitStatus != 0)
// 那么唤醒头结点的一个出于等待锁状态的后继节点
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(int arg) 尝试释放独占式锁
该方法是需要继承 AQS 的子类去实现的。正常来说,tryRelease() 都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state - = arg),也不需要考虑线程安全的问题。AQS 的子类在实现时,如果已经彻底释放资源(state = 0),要返回 true,否则返回 false
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
unparkSuccessor(Node node) 唤醒后继节点
此方法用于唤醒等待队列中下一个线程
private void unparkSuccessor(Node node) {
// 这里,node一般为当前线程所在的节点
int ws = node.waitStatus;
if (ws < 0)
// 如果当前结点的状态小于 0,那么 CAS 设置为 0,表示后继结点线程可以先尝试获取锁,而不是直接挂起
compareAndSetWaitStatus(node, ws, 0);
// 找到下一个需要唤醒的节点 s
Node s = node.next;
// 如果为空或已取消等待
if (s == null || s.waitStatus > 0) {
s = null;
// 则从tail开始到node之间倒序向前查找,找到离tail最近的非取消结点赋给s
for (Node t = tail; t != null && t != node; t = t.prev)
// 从这里可以看出,<=0 的结点,都是还有效的结点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒
LockSupport.unpark(s.thread);
}
- 如果当前结点的状态小于
0,那么CAS设置为0,表示后继节点可以继续尝试获取独占式锁 - 如果当前节点的后继
s为null或者状态为已取消CANCELLED等待,则将s先指向null;然后从tail开始到node之间倒序向前查找,找到离tail最近的非取消节点赋给s。需要从后向前遍历,因为同步队列只保证结点前驱关系的正确性 - 如果
s不为null,那么状态肯定不是已取消CANCELLED等待,则直接唤醒s的线程,调用LockSupport.unpark()方法唤醒,被唤醒的结点将从被park的位置继续执行
释放独占式锁总结
release() 方法是独占模式下线程释放独占式锁的顶层入口。如果彻底释放了(即 state = 0),它会唤醒等待队列里的其他线程来获取独占式锁

获取共享式锁 acquireShared(int arg)
- 共享式锁与独占式锁的区别:就是同一时刻是否可以允许有多个线程同时获取到锁
- 对于共享式锁来说,如果一个线程成功获取了共享式锁,那么其他等待在这个共享锁上的线程就也可以尝试去获取锁,并且极有可能获取成功。基于共享式实现的组件有
CountDownLatch、Semaphore等 - 调用
AQS的acquireShared()方法可以获取共享式锁,同样该方法不响应中断
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 失败则调用doAcquireShared方法将当前线程封装为Node.SHARED类型的Node 结点后加入到AQS 同步队列的尾部,
// 然后"自旋"尝试获取同步状态,如果还是获取不到,那么最终使用 park 方法挂起自己
doAcquireShared(arg);
}
tryAcquireShared(int arg) 尝试获取共享式锁
该方法是 AQS 的子类去自己实现的,用于尝试获取共享锁,一般来说就是对 state 的改变、或者重入锁的检查等等,不同的锁有自己相应的逻辑实现
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
返回 int 类型的值(如返回剩余的 state 状态值-资源数量),一般的理解为
- 如果返回值小于
0,表示当前线程获取共享锁失败 - 如果返回值大于
0,表示当前线程获取共享锁成功,并且接下来其他线程尝试获取共享锁很可能成功 - 如果返回值等于
0,表示当前线程获取共享锁成功,但是接下来其他线程尝试获取共享锁会失败。实际上在AQS的实际实现中,即使某时刻返回值等于0,接下来其他线程尝试获取共享锁也可能会成功。即某线程获取锁并且返回值等于0之后,马上又有线程释放了锁,导致实际上可获取锁数量大于0,此时后继还是可以尝试获取锁的
doAcquireShared(int arg) 自旋获取共享锁
每个节点可以尝试获取锁(独占锁或共享锁)的要求是前驱节点是头节点,那么它本身就是整个队列中的第二个节点,每个获得锁的节点都一定是成为过头节点。那么如果某第二个节点因为不满足条件没有获取到共享锁而被挂起,那么即使后续节点满足条件也一定不能获取到共享锁
/**
* 自旋尝试共享式获取锁,一段时间后可能会挂起
* 和独占式获取的区别:
* 1 以共享模式Node.SHARED添加结点
* 2 获取到锁之后,修改当前的头结点,并将信息传播到后续的结点队列中
*/
private void doAcquireShared(int arg) {
// addWaiter方法逻辑,和独占式获取的区别 :以共享模式Node.SHARED添加结点
final Node node = addWaiter(Node.SHARED);
// 当前线程获取锁失败的标志
boolean failed = true;
try {
// 当前线程的中断标志
boolean interrupted = false;
for (; ; ) {
// 获取前驱结点
final Node p = node.predecessor();
// 当前驱节点是头节点的时候就会以共享的方式去尝试获取锁
if (p == head) {
int r = tryAcquireShared(arg);
// 返回值如果大于等于0,则表示获取到了锁
if (r >= 0) {
// 和独占式获取的区别:修改当前的头结点,根据传播状态判断是否要唤醒后继结点
setHeadAndPropagate(node, r);
// 释放掉已经获取到锁的前驱结点
p.next = null;
// 检查设置中断标志
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断是否应该挂起,以及挂起的方法,和acquireQueued方法的逻辑完全一致,不会响应中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 调用
addWaiter()方法,将当前线程封装为Node.SHARED模式的Node节点后加入到AQS同步队列的尾部,即表示共享模式 - 后面就是类似于
acquireQueued()方法的逻辑,节点自旋尝试获取共享锁。如果还是获取不到,那么最终使用park()方法挂起自己等待被唤醒
setHeadAndPropagate(node, r) 设置节点并传播信息
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);//head指向自己
//如果还有剩余量,继续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
- 设置新
head节点信息 - 根据传播状态判断是否要唤醒后继节点
doReleaseShared() 唤醒后继节点
doReleaseShared() 用于在共享模式下唤醒后继节点
/**
- 共享式获取锁的核心方法,尝试唤醒一个后继线程,被唤醒的线程会尝试获取共享锁,如果成功之后,则又会有可能调用 setHeadAndPropagate,将唤醒传播下去。
- 独占锁只有在一个线程释放所之后才会唤醒下一个线程,而共享锁在一个线程在获取到锁和释放掉锁锁之后,都可能会调用这个方法唤醒下一个线程
- 因为在共享锁模式下,锁可以被多个线程所共同持有,既然当前线程已经拿到共享锁了,那么就可以直接通知后继结点来获取锁,而不必等待锁被释放的时候再通知
*/
private void doReleaseShared() {
// 一个死循环
for (; ; ) {
// 获取当前的 head,每次循环读取最新的 head
Node h = head;
// 如果h不为null且h不为tail,表示队列至少有两个结点,那么尝试唤醒head后继结点线程
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果头结点的状态为 SIGNAL,那么表示后继结点需要被唤醒
if (ws == Node.SIGNAL) {
// 尝试 CAS 设置 h 的状态从 Node.SIGNAL 变成 0
// 可能存在多线程操作,但是只会有一条成功
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 失败的线程结束本次循环,继续下一次循环
continue;
// 成功的那一条线程会调用unparkSuccessor方法唤醒head的一个没有取消的后继结点
unparkSuccessor(h);
}
/*
* 如果h状态为0,那说明后继结点线程已经是唤醒状态了或者将会被唤醒,不需要该线程来唤醒
* 那么尝试设置h状态从0变成PROPAGATE,如果失败则继续下一次循环,此时设置PROPAGATE状态能保证唤醒操作能够传播下去
* 因为后继结点成为头结点时,在setHeadAndPropagate方法中能够读取到原head结点的PROPAGATE状态<0,从而让它可以尝试唤醒后继结点(如果存在)
* */
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// 失败的线程结束本次循环,继续下一次循环
continue;
}
// 执行到这一步说明在上面的判断中队列可能只有一个结点,或者unparkSuccessor方法调用完毕,或h状态为PROPAGATE(不需要继续唤醒后继)
if (h == head)
break;
}
}
释放共享式锁 releaseShared(int arg)
对于支持共享式的同步组件(即多个线程同时访问),它们和独占式的主要区别就是 tryReleaseShared() 方法必须确保锁的释放是线程安全的(因为既然是多个线程能够访问,那么释放的时候也会是多个线程的,就需要保证释放时候的线程安全)。由于 tryReleaseShared() 方法也是我们自己实现的,因此需要我们自己实现线程安全,所以常常采用 CAS 的方式来释放同步状态
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 释放成功,必定调用doReleaseShared尝试唤醒后继结点
doReleaseShared();
return true;
}
return false;
}
AQS 总结重点
不同的 AQS 同步器争用共享资源的方式也不同。AQS 同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队,唤醒出队等),AQS 已经实现好了。AQS 子类同步器实现时主要实现以下几种方法
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它tryAcquire(int):尝试获取独占式锁,成功则返回true,失败则返回falsetryRelease(int):尝试释放独占式锁,成功则返回true,失败则返回falsetryAcquireShared(int):尝试获取共享式锁。负数表示获取失败;0表示成功,但是其他线程尝试获取共享锁会失败;正数表示成功,其他线程尝试获取共享锁很可能成功tryReleaseShared(int):尝试释放共享式锁,如果释放后允许唤醒后续等待节点返回true,否则返回falseAQS中有一个核心状态是waitStatus,这个代表节点的状态,决定了当前节点的后续操作,比如是否等待唤醒,是否要唤醒后继节点
参考:https://www.cnblogs.com/waterystone/p/4920797.html
推荐好文章:https://mp.weixin.qq.com/s/Vz3xLwKpodyU5RN9ERGgbA