前面讲了什么是AQS.并且基于Reentrantlock已经看了一部分AQS方法. 建议看完下面这篇再来阅读该文.
AQS是什么?基于ReentrantLock解密!
但是其实真实项目中一般不使用ReetrantLock,一般都是synchronized. 因为在jdk1.6之后,synchronized底层实现里面,里面做了一些计数器的维护,加锁释放锁,CAS啊.都能实现ReentrantLock一样的效果.而且使用起来相对简单. 另外读写锁ReentrantReadWriteLock其实也挺常用的
下面就接着看对于ReentrantReadWriteLock底层是如何基于aqs和Lock API进行读写锁的实现的.
前言
- 如果想要几分钟了解AQS是不现实的,你要这样想,人家大神花了几个月甚至几年写的代码怎么会那么容易几分钟让你就看懂呢?
- 本文会根据一步一图的形式,将代码中的各种指针变化全部画出来.便于理解.因为不画图是根本无法了解流程是怎么走的.
- 再讲一遍,该文每个场景都要跟着一步一图走,单看文章可能无法理解,需要结合源码.可以边看边打开idea跟着方法一块儿走.顺便看着我画的图在process on等画图工具里面跟着一起画.理解流程.这样才能更快的理解里面及其复杂的各种指针变化.
- 本文阅读完要20几分钟,但是如果你跟着一起画图,一起看源码,直到看明白,其实是远远不止的.所以做好心理准备,开始吧.
实际场景解读AQS读写锁ReentrantReadWriteLock
LockAPI的读写锁简单介绍
首先明确一点,读锁是共享锁,而写锁是独占锁.
Lock API,读写锁,可以加读锁,也可以加写锁
但是,读锁和写锁是互斥的,也就是说,你加了读锁之后,就不能加写锁;如果加了写锁,就不能加读锁
如果有人在读数据,就不能有人写数据,读锁 -> 写锁 -> 互斥
如果有人在写数据,别人不能写数据,写锁 -> 写锁 -> 互斥;
如果有人在写数据,别人也不能读数据,写锁 -> 读锁 > 互斥
但是如果有人加了读锁之后,别人可以同时加读锁
如果你有一份数据,有人读,有人写,如果你全部都是用synchronized的话,会导致如果多个人读,也是要串行化,一个接一个的读
我们希望的效果是多个人可以同时来读,如果使用读锁和写锁分开的方式,就可以让多个人来读数据,多个人可以同时加读锁
读写锁的好处
- 读写锁维护了一对锁,一个读锁和一个写锁,通过分离读写锁,使得并发性比一般的排它锁有了很大提升。
- synchronzied特性以及功能,读写锁都拥有
- 但是因为大多数应用场景都是读多于写的,因此在这样的情况下,读写锁可以提高吞吐量。这是相对于synchronized好的地方.
线程1写锁如何基于AQS的state完成加锁的?
- 我们走进writeLock的lock方法中.可以看到下面方法.首先调用tryAcquire方法.获取到了当前线程1.会判断是否有其他线程抢到过锁
acquire方法如下
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 获取到一个state = 0
int c = getState();
// 二进制值里面的高低16位分别代表了读锁和写锁,AQS就一个,state
// state二进制值的高16位代表了读锁,低16位代表了写锁
// 可以认为下面的w就是从c(二进制值)通过位运算获取到了state的低16位,代表了写锁的状态
int w = exclusiveCount(c);
// 如果c != 0,说明有人加过锁,但是此时c = 0
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// 非公平锁,此时一定会去尝试加锁
// 如果是公平锁,此时会判断如果队列中有等待线程,就不加锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
- 上面的c也就是state第一次进去是为0的.证明没有其他线程抢到锁,所以会走第二个if语句.进行writerShouldBlock判断是否公平,
- 如果是非公平策略.writerShouldBlock会返回false,此时一定会去执行compareAndSetState(c, c + acquires) 尝试加锁,state+1
- 如果是公平策略.writerShouldBlock则会判断是否阻塞队列里面有元素在排队,没有返回false,执行cas,state+1,有就返回true 直接跳过if
- 然后会设置当前加锁线程为线程1.
总结
当第一个线程过来尝试加锁的时候,会cas成功后对state加1,将当前加锁线程存进变量ExclusiveOwnerThread中
线程1state二进制高低16位判断写锁可重入
其实说白了就是将state的值在加1,比如下面线程1的state本来加过一次锁.再加一次就会变为2.
代码分析
下面看代码,其实还是tryAcquire方法
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState(); //获取到state此时为1,
int w = exclusiveCount(c);//w变量代表write写锁的第十六位二进制数,exclusiveCount获取的低十六位的值此时也是不为0的.
if (c != 0) {
//虽然c!=0,w!=0,但是因为当前加锁的线程还是线程1.所以if中的第二个条件不满足
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//这里会判断加写锁数量是否大于最大数量65535.如果超过就抛异常. 不满足条件.
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//这里会重新设置state的值为2.然后返回true
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
总结
- 如果是int类型的state不是0的话,那么他的二进制数值,32位,低16位一定不是0,如果低16位不是0的话,就代表他是加过写锁的
c != 0,w == 0,c肯定不是0,但是低16位是0,说明高16位不为0, 此时有线程加了读锁,没有线程加写锁,此时线程1要加写锁,而且线程1还不是之前加锁的那个线程
c != 0,w != 0,有线程加过锁,之前加的是写锁,但是当前线程不是之前加锁的线程,此时也不让线程1加写锁,同一个时间,只能有一个线程加写锁,如果线程1比如加了写锁,线程2也要加写锁,是互斥的
c != 0,w != 0,之前有人加过写锁,而且加写锁的还是线程1
如果加写锁的人是线程1,说明线程1就是在可重入的加写锁,将state += 1
线程2写锁加锁失败如何基于CAS队列阻塞等待?
当tryAcquire返回false的时候证明此时加锁的不是线程1,而是线程2了.此时就要进行阻塞线程2了.
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter方法如下
- 这里流程其实就跟ReentrantLock中分析的时候一样.就是各种指针变化.
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
- 我们一步步分析,首先基于当前抢锁失败线程创建一个node.然后pred指针指向tail.此时tail为null,所以pred也为null
enq方法如下
- 然后进入enq方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 首先t也指向tail.然后tail为null,那么t也为null,然后就进行初始化虚拟Node设置为一个Head节点,并且将tail指针指向nodeHead节点.
- 然后二次循环,此时t重新指向tail,然后判断不为null,就进入else,将线程2node的prev指向NodeHead,然后tail节点设置成线程2node,nodeHead的next指针指向线程2node,形成回路,最后返回nodeHead如下图.
acquireQueued方法
- enq执行完之后addWaiter方法返回线程2node
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 也就是acquireQueued(线程2node)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//这里tryAcquire不报错就不会执行.暂时不管,想了解cancelAcquire的话建议看ReentrantLock,那个文档有写
if (failed)
cancelAcquire(node);
}
}
- 首先获取node前驱节点,这里线程2node的前驱节点就是nodeHead,所以第一个if中虽然p==head,但是我们这里假设再次执行tryAcquire还是返回false,那么就会继续走shouldParkAfterFailedAcquire(nodeHead,线程2Node)方法了.
shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 首先int ws = pred.waitStatus;会获取到nodeHead的waitStatus状态.因为waitStatus默认创建node时被初始化为0.所以不满足第一个if
- 进入else ,cas操作将ws设置为SIGNAL 也就是-1. 如下图.
- acquireQueued方法接着继续for循环,重新获取线程2node前驱节点nodeHead,然后tryAcquire获取锁.假设继续获取锁失败.就会又走shouldParkAfterFailedAcquire方法,此时判断ws为-1,直接返回true.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
//省略
return false;
}
parkAndCheckInterrupt方法
- if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) 语句继续执行parkAndCheckInterrupt方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//这里阻塞住线程2
return Thread.interrupted();
}
总结
最终线程2会加入阻塞队列中,成为队头.并被park阻塞住,
线程3基于AQS的state二进制高低16位完成互斥判断–读写锁互斥
假设线程3过来加了读锁
下面看读锁的加锁过程
readLock.lock();
public void lock() {
sync.acquireShared(1);
}
acquireShared方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
内部首先获取tryAcquireShared(arg)是否小于0
tryAcquireShared方法
- 进行tryAcquireShared方法进行尝试cas之前判断已经加过写锁,此时就对即将加的读锁互斥.
//AQS父类
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//子类SYNC重写
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread(); //获取当前线程
int c = getState();//获取state变量值c.此时c等于2----被线程1可重入了两次.
//exclusiveCount(c)获取state低16位的二进制数.此时也是不等于0的.
//但是当前线程过来的是线程3,跟加锁线程是不相等的.此时这里就知道上面的读锁过来要进行阻塞.所以这里直接进入if返回 -1 .
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
- 上面互斥最终会返回-1 .是满足acquireShared方法内部的if语句的.
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
- 此时会直接走doAcquireShared(arg)方法
doAcquireShared方法
- 其实该方法是AQS内部方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 首先调用addWaiter(Node.SHARED)方法,Node.SHARED代表共享模式,一般加读锁是设置.我们下面再走一下addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
- 首先进入第一个if语句.将线程2node和新加的线程3node链接起来,线程3node变为tail指针. 然后就直接返回node了.
- 继续走doAcquireShared方法,进入for循环,这里跟acquireQueued方法很像.
for (;;) {
final Node p = node.predecessor();//获取线程3的前驱节点,此时是线程2Node
if (p == head) {//不满足条件
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}//进入shouldParkAfterFailedAcquire方法
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
- shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 继续一步一图.应该知道shouldParkAfterFailedAcquire的作用在lock的时候其实就是将前驱节点的waitStatus变为SIGNAL = -1 ,上面shouldParkAfterFailedAcquire会被for循环执行两次.如下图
- 然后就会执行parkAndCheckInterrupt方法,将线程3也阻塞
总结
根据互斥原则,当已经加过写锁,是不能够加读锁的.最终线程3会被标记为SHARED共享状态,添加到阻塞队列中去.等待被唤醒.
释放线程1写锁对AQS队列唤醒阻塞线程2的过程
- 释放写锁可重入加锁释放时会多次释放.直到state变为0.
release和tryRelease方法
public final boolean release(int arg) {
//tryRelease方法第一次在线程1进行写锁释放时会返回1.因为原先是可重入了一次是2. 所以想要进入循环,得等线程1内部的重入锁也释放.也就是下面的tryRelease还要再执行一遍,将state变为0.
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
- 最终会进入if语句去真正的释放锁,我们知道,head是nodeHead.其waitStatus是-1,满足下面条件,会直接进行unparkSuccessor调用
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}p
unparkSuccessor方法
- 下面方法主要作用是将park住的线程1 unpark
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//暂时进不来.
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//将nodeHead头结点的下一个节点也就是线程2node给唤醒.
LockSupport.unpark(s.thread);
}
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
- 一旦执行了unpark,那线程2在原先acquireQueued的第二个if中的parkAndCheckInterrupt就会被打断.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//这里tryAcquire不报错就不会执行.暂时不管,想了解cancelAcquire的话建议看ReentrantLock,那个文档有写
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//这里阻塞住线程2
return Thread.interrupted();//这里会被清除中断标记.并返回true
}
将interrupted赋值为true之后,会重新走一遍for循环.获取node的前驱节点p也就是nodeHead.然后判断p等于head.执行tryAcquire中的cas去进行抢占锁.此时假设没有其他线程非公平的过来抢占锁
此时tryAcquire中cas成功返回false.对state+1.加锁线程设置为线程2.然后进入acquireQueued的if中
setHead(node);
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
p.next = null; // help GC
failed = false;
return interrupted;
- 将head节点指向线程2node.并将线程2node内部线程清空,然后将prev指针断开
- 然后线程2node的前驱结点的next也置为null.就实现了彻底将原先的nodeHead给断开. 在jvm对GCROOT重新标记后,就会发现nodehead对象被孤立,就会被 回收掉.
- acquireQueued就会返回true ,就会执行acquire方法的selfInterrupt再次对线程2node添加一下中断标记.
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
总结
- 看上图可知,现场2node被唤醒,变为新的虚拟头结点,然后原先的nodeHead头结点会被垃圾回收掉.
基于CAS实现同时只有一个线程可以加读锁
假设先释放线程2的锁,唤醒线程3
首先查看线程2写锁的释放过程
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
doAcquireShared方法内唤醒线程
- 上面释放过程跟上面释放线程1一样.唯一的区别应该是线程3的唤醒过程,如下
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
....
}
}
- 这里parkAndCheckInterrupt方法最终会被中断并被清除中断标记,然后重新开始循环.此时p节点就是头结点.因为写锁没有了.线程3的前驱结点就是nodehead,
tryAcquireShared方法内部cas
- 假设tryAcquireShared下面没有其他线程抢占锁.那么下面代码返回1.
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();//
int c = getState();//当前state为0
if (exclusiveCount(c) != 0 &&低16位没有值,没有写锁
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);//获取r为0.
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//将state的高16为加1.
if (r == 0) {//满足条件
firstReader = current; //记录firstReader为第一个读线程---线程3
firstReaderHoldCount = 1; //firstReaderHoldCount赋值为1.证明一个读锁.
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;//这里直接跳出,返回1.
}
return fullTryAcquireShared(current);
}
- 然后因为r>0,所以进入if语句中,
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below//会获取头结点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
- 上面逻辑主要是将线程3从队列移除,线程3node则变为虚拟头节点.原nodeHead则被垃圾回收.至此队列就只剩下一个虚拟节点.并且锁占用线程则变成了线程3.如下图.
总结:
- 目前队列里面只有一个虚拟节点.并且当前占用的firstReader读锁线程是线程3.state高16位的值也记录下来是1.
此时线程4或5同时过来加读锁会怎么样呢?记录读锁数量或自旋重试加读锁
- 还是走到加读锁的lock代码,最终会调用tryAcquireShared方法
tryAcquireShared方法
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();//假设这里获取到的state为1,线程2也释放了锁,并且此时线程3被唤醒.也就是说当前加锁线程其实是线程3. 阻塞队列里面只有一个空的虚拟节点.
//exclusiveCount(c)此时获取写锁的二进制数其实是为0的.所以条件不满足
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)//如果有线程持有独占锁也就是写锁,就不让cas获取锁.直接返回-1.
return -1;
//sharedCount获取的是加了SHARED标记状态的线程节点记录的state高16位的二进制数值,代表了读锁的加锁次数
int r = sharedCount(c);//这里获取的r肯定是不等于0的.因为被线程3赋值了.
//readerShouldBlock在非公平情况下返回的是false.满足条件.然后进行设置state的高十六位二进制数加1.
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//如果线程4过来进行加读锁.并且cas成功,则进入内部,此事state数量为2
if (r == 0) {//不满足
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//不满足
firstReaderHoldCount++;//读锁可重入计数
} else {
//获取一个HoldCounter,用于记录读锁的加锁数量,内部维护了一个count变量.用于计数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
//第一次是满足的.所以这里获取到readHolds内部维护的ThreadLocalHoldCounter 该内部类其实是一个ThreadLock类.
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//该代码是在多个线程同时加读锁进行cas的时候
//也就是上面compareAndSetState(c, c + SHARED_UNIT)只有一个线程成功,后面的线程都会走下面代码进行自旋尝试
return fullTryAcquireShared(current);
}
//SYNC内部类
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
//SYNC内部类
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter(); //初始化的时候创建了HoldCounter.
}
}
Sync() {
readHolds = new ThreadLocalHoldCounter();//Sync构造的时候创建了readHolds
setState(getState()); // ensures visibility of readHolds
}
- 我们假设此时线程4和线程5同时过来加读锁.此时,首先会执行以下几步操作
- 首先获取sharedCount,哪些加了SHARED标记状态的线程节点记录的state高16位的二进制数值,代表了读锁的加锁次数,此时r是1,从上图可知只有一个线程3占用着一个读锁.
- 然后readerShouldBlock会在不公平策略情况下返回false.然后就会对线程4或5进行cas操作.此时只有一个线程会cas成功,我们假设是线程4
readerShouldBlock代码
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
- 进入else代码对sync的内部一些变量如cachedHoldCounter自定义内部类(内部维护一个计数器count),readHolds(threadlocal).用于记录当前读锁的个数.变量在SYNC构造的时候进行创建赋值初始化,如上代码.
readHolds解释:
当前线程持有的可重入读锁的数量。 仅在构造函数和readObject中初始化。 当线程的读取保持计数下降到0时删除。
cachedHoldCounter解释:
成功获取readLock的最后一个线程的保持计数。这样可以在常见情况下保存ThreadLocal查找,在这种情况下,要释放的下一个线程是要获取的最后一个线程。这是非易失性的,因为它仅用作*启发式,对于缓存线程非常有用。
流程图如下
fullTryAcquireShared方法
- 第2,3步主要用于存储读锁计数器在当前线程4的threadLocal里面.而线程5在cas失败的情况下会进入到fullTryAcquireShared方法中,如下:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();//重新获取state数量
if (exclusiveCount(c) != 0) {//因为上面场景没有写锁,而新加的线程5是读锁.所以这里写锁数量为0,不满足
...
} else if (readerShouldBlock()) {//readerShouldBlock在非公平锁情况下返回false
....
}
if (sharedCount(c) == MAX_COUNT)
....
if (compareAndSetState(c, c + SHARED_UNIT)) {//重新进行cas重试.重试成功就进入下面否则继续循环再次cas,直到cas成功
if (sharedCount(c) == 0) {
..
} else if (firstReader == current) {
firstReaderHoldCount++;//这是读锁的可重入计数
} else {
//第一次判断rh为null,赋值为cachedHoldCounter计数器
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++; //计数++
cachedHoldCounter = rh; // 缓存更新
}
//直接返回线程5加锁成功
return 1;
}
}
}
整体流程图如下:
线程6过来加写锁(独占锁)失败如何入阻塞队列?
writeLock.lock方法进入
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法进行读锁互斥写锁
下面方法上面分析过了,就不再赘述,一句话就是被判断已经加了读锁.直接返回false
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {//代表有锁已经加过了.
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())//w==0代表写锁为空,读锁不为空.此时互斥要返回false
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)阻塞入队
代码再贴一遍
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
上面代码流程如下
然后acquireQueued(线程6node, 1)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)//signal=-1
return true;
if (ws > 0) {
...
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
继续进行流程图. 其实就是在将阻塞列表的头结点的等待状态设置为阻塞中也就是-1,然后在头结点后面再拼一个node节点,然后阻塞住.
那如何唤醒阻塞队列的写锁线程6呢?
其实上面已经讲过如何唤醒写锁了.
- 这里只画一下图.理解一些就行了.其实就是将线程6cas成功后将阻塞队列线程6node节点从队列移除,也就是设置成新的nodeHead头结点
然后unpark中断线程6阻塞状态
为什么上面讲过如何唤醒写锁还要再提呢?
主要是有下面几个疑问要知道.必须了解上面如何唤醒写锁等等流程才能总结下面的疑问.
- 首先我们要思考一下,如果前面线程4和线程5已经加过共享锁(读锁)了.此时又加了线程6或者更多线程的写锁在阻塞队列里面
- 那么如果线程3的读锁释放掉. 同时一大堆读锁线程在cas还想加读锁.那么阻塞队列里面的写锁线程跟非公平竞争的一大堆外部读锁谁先抢到锁呢
- 是先唤醒阻塞队列的写锁呢?还是先cas加读锁呢?
这个问题通过上面的分析,可以仔细思考一下.
如果我们先唤醒阻塞队列的写锁的话.相当于所有一大堆的读锁线程都要在fullTryAcquireShared方法中进行自旋重试. 不就出现了读锁饥饿?
异或是先读锁线程cas优先,而aqs队列的写锁继续阻塞.又出现了写锁饥饿了.?
答案:其实上面已经给出答案了,aqs是有公平和非公平的策略的.
- 当线程已经占有独占锁也就是写锁的时候,此时任何锁类型过来,都要进行排队.或者又来的线程还是自己,就重入加锁.
- 当线程是加共享锁也就是读锁的时候, 通过readerShouldBlock来判断,此时分公平和非公平策略
当是公平抢锁的时候,每次进行尝试获取锁的时候都要进行判断是否有线程在等待.有的话就排队
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
当是非公平的时候,判断头结点的下一个节点是否非共享锁(读锁),也就是必须是写锁,才能返回true.如果是满足队列第一个节点是写锁,就进行cas自旋重试排队. 如果队列第一个节点是读锁.可以重入,还是唤醒后cas获取共享锁.
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
总结
- 通过上面的场景分析,基本了解了AQS在读写锁ReentrantReadWriteLock中是如何使用了.
- 了解了其优缺点.
- 了解了各个场景下AQS 写锁内部阻塞与唤醒,读锁可共享可重入以及阻塞与唤醒的流程.
- 解惑了锁饥饿场景下的解决方式—区分独占锁还是共享锁,公平还是非公平.
- 不管你是从上往下一步一图跟着画,跟着看源码看完的,还是直接鼠标直接划到底的,你都了解了其实要想完全了解AQS和ReentrantReadWriteLock 光靠简单的语言是不可能详细的完全理解各种场景的.必须要很大的篇幅才能讲清楚.所以能看到这里还是恭喜你.
- 不管是不是白嫖,来个三联吧.啊哈哈.毕竟花了两三天熬夜写出来的,还是很有成就的.
- 讲点心里话,想要学好,真的要静下心来.慢慢画图,当自己就是写代码的人.沉浸其中.掌握画图技巧,刚开始也不会画,各种指针.各种饶,但是尝试总有收获.感觉再复杂的算法通过画图都可以看明白.理解明白.
- 可以看看我下面画图跟着源码每个场景都画一副甚至多副图,这样就有了步骤性.记录性.
- 总之,画图就是明灯,照亮你的源码之路.有任何疑问可以留言问我.另
- 真的是最后一句: ----------------------------------语雀真好用