AQS底层理解分析

Java中的锁

在Java中,多线程的情况下需要锁来保证数据的安全,锁一般分为两类:sychronized 和 Lock。
• sychronized利用的是指令级别的monitor-enter 和 monitor-exit。
• Lock 使用的则是代码级别实现的。在Doug Lea大神的操刀下利用CAS + 自旋 + volatile变量实现。
而在实现之后,并且抽象出了一个实现锁的基础类AbstractQueuedSynchronizer,通过这个类可以快速的实现符合自己要求的锁。

在这里插入图片描述

AQS的内部实现

Java并发编程的核心在于java.util.concurrent包。而juc当中大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer,简称AQS。AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。
AQS具备特性:
• 阻塞等待队列
• 共享/独占
• 公平/非公平
• 可重入
• 允许中断
这些特性是怎么实现的,以ReentrantLock为例:
• 一般通过定义内部类Sync [sɪŋk] 继承AQS
• 将同步器所有调用都映射到Sync对应的方法

AQS框架 - 管理状态

AQS内部维护属性:volatile int state(32位)
state表示资源的可用状态
state三种访问方式
getState()setState()compareAndSetState()
AQS定义两种资源共享方式
Exclusive 独占,只有一个线程能执行,如ReetrantLock
Share 共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS定义两种队列
● 同步等待队列
● 条件等待队列

Node类介绍 重要属性

static final class Node {
    //共享模式,资源可以同时去拿
    static final Node SHARED = new Node();
    //独占模式,只能有一个线程去拿
    static final Node EXCLUSIVE = null;

   //表示当前线程被中断了,在队列中没有任何意义,可以被剔除了
    static final int CANCELLED =  1;
    /**
     * 后继节点的线程处于等待状态,而当前节点如果释放了同步状态或者被取消,
     * 将会通知后继节点,使后继节点得以运行
     */
    static final int SIGNAL    = -1;

    /**
     * 节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,
     * 该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
     */
    static final int CONDITION = -2;
    /**
     * 表示下一次共享方式同步状态获取将会被无条件的传播下去
     */
    static final int PROPAGATE = -3;

    /**
     * 标记当前节点的信号量状态(1,0,-1,-2,-3)5种状态
     * 使用CAS更改状态,volatile保证线程可见性,并发场景下,
     * 即被一个线程修改后,状态会立马让其他线程可见
     */
    volatile int waitStatus;

    /**
     * 前驱节点,当前节点加入到同步队列中被设置
     */
    volatile Node prev;

    /**
     * 后继节点
     */
    volatile Node next;

    /**
     * 节点同步状态的线程
     */
    volatile Thread thread;

    /**
     * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量
     * 也就是说节点类型(独占和共享)和等待队列中的后继节点公用一个字段
     * (用在条件队列里面)
     */
    Node nextWaiter;
    }

ReentrantLock

同步队列
在这里插入图片描述

加锁解锁过程
在这里插入图片描述

tryAcquire()

final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
        if (compareAndSetState(0, acquires)) {
          setExclusiveOwnerThread(current);
          return true;
        }
      }
      else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
          throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
      }
      return false;
    }

先判断state是否为0,如果为0就执行上面提到的lock方法的前半部分,通过CAS操作将state的值从0变为1,否则判断当前线程是否为exclusiveOwnerThread,然后把state++,也就是重入锁的体现,我们注意前半部分是通过CAS来保证同步,后半部分并没有同步的体现,原因是:后半部分是线程重入,再次获得锁时才触发的操作,此时当前线程拥有锁,所以对ReentrantLock的属性操作是无需加锁的。如果tryAcquire()获取失败,则要执行addWaiter()向等待队列中添加一个独占模式的节点。

addWaiter()

/**
   * Creates and enqueues node for current thread and given mode.
   *
   * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
   * @return the new node
   */
  private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
      }
    }
    enq(node);
    return node;
  }

这个方法的注释:创建一个入队node为当前线程,Node.EXCLUSIVE 是独占锁, Node.SHARED 是共享锁。
先找到等待队列的tail节点pred,如果pred!=null,就把当前线程添加到pred后面进入等待队列,如果不存在tail节点执行enq()

enq

private Node enq(final Node node) {
    for (;;) {
      Node t = tail;
      if (t == null) { // Must initialize
        if (compareAndSetHead(new Node()))
          tail = head;
      } else {
        node.prev = t;
        if (compareAndSetTail(t, node)) {
          t.next = node;
          return t;
        }
      }
    }
  }

这里进行了循环,如果此时存在了tail就执行同上一步骤的添加队尾操作,如果依然不存在,就把当前线程作为head结点。
插入节点后,调用acquireQueued()进行阻塞

acquireQueued()

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
      boolean interrupted = false;
      for (;;) {
        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {
          setHead(node);
          p.next = null; // help GC
          failed = false;
          return interrupted;
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
          interrupted = true;
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
  }

先获取当前节点的前一节点p,如果p是head的话就再进行一次tryAcquire(arg)操作,如果成功就返回,否则就执行shouldParkAfterFailedAcquire、parkAndCheckInterrupt来达到阻塞效果;

Thread Interrupt

在这里插入图片描述


版权声明:本文为weixin_44110695原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。