目录
writerShouldBlock()和readerShouldBlock()分析:
ReentrantReadWriteLock结构介绍:
实现了ReadWriteLock接口,ReadWriteLock.readLock()需要返回读锁,ReadWriteLock.writeLock()需要返回写锁
- 内部类:Sync(又分为FailSync和NonFairSync) ,ReadLock(里面又有一个Sync类变量), WriteLock(里面又有一个Sync类变量)
- Sync继承自AQS:重写了ReentrantReadWriteLock:tryAcquireShared(),tryReleaseShared()...
- ReadLock和WriteLock:实现了Lock接口
- 全局变量:sync,readLock(调用readLock()时供返回),writeLock(调用writeLock()时供返回)
- 默认构造方法:将全局变量sync=new NonfairSync(); 同时将sync赋值给readLock.sync和writeLock.sync
总结:ReentrantReadWriteLock维护了一把读锁,一把写锁,均实现了Lock接口,由内部类Sync用这俩把锁来实现线程安全。至于这俩把锁的公平性由构造方法时传入,默认是非公平。
AQS如何用一个Int值来表示读和写两种状态:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }一个int的值是32位
AQS将这32位分为高16位与低16位两个无符号的short值。高16位用来表示读 锁(共享锁),低16位用来表示写锁(独占锁)
MAX_COUNT为1左移16位减一,也就是二进制为00000000000000010000000000000000转换成十进制为65536再减去1位65535。最大共享数和最大独占数均为65535:
读锁最大共享数:65535 :2^16=65536( 0-65535)
写锁最大的重入次数65535 :2^16=65536( 0-65535)
AQS获取读锁的个数:sharedCount(int state): 将state右移16位(高位补0)
AQS获取写锁的个数:exclusiveCount(int state):state&65535,原理如下(只有1&1=1 其他&均为0)
| 写锁(独占锁) | |
|---|---|
| 0000000000000001 | 0000000000000010 |
| 0000000000000000 | 111111111111111111 |
| 0000000000000000 | 0000000000000010 |
AQS在处理读锁与写锁的方式很巧妙,永远把高位的16位变成0,从而消除高位的影响。这里是非常值得借鉴的地方。
AQS对每个读线程的重入计数:
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 每个线程特定的 read 持有计数。存放在ThreadLocal,不需要是线程安全的。
*/
static final class HoldCounter {
int count = 0;
final long tid = Thread.currentThread().getId();
}
/**
* 采用继承是为了重写 initialValue 方法,
* 如果ThreadLocal没有当前线程的计数,则new一个,再放进ThreadLocal里。
* 可以直接调用 get。
* */
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/**
* 保存当前线程重入读锁的次数的容器。在读锁重入次数为 0 时移除。
*/
private transient ThreadLocalHoldCounter readHolds;
/**
* 最近一个成功获取读锁的线程的计数。这省却了ThreadLocal查找,
* 通常情况下
*/
private transient HoldCounter cachedHoldCounter;
/**
* firstReader是这样一个特殊线程:它是第一个把 共享计数 从 0 改为 1 的
* (在锁空闲的时候)
* firstReaderHoldCount 是 firstReader 的重入计数。
* 作用是在跟踪无竞争的读锁计数时非常便宜。
*
* firstReader及其计数firstReaderHoldCount是不会放入 readHolds 的。
*/
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // 确保 readHolds 的内存可见性,利用 volatile 写的内存语义。
}
}writerShouldBlock()和readerShouldBlock()分析:
writerShouldBlock()和readerShouldBlock()方法都表示当有别的线程也在尝试获取锁时,是否应该阻塞。
公平模式:只要队列前面有等待线程。读写都应被阻塞
非公平模式:writerShouldBlock永远不会被阻塞 (是为了避免想获取写锁的线程饥饿,老是得不到执行的机会)
readShouldBlock如果等待队列中第一个等待线程想获取写锁,则阻塞
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
/**
* Fair version of Sync
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}重入读写锁总概述:
- 获取写锁时,只有所有线程都没在读时才有可能去获取到写锁
- 获取读锁时,只有没有写线程时才有可能获取到读锁
- 全局只有一个线程可以获取到写锁,并且获取写锁的线程可重入获取写锁,能获取的写锁最大个数为65535,这个获取写锁的线程也可以获取读锁(称为锁降级,如果不降级则会导致死锁)
- 获取不到读锁或写锁的线程都会被阻塞到一个阻塞队列,这个阻塞队列里的所有线程都只是第一次尝试获取读锁或写锁失败(不可能是重入的读锁,重入的读锁不阻塞,继续获取锁直到获取成功,具体过程参考下面读锁的获取)
- 这个阻塞队列的节点什么时候被唤醒,情况如下:
- 每个读锁获取后,如果下一个节点是共享节点,则释放下一个节点
- 每个读锁释放后,如果现在全局没有读也没有写,则会释放阻塞队列头节点
- 每个写锁释放后,如果此时全局已经没有写锁了,则会释放阻塞队列的头结点
- 只有每个写锁获取时,直接state+1,然后不会管阻塞队列的事
读锁获取:
读锁获取过程总结:获取读锁时有以下3种情况会获取失败:
- 有正在写的线程则会获取失败
- 非重入读应阻塞(重入即使读应阻塞也会继续自旋尝试获取读锁)
- 读锁达到最大共享数
每个读线程获取读锁成功后,都会判断阻塞队列中下一个节点是否是读节点,如果是则会被释放
获取读锁失败的线程会加入阻塞队列,加入阻塞队列如果是头节点的下一个节点(这个时候由于多线程并发,这个新节点可能后面已经有好多被阻塞的节点了),则有机会再次尝试获取读锁,一旦获取读锁成功的后,state+65536,并会有一个向后的传递性。如果其下一个节点是共享节点,则会继续释放下一个节点,下一个节点释放并获取读锁成功后又会判断下一个节点是否是共享节点,如果是继续释放。。。。(不是共享节点不释放,因为释放了这个写线程也不会获取到锁,因为现在有读线程)
AQS对每个读线程的都有个读计数器,记录读线程id及其占有的读锁数量,其中firstReader和firstReaderCount是第一个线程计数器,而cachedHoldCounter永远保存的是最后一个读线程的计数,至于每个读线程通过线程局部变量 readHolds.get()获取对应的计数器
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.LockSupport;
public class Temp {
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1; //如果有线程持有写锁,并且不是当前要获取读锁的线程,直接返回获取读锁失败
//如果是当前要获取读锁的线程,则会继续获取读锁,这称为锁降级:写线程里可以获取读锁
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {//对第一个读线程做记录,记录线程及其获取读锁的个数
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//第一个读线程重入获取读锁,直接将其获取的读锁个数+1
firstReaderHoldCount++;
} else {
/**
* 不是第一个读线程,cachedHoldCounter里永远保存最后一个读线程的计数器
* 初始rh为null,则会调用readHolds.get()初始化一个线程计数器对当前线程做记录
* 如果rh!=null,则rh为最后一个读线程的计数器,如果rh的线程不是当前要获取读锁的线程,
* 则必定是中间的 某个读线程(因为不是第一个也不是最后一个)
* 则利用线程局部变量readHolds获取当前线程对应的计数器并+1(并赋值给cachedHoldCounter,表示最后一个读线程)
*/
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
/**
* 下面的处理是说,如果是已获取读锁的线程重入读锁时,即使公平策略指示应当阻塞也不会阻塞。
* 否则,这也会导致死锁的,原因如下:
* 如果要是加入阻塞队列,而阻塞队列的前面节点如果又有要获取写锁的线程,那么当释放要获取写锁线程这个节点时,
* 这个写线程要想获取写锁,则必须所有人都没在读,而刚加入阻塞队列的节点不符合此条件导致写线程不能获取到写锁,
* 而后面的节点也永远得不到释放
*/
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {//获取当前读线程的计数器
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)//rh:当前读线程的计数器,如果rh.count=0代表该线程第一次尝试获取读锁
//如果rh.count!=0,代表不是第一次尝试获取读锁,之前已获取过,该线程是重入获取读锁
//则不会被阻塞,会继续循环获取读锁
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
}
//如果获取读锁失败,则加入阻塞队列
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
/**
* 如果获取到了,则设置新的头并且去解除队列后面的一个读线程的阻塞
* 如果这个读线程被解除阻塞获取到锁后,
* 会解除后面一个读线程,就这样依次解除,直到遇到写线程则不继续解除了
*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
AbstractQueuedSynchronizer.Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}读锁释放:
读锁释放过程总结:读锁释放就是直接state-65536,并且将其对应的读计数器中记录的占有的读锁个数-1
只不过释放读锁成功后,如果当前state=0也就是没有读也没有写线程时,如果有阻塞队列则会从头节点h的下一个节点开始释放,而被释放出来的节点如果是读线程,那这个读线程获取读锁成功后又会继续判断下一个节点是否是读节点,如果是继续释放,如果被释放出来的节点是写线程,则会尝试获取写锁,获取成功state+1,获取不成功继续被阻塞
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//读锁和写锁都空闲时为真
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;//代表读锁和写锁都空闲
}
}
private void doReleaseShared() {//和读锁获取成功后一样释放后面的节点
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}写锁获取:
写锁获取和重入锁基本一致,全局只有一个线程可以获取到写锁,state+1,并且可以重入获取写锁,这个写线程也可以获取读锁(锁降级)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())//已经有写线程,且不是自己,则直接获取失败
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)//写锁数量已经达到最大,则获取写锁失败
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}写锁释放:
写锁释放:一般写锁释放直接state-1,然后啥也不干,但是如果本次写锁释放后全局已经没有写锁了,则会去释放阻塞队列的头结点
如果是读节点,后面则是上面的读锁获取的系列过程,如果是写线程则是上面的写锁获取的系列过程
public final boolean release(int arg) {
if (tryRelease(arg)) {//全局没有写时,则会释放头结点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;//全局没有写时则为真
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}