Java中的AQS

AQS 概述

  • AbstractQueuedSynchronizer 来自于 jdk 1.5,位于 juc 包中,简称为 AQS
  • 类如其名,抽象的队列式的同步器,AQS 定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的 ReentrantLock,ReentrantReadWriteLock,CountDownLatch
  • AQS 中,主要有两部分功能:一部分是操作 state 变量,第二部分是实现排队和阻塞机制

AQS 设计思想

对于使用者来讲,我们无需关心获取资源失败,线程排队,线程阻塞,唤醒等一系列复杂的实现,这些都在 AQS 中为我们处理好了。我们只需要负责处理获取,释放锁资源的状态 state的逻辑即可。这是很经典的模板方法设计模式的应用,AQS 为我们定义好顶级逻辑的骨架,并提取出公用的线程入队列,出队列,阻塞,唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑放到 AQS 的子类中去实现即可

AQS 框架内部探究

AQS 框架内部

在这里插入图片描述

  • AbstractQueuedSynchronizer 被设计为一个抽象类,它使用了一个 volatile 来修饰 int 类型的成员变量 state 来表示同步状态,通过内置的 FIFO (先进先出)双向队列来完成资源获取线程的排队等待工作。通常 AQS 的子类通过继承 AQS 并实现它的抽象方法来管理同步状态
  • AQS 自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法,AQS 既可以支持独占式地访问同步状态( 如ReentrantLock),也可以支持共享式地访问同步状态(如 CountDownLatch),这样就可以方便实现不同类型的同步组件

AQS 访问同步状态 state

重写 AQS 指定的方法时,需要使用 AQS 提供的如下 3 个方法来访问或修改同步状态,不同的锁实现都可以直接调用这 3 个方法

// 同步状态变量,或者代表共享资源
private volatile int state;

// 返回同步状态的当前值。此操作具有 volatile 读的内存语义,因此每次获取的都是最新值
protected final int getState() {
	return state;
}

// 设置同步状态的最新值。此操作具有 volatile 写的内存语义,因此每次写数据都是写回主内存
protected final void setState(int newState) {
	state = newState;
}

/**
 * 如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的要更新的值
 *  * @param expect 预期值
 * @param update 写入值
 * @return 如果更新成功返回true,失败则返回false
 */
