java锁(6)读写锁ReentrantReadWriteLock实现详解

1、ReentrantReadWriteLock特性

1.1、公平性

  • 非公平锁(默认):这个和ReentrantLock的非公平性一样,由于读线程之间没有锁竞争,所以读操作没有公平性和非公平性,写操作时,由于写操作可能立即获取到锁,所以会推迟一个或多个读操作或者写操作。因此非公平锁的吞吐量要高于公平锁。
  • 公平锁利用AQS的CLH队列,释放当前保持的锁(读锁或者写锁)时,优先为等待时间最长的那个写线程分配写入锁,前提是写线程的等待时间要比所有读线程的等待时间要长。同样一个线程持有写入锁或者有一个写线程已经在等待了,那么试图获取公平锁的所有线程(包括读写线程)都将被阻塞,直到最先的写线程释放锁。如果读线程的等待时间比写线程的等待时间还有长,那么一旦上一个写线程释放锁,这一组读线程将获取锁。

1.2、重入性

  • 读写锁允许读线程和写线程按照请求锁的顺序重新获取读取锁或者写入锁。只有写线程释放了锁,读线程才能获取重入锁。
  • 写线程获取写入锁后可以再次获取读取锁,但是读线程获取读取锁后却不能获取写入锁。
  • 另外读写锁最多支持65535个递归写入锁和65535个递归读取锁。(源码中使用了一个int类型的数据来记录,高16位记录读锁的个数,低16位记录写锁的个数)

1.3、 锁降级

写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样就从写入锁变成了读取锁,从而实现锁降级的特性。

1.4、锁升级

读取锁是不能直接升级为写入锁的。因为获取一个写入锁需要释放所有读取锁,所以如果有两个读取锁试图获取写入锁而都不释放读取锁时就会发生死锁。

1.5、锁获取中断

读取锁和写入锁都支持获取锁期间被中断。这个和独占锁一致。

1.6、条件变量

写入锁提供了条件变量(Condition)的支持,这个和独占锁一致,但是读取锁却不允许获取条件变量,将得到一个UnsupportedOper ationException异常。

1.7、重入数

读取锁和写入锁的数量最大分别只能是65535(包括重入数)。

1.8、监测

ReentrantReadWriteLock支持一些确定是保持锁还是争用锁的方法。这些方法设计用于监视系统状态,而不是同步控制。

2、ReentrantReadWriteLock实现原理

2.1、ReentrantReadWriteLock继承及聚合关系

以下为ReentrantReadWriteLock的继承及聚合实现关系。

9795603-2d3a7d27fcb19efb.png
ReentrantReadWriteLock继承及聚合关系.png

ReentrantReadWriteLock继承ReadWriteLock接口,读写锁实现是通过ReadLock及WriteLock实现的。而ReadLock及WriteLock通过聚合实现读写锁功能,而底层是通过AQS实现锁功能。

2.2、同步锁Sync实现

Sync继承于AQS,其实现了获取独占锁及共享锁相关接口,同时通过锁状态保存锁的读计数及写计数。

//同步锁实现,继承于AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;

    //将共享锁和独占锁的加锁次数保存在一个int类型中,
    //其中高16位代表共享锁的占用计数,低16位代表独占锁的占用计数
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    //共享锁的占用计数
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    //独占锁的占用计数
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    //锁的计数
    static final class HoldCounter {
        int count = 0;
        // 线程id
        final long tid = getThreadId(Thread.currentThread());
    }

    //通过ThreadLocal方式保存线程锁计数
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

    //保存线程读写计数数据
    private transient ThreadLocalHoldCounter readHolds;

    //保存第一个获取读锁的线程及锁计数
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }

   //读线程应该被阻塞
    abstract boolean readerShouldBlock();

   //写线程应该被阻塞
    abstract boolean writerShouldBlock();

    //释放独占锁
    protected final boolean tryRelease(int releases) {
        //当前线程持有独占锁?
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
            
        //减少独占锁的计数,若计数为0表示无线程占有独占锁
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }

    //获取独占锁
    protected final boolean tryAcquire(int acquires) {
         //1、若读锁计数或写锁计数非0,且当前线程没有占有独占锁,则获取锁失败;
         //2、若独占锁计数大于最大值(65535),则抛出异常;
         //3、若当前线程可以获取读锁,并CAS设置状态成功,则获取锁成功;
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

    //释放共享锁
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        //如果加锁时的第一个线程是当前线程,则对firstReader及firstReaderHoldCount进行处理;
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } 
        //加锁的第一个线程不是当前线程?取出缓存计数器cachedHoldCounter,若其线程不是当前线程;
        //则从readHolds中获取锁计数器
        else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            //锁计数器<=1表示当前线程最后一次释放锁,之后不再持有锁
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        //CAS修改锁状态
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                return nextc == 0;
        }
    }

    
    //获取共享锁
    protected final int tryAcquireShared(int unused) {
        //1、若独占锁计数不等于0,且当前线程不持有独占锁,则获取读锁失败;
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        //2、若同步队列为空或当前线程为头结点(!readerShouldBlock())
        //且共享锁计数小于最大值,且CAS更新锁状态成功;则更新相关读锁计数
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            //只有一个读锁?则只更新第一个读锁的线程和计数    
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            }
            //第一个读锁就是当前线程?
             else if (firstReader == current) {
                firstReaderHoldCount++;
            } 
            //更新锁计数缓存
            else {
                //读取最后一个获取锁的线程信息;
                //若最后获取锁的线程是当前线程,则将计数缓存放入ThreadLocal中;
                //否则从ThreadLocal中获取锁计数放到缓存中
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        //自旋重试
        return fullTryAcquireShared(current);
    }

    
    //CAS获取共享锁,逻辑与tryAcquireShared()大致相同
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            } else if (readerShouldBlock()) {
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }

    //尝试获取写锁
    final boolean tryWriteLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        //当前有锁计数值
        if (c != 0) {
            //独占锁计数为0()说明有共享锁)或当前线程不是独占锁线程
            int w = exclusiveCount(c);
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
        }
        //CAS更改锁状态(有可能其他线程并发CAS更改锁状态)
        if (!compareAndSetState(c, c + 1))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

    //尝试获取读锁
    final boolean tryReadLock() {
        Thread current = Thread.currentThread();
        for (;;) {
            int c = getState();
            //独占锁计数不为0,且独占锁线程不是当前线程,获取读锁失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return false;
            int r = sharedCount(c);
            if (r == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //CAS设置锁状态
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return true;
            }
        }
    }
}

2.3、ReadLock 及WriteLock实现

ReadLock 及WriteLock中获取读锁及获取写锁都是通过直接调用Sync相关方法实现的。

ReadLock实现:

public static class ReadLock implements Lock {
    private final Sync sync;
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquireShared(1);
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryReadLock();
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void unlock() {
        sync.releaseShared(1);
    }
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}

WriteLock实现:

public static class WriteLock implements Lock{
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    public void lock() {
        sync.acquire(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock( ) {
        return sync.tryWriteLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    public int getHoldCount() {
        return sync.getWriteHoldCount();
    }
}

版权声明:本文为yuanwei1144原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。