protected final boolean compareAndSetState(int expect, int update) {
	// 内部调用 unsafe 的方法,该方法是一个 CAS 方法
    // 这个 unsafe 类,实际上是比 AQS 更加底层的底层框架,或可以认为是 AQS 框架的基石
    // CAS 操作在 Java 中的最底层的实现就是 Unsafe 类提供的,它是作为 Java 语言与 Hospot源码(C++)以及底层操作系统沟通的桥梁
	return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

这三个方法 getState()、setState()、compareAndSetState() 都是 final 方法,是 AQS 提供的通用的访问同步状态的方法,能保证线程安全,我们直接调用即可

AQS 的自定义(子类)同步器的实现

  • AQS 定义了两种资源享用方式:Exclusive(独占,如 ReentrantLock)和 Share(共享,如 CountDownLatch
  • 不同的自定义同步器竞争使用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队,唤醒出队等),AQS 已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法
/**
 * 独占式获取锁,该方法需要查询当前状态并判断锁是否符合预期,然后再进行 CAS 设置锁。返回true 则成功,否则失败
 */
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * 独占式释放锁,等待获取锁的线程将有机会获取锁。返回 true 则成功,否则失败
 */
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * 共享式获取锁,返回大于等于 0 的值表示获取成功,否则失败
 *
 * 如果返回值小于 0,表示当前线程获取共享锁失败
 * 如果返回值大于 0,表示当前线程获取共享锁成功,并且接下来其他线程尝试获取共享锁的行为很可能成功
 * 如果返回值等于 0,表示当前线程共享锁成功,但是接下来其他线程尝试获取共享锁的行为会失败(实际上也有可能成功,在后面的源码部分会将)
 */
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * 共享式释放锁。返回 true 成功,否则失败
 */
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * 当前 AQS 是否在独占模式下被线程占用,一般表示是否被前当线程独占;
 * 如果同步是以独占方式进行的,则返回 true;其他情况则返回 false
 */
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

AQS 中同步队列

数据结构之队列

  • 队列 queue 是只允许在一端进行插入操作,而在另一端进行删除操作的线性表。队列的工作原理与现实生活中的队列完全相同。类似火车头进入山洞,先进入山洞的车厢就先出来山洞,后进入山洞的火车头后出来山洞。队列的工作原理与此相同
  • 队列是一种先进先出(First In First Out)的线性表,简称 FIFO。允许插入的一端称为队尾,允许删除的一端称为队头。假设队列是 q = (a1,a2,…,an),那么 a1 就是队头元素,而 an 是队尾元素。这样我们就可以删除时,总是从 a1 开始,而插入时,列在最后

在这里插入图片描述
因为队列属于线性表,因此队列也可以采用顺序存储结构和链式存储结构来实现。Java 中已经提供了很多线程的队列的实现,比如 JUC 中的各种阻塞、非阻塞队列

AQS 中同步队列数据结构(双链表实现的队列)

public abstract class AbstractQueuedSynchronizer extends
            AbstractOwnableSynchronizer implements java.io.Serializable {
		
	protected AbstractQueuedSynchronizer() { }
	
  	// 队列头结点,实际上是一个哨兵结点,不代表任何线程,head 所指向的 Node 的 thread 属性永远是 null
    private transient volatile Node head;
    
    // 队列尾结点,后续的结点都加入到队列尾部    
    private transient volatile Node tail;
      
    // 同步状态  
    private volatile int state;

    // Node内部类,同步队列的结点类型
    static final class Node {

        // 共享模式下构造的结点,用来标记该线程是获取共享资源时被阻塞挂起后放入AQS 队列的
    	static final Node SHARED = new Node();
            
        // 独占模式下构造的结点,用来标记该线程是获取独占资源时被阻塞挂起后放入AQS 队列的    
		static final Node EXCLUSIVE = null;

       /**
        * 表示当前结点(线程)需要取消等待
        * 由于在同步队列中等待的线程发生等待超时、中断、异常,即放弃获取锁,需要从同步队列中取消等待,就会变成这个状态
        * 如果结点进入该状态,那么不会再变成其他状态
        */
		static final int CANCELLED = 1;
		
       /**
        * 表示当前结点(线程)的后续结点(线程)需要取消等待(被唤醒)
        * 如果一个结点状态被设置为SIGNAL,那么后继结点的线程处于挂起或者即将挂起的状态
        * 当前结点的线程如果释放了锁或者放弃获取锁并且结点状态为SIGNAL,那么将会尝试唤醒后继结点的线程以运行
        * 这个状态通常是由后继结点给前驱结点设置的。一个结点的线程将被挂起时,会尝试设置前驱结点的状态为SIGNAL
        */
		static final int SIGNAL = -1;
		
       /**
        * 线程在等待队列里面等待,waitStatus值表示线程正在等待条件
        * 原本结点在等待队列中,结点线程等待在Condition上,当其他线程对Condition调用了signal()方法之后
        * 该结点会从从等待队列中转移到同步队列中,进行同步状态的获取
        */
		static final int CONDITION = -2;
		
        // 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点
		static final int PROPAGATE = -3;
            
        // 记录当前线程等待状态值,包括以上4中的状态,还有0,表示初始化状态    
		volatile int waitStatus;

        // 前驱节点,当结点加入同步队列将会被设置前驱结点信息
		volatile Node prev;

        // 后继节点  
		volatile Node next;

		// 当前获取到同步状态的线程
		volatile Thread thread;

        // 等待队列中的后继结点,如果当前结点是共享模式的,那么这个字段是一个SHARED常量 
        // 在独占锁模式下永远为null,仅仅起到一个标记作用,没有实际意义    
		Node nextWaiter;

		// 如果是共享模式下等待,那么返回 true(因为上面的 Node nextWaiter 字段在共享模式下是一个SHARED 常量)
		final boolean isShared() {
        	return nextWaiter == SHARED;
		}

		Node() { }

		Node(Thread thread, Node mode) {
        	this.nextWaiter = mode;
            this.thread = thread;
		}

		Node(Thread thread, int waitStatus) { 
         	this.waitStatus = waitStatus;
       		this.thread = thread;
        }
	}
}

可以看到每个节点有两个域:prev 节点和 next节点,其实 AQS 还有两个重要的成员变量 headtailheadtail 管理了同步队列中的节点,也就是说 AQS 实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队,释放锁时对同步队列中的线程进行通知等核心方法

在这里插入图片描述

  • Node 节点中的 SHARED:用来标记该线程是获取共享资源(锁)时被阻塞挂起后放入 AQS 的同步队列中的
  • Node 节点中的 EXCLUSIVE:用来标记该线程是获取独占资源(锁)时被阻塞挂起后放入 AQS 的同步队列中的
  • Node 节点中的 waitStatus:表示当前线程等待状态值,包括以下 5 种状态
  • 第一种状态 CANCELLED = 1:表示当前线程节点需要取消等待。如果在同步队列中等待的线程发生了等待超时、中断、异常,即会放弃获取锁,需要从同步队列中取消等待,就会变成这个状态。如果线程节点进入该状态,那么不会再变成其他状态
  • 第二种状态 SIGNAL = -1:表示后继节点在等待当前结点唤醒。后继节点入队时,会将前继节点的状态更新为 SIGNAL。如果一个线程节点状态被设置为 SIGNAL,那么它的后继节点的线程会处于等待的状态;如果当前节点的线程释放了锁或者放弃获取锁并且节点状态为 SIGNAL,那么将会尝试唤醒它的后继节点的线程;这个状态通常是由后继节点给前驱节点设置的。一个节点的线程将被挂起时,会尝试设置前驱结点的状态为 SIGNAL
  • 第三种状态 CONDITION = -2:表示线程节点等待在 Condition 上,当其他线程调用了 Conditionsignal() 方法后,CONDITION 状态的节点将从等待队列转移到同步队列中,等待获取同步锁
  • 第四种状态 PROPAGATE = -3:共享模式下,前继节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继节点
  • 第五种状态 0:新节点入队时的默认状态;代表初始化状态

AQS 中锁的获取与释放

AQS 提供的锁的获取和释放分为独占式的和共享式的,其线程同步的关键是对同步状态 state 的操作

获取独占式锁 acquire(int arg) 方法

调用 AQSacquire(int arg) 方法可以获取独占式的锁,该方法不会响应中断,也就是由于线程获取同步状态失败后进入会同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。基于独占式实现的组件有 ReentrantLock

// 独占式的尝试获取锁,获取不成功就进入同步队列等待
public final void acquire(int arg) {
    // 内部是由 4 个方法的调用组成的
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • 调用 tryAcquire(arg) 方法尝试获取锁,如果获取锁成功会返回 true;否则获取锁失败返回 false,然后进行下一步的操作
  • addWaiter() 方法将该线程加入同步队列的尾部,并标记为独占模式
  • acquireQueued() 方法使线程阻塞在同步队列中获取锁,一直到获取到资源后才返回。如果在整个等待过程中被中断过,则返回 true,否则返回 false
  • 然后根据返回值判断是否调用 selfInterrupt() 设置中断标志位,但此时线程处于运行态,即使设置中断标志位也不会抛出异常
  • 线程获得锁,acquire() 方法结束,从 lock() 方法中返回

tryAcquire(arg) 尝试获取独占锁

该方法是 AQS 的子类去实现的,用于首次尝试获取独占锁,一般来说就是对 state 的改变、或重入锁的检查、设置当前获得锁的线程等。不同的锁有自己不同的逻辑实现,获取成功该方法就返回 true,失败就返回 false

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

addWaiter(Node mode) 加入到同步队列

在这里插入图片描述
addWaiter() 方法是 AQS 提供的,不需要被重写。此方法用于将当前线程加入到同步队列的队尾,并返回当前线程所在的节点

/**
 * addWaiter(Node node)方法将获取锁失败的线程构造成结点加入到同步队列的尾部
 *
 * @param mode 模式。独占模式传入的是一个 Node.EXCLUSIVE,即 null;共享模式传入的是一个 Node.SHARED,即一个静态结点对象(共享的、同一个)
 * @return 返回构造的结点
 */
private Node addWaiter(Node mode) {
    // 1 首先构造节点
    Node node = new Node(Thread.currentThread(), mode);
    // 2 尝试将结点直接放在队尾
    // 直接获取同步器的 tail 结点,使用 pred 来保存
    Node pred = tail;
    // 如果 pred 不为 null,实际上就是队列不为 null
    if (pred != null) {
        node.prev = pred;
        // compareAndSetTail 的 CAS 方法来确保结点能够被线程安全的添加,虽然不一定能成功
        if (compareAndSetTail(pred, node)) {
            // 将新构造的结点置为原队尾结点的后继
            pred.next = node;
            // 返回新结点
            return node;
        }
    }
    /*
     * 3 走到这里,可能是:
     * (1) 由于可能是并发条件,并且上面的 CAS 操作并没有循环尝试,因此可能添加失败
     * (2) 队列可能为 null
     * 调用 enq 方法,采用自旋方式保证构造的新结点成功添加到同步队列中
     * */
    enq(node);
    return node;
}
enq(final Node node) 保证节点入队

enq() 方法用在同步队列为 null 或一次 CAS 添加失败的时候,enq() 方法要保证节点最终必定添加成功

private Node enq(final Node node) {
    // 死循环操作,直到添加成功
    for (; ; ) {
        // 获取尾节点 t
        Node t = tail;
        // 如果队列为 null,则初始化同步队列
        if (t == null) {
            // 调用 compareAndSetHead 方法,初始化同步队列
            // 注意:这里是新建了一个空白结点,这就是传说中的哨兵结点。CAS 成功之后,head 将指向该哨兵结点,返回 true
            if (compareAndSetHead(new Node()))
                // 尾结点指向头结点(哨兵结点)
                tail = head;
        } else {
            // 首先修改新结点前驱的指向,这一步不是安全的。但是没关系,因为这一步如果发生了冲突,那么下面的CAS操作必然之后有一条线程会成功。其他线程将会重新循环尝试
            node.prev = t;
            // 调用compareAndSetTail方法通过CAS方式尝试将结点添加到同步队列尾部
            // 如果添加成功,那么才能继续下一步,结束这个死循环,否则就会不断循环尝试添加
            if (compareAndSetTail(t, node)) {
                // 修改原尾结点后继结点的指向
                t.next = node;
                // 返回新结点,结束死循环
                return t;
            }
        }
    }
}

acquireQueued(final Node node, int arg) 节点自旋获取锁

通过了 tryAcquire()addWaiter() 方法,说明该线程获取独占锁失败,已经被放入同步队列尾部了

acquireQueued() 方法表示节点进入同步队列之后的动作,实际上就进入了一个自旋的过程,自旋过程中,当条件满足,获取到了独占锁,就可以从这个自旋中退出并返回,否则可能会阻塞该节点的线程,后续即使阻塞被唤醒,还是会自旋尝试获取锁,直到成功或者而抛出异常

final boolean acquireQueued(final Node node, int arg) {
    // failed 表示获取锁是否失败标志
    boolean failed = true;
    try {
        // interrupted 表示是否被中断标志
        boolean interrupted = false;
        // 死循环
        for (; ; ) {
            // 获取新结点的前驱结点
            final Node p = node.predecessor();
            // 只有前驱节点是头节点的时候才能尝试获取锁,同样调用 tryAcquire 方法获取锁
            if (p == head && tryAcquire(arg)) {
                // 获取到锁之后,就将自己设置为头结点(哨兵结点),线程出队列
                setHead(node);
                // 前驱结点(原哨兵结点)的链接置空,由JVM 回收
                p.next = null;
                // 获取锁是否失败改成 false,表示成功获取到了锁
                failed = false;
                // 返回 interrupted,即返回线程是否被中断
                return interrupted;
            }
            // 前驱结点不是头结点或者获取同步状态失败
            // shouldParkAfterFailedAcquire检测线程是否应该被挂起,如果返回true
            // 则调用parkAndCheckInterrupt用于将线程挂起,否则重新开始循环
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                // 到这一步,说明是当前结点(线程)因为被中断而唤醒,那就改变自己的中断标志位状态信息为true。然后又从新开始循环,直到获取到锁,才能返回
                interrupted = true;
        }
    } finally {
        /*如果failed为true,表示获取锁失败,即对应发生异常的情况,
        这里发生异常的情况只有在tryAcquire方法和predecessor方法中可能会抛出异常,此时还没有获得锁,failed=true,那么执行cancelAcquire方法,该方法用于取消该线程获取锁的请求,将该结点的线程状态改为CANCELLED,并尝试移除结点(如果是尾结点)
        另外,在超时等待获取锁的的方法中,如果超过时间没有获取到锁,也会调用该方法
        如果failed为false,表示获取到了锁,那么该方法直接结束,继续往下执行;
        */
        if (failed) 
            // 取消获取锁请求,将当前结点从队列中移除,
            cancelAcquire(node);
    }
}
  • 开启一个死循环,在死循环中进行下面的操作
    -如果当前节点的前驱是 head 节点,那么尝试获取锁,如果获取锁成功,那么当前节点设置为头节点 head,当前节点线程出队,表示当前线程已经获取到了锁,然后返回是否被中断标志,结束循环,进入finally
  • 如果当前节点的前驱不是 head 节点或者尝试获取锁失败,那么判断当前线程是否应该被挂起,如果返回 true,那么调用 parkAndCheckInterrupt() 挂起当前节点的线程,此时不再执行后续的步骤
  • 如果当前线程不应该被挂起,即返回 false,那本次循环结束,继续下一次循环
  • 如果线程被其他线程唤醒,那么判断是否是因为中断而被唤醒并修改标志位,同时继续循环,直到在步骤 2 获得锁,才能跳出循环
  • 最终,线程获得了锁跳出循环,或者发生异常跳出循环,那么会执行 finally 语句块,finally 中判断线程是否是因为发生异常而跳出循环,如果是,那么执行 cancelAcquire() 方法取消该节点获取锁的请求;如果不是,即因为获得锁跳出循环,则 finally 中什么也不干
    在这里插入图片描述

selfInterrupt() 安全中断

根据 !tryAcquire()acquireQueued() 返回值判断是否需要设置中断标志位,只有tryAcquire() 尝试失败,并且 acquireQueued() 方法 true 时,才表示该线程是被中断过了的

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

获取独占式锁 acquire(int arg) 方法总结(重点)

public final void acquire(int arg) {
	if (!tryAcquire(arg) &&
   		acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • 调用 AQS 的子类去实现的 tryAcquire() 方法,尝试直接去获取独占式锁,如果成功则直接返回
  • 没成功,则调用方法 addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式
  • acquireQueued() 方法使线程在等待队列中休息,有机会时(轮到自己,会调用unpark())会去尝试获取独占式锁。获取到独占式锁后才返回。如果在整个等待过程中被中断过,则返回 true,否则返回 false
  • 如果线程在等待过程中被中断过,它是不响应的。只是获取独占式锁后才再进行自我中断 selfInterrupt(),将中断补上

在这里插入图片描述

释放独占式锁 release(int arg)

当前线程获取到锁并执行了相应逻辑之后,就需要释放独占式锁,使得后续节点能够继续获取锁。调用 AQSrelease(int arg) 方法可以释放独占式锁

public final boolean release(int arg) {
    // tryRelease() 释放同步状态,该方法是子类重写实现的方法
    // 释放成功将返回 true,否则返回 false 或者自己实现的逻辑
    if (tryRelease(arg)) {
        // 获取头结点
        Node h = head;
        // 如果头结点不为 null 并且状态不等于 0
        if (h != null && h.waitStatus != 0)
            // 那么唤醒头结点的一个出于等待锁状态的后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease(int arg) 尝试释放独占式锁

该方法是需要继承 AQS 的子类去实现的。正常来说,tryRelease() 都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state - = arg),也不需要考虑线程安全的问题。AQS 的子类在实现时,如果已经彻底释放资源(state = 0),要返回 true,否则返回 false

protected boolean tryRelease(int arg) {
	throw new UnsupportedOperationException();
}

unparkSuccessor(Node node) 唤醒后继节点

此方法用于唤醒等待队列中下一个线程

private void unparkSuccessor(Node node) {
    // 这里,node一般为当前线程所在的节点
    int ws = node.waitStatus;
    if (ws < 0)
    	// 如果当前结点的状态小于 0,那么 CAS 设置为 0,表示后继结点线程可以先尝试获取锁,而不是直接挂起
        compareAndSetWaitStatus(node, ws, 0);
	// 找到下一个需要唤醒的节点 s
    Node s = node.next;
    // 如果为空或已取消等待
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 则从tail开始到node之间倒序向前查找,找到离tail最近的非取消结点赋给s
        for (Node t = tail; t != null && t != node; t = t.prev) 
        	// 从这里可以看出,<=0 的结点,都是还有效的结点
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
    	// 唤醒
        LockSupport.unpark(s.thread);
}
  • 如果当前结点的状态小于 0,那么 CAS 设置为 0,表示后继节点可以继续尝试获取独占式锁
  • 如果当前节点的后继 snull 或者状态为已取消 CANCELLED 等待,则将 s 先指向 null;然后从 tail 开始到 node 之间倒序向前查找,找到离 tail 最近的非取消节点赋给 s。需要从后向前遍历,因为同步队列只保证结点前驱关系的正确性
  • 如果 s 不为 null,那么状态肯定不是已取消 CANCELLED 等待,则直接唤醒 s 的线程,调用 LockSupport.unpark() 方法唤醒,被唤醒的结点将从被 park 的位置继续执行

释放独占式锁总结

release() 方法是独占模式下线程释放独占式锁的顶层入口。如果彻底释放了(即 state = 0),它会唤醒等待队列里的其他线程来获取独占式锁

在这里插入图片描述

获取共享式锁 acquireShared(int arg)

  • 共享式锁与独占式锁的区别:就是同一时刻是否可以允许有多个线程同时获取到锁
  • 对于共享式锁来说,如果一个线程成功获取了共享式锁,那么其他等待在这个共享锁上的线程就也可以尝试去获取锁,并且极有可能获取成功。基于共享式实现的组件有CountDownLatch、Semaphore
  • 调用 AQSacquireShared() 方法可以获取共享式锁,同样该方法不响应中断
public final void acquireShared(int arg) {
	if (tryAcquireShared(arg) < 0)
		// 失败则调用doAcquireShared方法将当前线程封装为Node.SHARED类型的Node 结点后加入到AQS 同步队列的尾部,
        // 然后"自旋"尝试获取同步状态,如果还是获取不到,那么最终使用 park 方法挂起自己
		doAcquireShared(arg);
}

tryAcquireShared(int arg) 尝试获取共享式锁

该方法是 AQS 的子类去自己实现的,用于尝试获取共享锁,一般来说就是对 state 的改变、或者重入锁的检查等等,不同的锁有自己相应的逻辑实现

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

返回 int 类型的值(如返回剩余的 state 状态值-资源数量),一般的理解为

  • 如果返回值小于 0,表示当前线程获取共享锁失败
  • 如果返回值大于 0,表示当前线程获取共享锁成功,并且接下来其他线程尝试获取共享锁很可能成功
  • 如果返回值等于 0,表示当前线程获取共享锁成功,但是接下来其他线程尝试获取共享锁会失败。实际上在 AQS 的实际实现中,即使某时刻返回值等于 0,接下来其他线程尝试获取共享锁也可能会成功。即某线程获取锁并且返回值等于 0 之后,马上又有线程释放了锁,导致实际上可获取锁数量大于 0,此时后继还是可以尝试获取锁的

doAcquireShared(int arg) 自旋获取共享锁

每个节点可以尝试获取锁(独占锁或共享锁)的要求是前驱节点是头节点,那么它本身就是整个队列中的第二个节点,每个获得锁的节点都一定是成为过头节点。那么如果某第二个节点因为不满足条件没有获取到共享锁而被挂起,那么即使后续节点满足条件也一定不能获取到共享锁

/**
 * 自旋尝试共享式获取锁,一段时间后可能会挂起
 * 和独占式获取的区别:
 * 1 以共享模式Node.SHARED添加结点
 * 2 获取到锁之后,修改当前的头结点,并将信息传播到后续的结点队列中
 */
private void doAcquireShared(int arg) {
    // addWaiter方法逻辑,和独占式获取的区别 :以共享模式Node.SHARED添加结点
    final Node node = addWaiter(Node.SHARED);
    // 当前线程获取锁失败的标志
    boolean failed = true;
    try {
        // 当前线程的中断标志
        boolean interrupted = false;
        for (; ; ) {
            // 获取前驱结点
            final Node p = node.predecessor();
            // 当前驱节点是头节点的时候就会以共享的方式去尝试获取锁
            if (p == head) {
                int r = tryAcquireShared(arg);
                // 返回值如果大于等于0,则表示获取到了锁
                if (r >= 0) {
                    // 和独占式获取的区别:修改当前的头结点,根据传播状态判断是否要唤醒后继结点
                    setHeadAndPropagate(node, r);
                    // 释放掉已经获取到锁的前驱结点
                    p.next = null;
                    // 检查设置中断标志
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 判断是否应该挂起,以及挂起的方法,和acquireQueued方法的逻辑完全一致,不会响应中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • 调用 addWaiter() 方法,将当前线程封装为 Node.SHARED 模式的 Node 节点后加入到AQS 同步队列的尾部,即表示共享模式
  • 后面就是类似于 acquireQueued() 方法的逻辑,节点自旋尝试获取共享锁。如果还是获取不到,那么最终使用 park() 方法挂起自己等待被唤醒
    在这里插入图片描述
setHeadAndPropagate(node, r) 设置节点并传播信息
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);//head指向自己
     //如果还有剩余量,继续唤醒下一个邻居线程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
  • 设置新 head 节点信息
  • 根据传播状态判断是否要唤醒后继节点
doReleaseShared() 唤醒后继节点

doReleaseShared() 用于在共享模式下唤醒后继节点

/**
 - 共享式获取锁的核心方法,尝试唤醒一个后继线程,被唤醒的线程会尝试获取共享锁,如果成功之后,则又会有可能调用 setHeadAndPropagate,将唤醒传播下去。
 - 独占锁只有在一个线程释放所之后才会唤醒下一个线程,而共享锁在一个线程在获取到锁和释放掉锁锁之后,都可能会调用这个方法唤醒下一个线程
 - 因为在共享锁模式下,锁可以被多个线程所共同持有,既然当前线程已经拿到共享锁了,那么就可以直接通知后继结点来获取锁,而不必等待锁被释放的时候再通知
 */
private void doReleaseShared() {
    // 一个死循环
    for (; ; ) {
        // 获取当前的 head,每次循环读取最新的 head
        Node h = head;
        // 如果h不为null且h不为tail,表示队列至少有两个结点,那么尝试唤醒head后继结点线程
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果头结点的状态为 SIGNAL,那么表示后继结点需要被唤醒
            if (ws == Node.SIGNAL) {
                // 尝试 CAS 设置 h 的状态从 Node.SIGNAL 变成 0
                // 可能存在多线程操作,但是只会有一条成功
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    // 失败的线程结束本次循环,继续下一次循环
                    continue;            
                // 成功的那一条线程会调用unparkSuccessor方法唤醒head的一个没有取消的后继结点
                unparkSuccessor(h);
            }
            /*
             * 如果h状态为0,那说明后继结点线程已经是唤醒状态了或者将会被唤醒,不需要该线程来唤醒
             * 那么尝试设置h状态从0变成PROPAGATE,如果失败则继续下一次循环,此时设置PROPAGATE状态能保证唤醒操作能够传播下去
             * 因为后继结点成为头结点时,在setHeadAndPropagate方法中能够读取到原head结点的PROPAGATE状态<0,从而让它可以尝试唤醒后继结点(如果存在)
             * */
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                // 失败的线程结束本次循环,继续下一次循环
                continue;               
        }
        // 执行到这一步说明在上面的判断中队列可能只有一个结点,或者unparkSuccessor方法调用完毕,或h状态为PROPAGATE(不需要继续唤醒后继)
        if (h == head)                  
            break;
    }
}

释放共享式锁 releaseShared(int arg)

对于支持共享式的同步组件(即多个线程同时访问),它们和独占式的主要区别就是 tryReleaseShared() 方法必须确保锁的释放是线程安全的(因为既然是多个线程能够访问,那么释放的时候也会是多个线程的,就需要保证释放时候的线程安全)。由于 tryReleaseShared() 方法也是我们自己实现的,因此需要我们自己实现线程安全,所以常常采用 CAS 的方式来释放同步状态

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 释放成功,必定调用doReleaseShared尝试唤醒后继结点
        doReleaseShared(); 
        return true;
    }
    return false;
}

AQS 总结重点

不同的 AQS 同步器争用共享资源的方式也不同。AQS 同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队,唤醒出队等),AQS 已经实现好了AQS 子类同步器实现时主要实现以下几种方法

  • isHeldExclusively():该线程是否正在独占资源。只有用到 condition 才需要去实现它
  • tryAcquire(int):尝试获取独占式锁,成功则返回 true,失败则返回 false
  • tryRelease(int):尝试释放独占式锁,成功则返回 true,失败则返回 false
  • tryAcquireShared(int):尝试获取共享式锁。负数表示获取失败;0 表示成功,但是其他线程尝试获取共享锁会失败;正数表示成功,其他线程尝试获取共享锁很可能成功
  • tryReleaseShared(int):尝试释放共享式锁,如果释放后允许唤醒后续等待节点返回 true,否则返回 false
  • AQS 中有一个核心状态是 waitStatus,这个代表节点的状态,决定了当前节点的后续操作,比如是否等待唤醒,是否要唤醒后继节点

参考:https://www.cnblogs.com/waterystone/p/4920797.html
推荐好文章:https://mp.weixin.qq.com/s/Vz3xLwKpodyU5RN9ERGgbA


版权声明:本文为weixin_38192427原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。