ReentrantLock 和 ReentrantReadWriteLock 详解

ReentrantLock 和 ReentrantReadWriteLock 详解



概述

Lock 在 J.U.C 中是最核心的组件,,锁最重要的特性就是解决并发安全问题。 为什么要以 Lock 作为切入点呢?如果看过 J.U.C 包中的所有组件,一定会发现绝大部分的组件都有用到了 Lock。 所以通过 Lock 作为切入点使得在后续的学习过程中会更加轻松。

在 Lock 接口出现之前, Java 中的应用程序对于多线程的并发安全处理只能基于synchronized 关键字来解决。但是 synchronized 在有些场景中会存在一些短板,也就是它并不适合于所有的并发场景。 但是在 Java5 以后, Lock 的出现可以解决synchronized 在某些场景中的短板,它比 synchronized 更加灵活。

Lock 本质上是一个接口,它定义了释放锁和获得锁的抽象方法, 定义成接口就意味着它定义了锁的一个标准规范,也同时意味着锁的不同实现。 实现 Lock 接口的类有很多,以下为几个常见的锁实现:

ReentrantLock:重入锁,他是唯一一个实现了Lock接口的实现类,重入锁指的是同一个线程再次获取同一把锁时,不需要阻塞,而是把重入次数加一即可。

ReentrantReadWriteLock:重入读写锁,它实现了 ReadWriteLock 接口,在这个类中维护了两个锁,一个是 ReadLock,一个是 WriteLock,他们都分别实现了 Lock接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则: 读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。

StampedLock:stampedLock 是 JDK8 引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock 是一种乐观的读策略,使得乐观锁完全不会阻塞写线程。

1. ReentrantLock

Lock 有很多的锁的实现,但是直观的实现是 ReentrantLock 重入锁。

可重入锁的设计是为了避免死锁的出现,synchronized关键字加的同步锁也是可重入锁。

ReentrantLock是Java并发包中提供的一个可重入的互斥锁。ReentrantLock和synchronized在基本用法,行为语义上都是类似的,同样都具有可重入性。只不过相比原生的Synchronized,ReentrantLock增加了一些高级的扩展功能,比如它可以实现公平锁,同时也可以绑定多个Conditon。

1.1 可重入性

所谓的可重入性,就是可以支持一个线程对锁的重复获取,原生的synchronized就具有可重入性,一个用synchronized修饰的递归方法,当线程在执行期间,它是可以反复获取到锁的,而不会出现自己把自己锁死的情况。ReentrantLock也是如此,在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。

假设锁不可重入:有以下代码:

public class Test{

    private int num;

    public synchronized void write(){
        num = 10;
    }

    public synchronized void read(){
        write();
        System.out.println(num);
    }

    public static void main(String[] args) {
        Test test= new Test();
        test.read();
    }
}

上面代码,read和write方法加的是同一把同步锁,就是当前的对象。如果锁不可重入的话,调用在main方法里面调用read方法,首先由read方法获取到了锁,然后在read方法调用write的方法,因为write方法也需要获取这把锁,但是该锁已经被read方法获取了,write方法要等到read方法释放锁才能获取锁执行代码块,而read方法要等到write方法执行完成才能释放该锁,这就导致了死锁。

假设锁是可重入的,调用在main方法里面调用read方法,首先由read方法获取到了锁,然后read方法里面调用write方法,由于执行read和write方法是同一线程,并且需要的锁是同一把锁,所以,write方法无需获取锁,只需把重入次数加1,然后write方法执行完,把重入次数减一即可。


1.2 ReentrantLock实现原理

锁的基本原理是,基于将多线程并行任务通过某一种机制实现线程的串行执行,从而达到线程安全性的目的。 在 synchronized 中,使用了偏向锁、轻量级锁、乐观锁。基于乐观锁以及自旋锁来优化了synchronized 的加锁开销,同时在重量级锁阶段,通过线程的阻塞以及唤醒来达到线程竞争和同步的目的。那么在 ReentrantLock 中,也一定会存在这样的需要去解决的问题。 就是在多线程竞争重入锁时,竞争失败的线程是如何实现阻塞以及被唤醒的呢?

AQS:在 Lock 中,用到了一个同步队列 AQS,全称 AbstractQueuedSynchronizer,它是一个同步工具也是 Lock 用来实现线程同步的核心组件。 如果搞懂了 AQS,那么 J.U.C 中绝大部分的工具都能轻松掌握。

从使用层面来说,AQS的功能分两种:独占和共享:
独占:每次只能有一个线程持有锁,比如ReentrantLock 就是以独占方式实现的互斥锁。
共享:允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock。


1.3 AQS内部实现

AQS 队列内部维护的是一个 FIFO 的双向链表,链表的每个 Node 其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 ASQ 队列中去; 当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。

通过ReentrantLock源码分析作为切入点,来看看AQS是如何实现线程同步的:以同步器的非公平锁为例子:

调用时序图入下:

在这里插入图片描述

  1. 从ReentrantLock的lock接口为入口:建议画图直观理解,画同步队列的变化情况。
	public void lock() {
		//该方法调用了同步器Sync的lock方法,而Sync继承了AQS(AbstractQueuedSynchronizer)
	    sync.lock();
	}

在这里插入图片描述

  1. 我们跟踪非公平同步器NonfairSync的lock方法。
 
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

	protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
 //这个是AbstractQueuedSynchronizer的锁状态。
 private volatile int state;

假设有3个线程竞争锁(假设当前锁处于空闲状态state=0),进来了上面的一个方法,进行第一个判断compareAndSetState,这个方法是使用cas乐观锁的方式来更新锁的状态(不懂cas要去补下功课),当锁的状态为0表示当前锁处于空闲状态,大于0表示已经有其他线程占用了该锁。

第一个线程进来了:
调用了compareAndSetState方法,因为当前锁属于空闲状态,所以修改值会成功,返回true,第一个线程就进入了这个if分支,调用setExclusiveOwnerThread方法。

//这个方法很简单,就是把当前同步器的exclusiveOwnerThread 成员变量赋值为调用它的线程,
//exclusiveOwnerThread 表示的是当前是哪个线程独占这把锁。调用该方法表示当前线程获取到了锁。
protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
}

第二个线程进来了:
调用了compareAndSetState方法,因为当前锁属于上锁状态(state!=1),所以修改值会失败,返回false,所以第二个线程就进入了这个else分支分支,调用acquire方法。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

这个方法有四步:
调用tryAcquire、addWaiter、acquireQueued、selfInterrupt四个方法。

tryAcquire:

//这个是NonfairSync的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
     return nonfairTryAcquire(acquires);
}


final boolean nonfairTryAcquire(int acquires) {
			//此时acquires=1
			//获取当前线程
            final Thread current = Thread.currentThread();
			//获取当前同步锁的锁状态
            int c = getState();
            if (c == 0) {
            	//c == 0表示此时锁处于空闲状态,原因是当线程2走到这里之前,线程1已经释放了锁。
            	//这里再次尝试获得锁,因为可能存在线程持有锁时间很短的情况,假如此时线程1已经释放
            	//了锁,此时再次尝试获得锁就可能会成功,这就避免了把线程阻塞进入同步队列这种情况
            	//是对性能提高做的一个小技巧。
                if (compareAndSetState(0, acquires)) {
                	//跟前面那个 获取锁成功
                    setExclusiveOwnerThread(current);
                    //返回true表示获取锁成功
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
            	//getExclusiveOwnerThread方法是获取当前占用锁的线程,如果跟当前线程是同一线程,就无需再次获取锁,而是把重入次数加一。
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //设置状态,state记录的就是重入次数,次数为0代表锁空闲。
                setState(nextc);
                //返回true表示获取锁成功
                return true;
            }
            //返回false表示获取锁失败
            return false;
		//该方法逻辑总结就是尝试再次使用cas获取锁或者添加重入次数而获取锁成功,如果这两个都不成功,就获取失败。
}

如果tryAcquire返回false,表示获取锁失败,就会执行addWaiter方法。

//这个方法就是把线程封装成Node添加进双端队列中。
private Node addWaiter(Node mode) {
		//mode 传递的参数是Node.EXCLUSIVE,表示的该锁是排它锁的标志。
		//创建一个Node,使用当前线程和mode
        Node node = new Node(Thread.currentThread(), mode);
        //获得当前同步器所使用的双端链表的尾部节点。
        Node pred = tail;
        if (pred != null) {
        	//如果尾部节点不为空,就进入这里,当时由于是第二个线程,此时双端链表还没有元素,所以尾部节点是空的,此时就不走这里。
        	//把当前节点的前驱节点设置为双端链表的尾部节点
            node.prev = pred;
            //使用cas把尾部节点指向新增的节点。上面的不用cas,这里要使用的原因是node节点属于当前线程,所以修改是不存在线程安全的,而pred 尾部节点是共享的,所以要cas保证原子性
            //上面这两步的作用就是往双端链表的尾部节点
            if (compareAndSetTail(pred, node)) {
            	//如果修改成功,就意味着新增的节点已经添加到了双端队列
            	//所以此时就把原本的尾部节点的后驱引用指向新增的节点,此时就彻底完成了双端节点的新节点添加
                pred.next = node;
                //返回新增节点
                return node;
            }
        }
        //两种情况会走这里
        //1. 如果尾部节点为空 
        //2. 尾部节点不为空但是添加到双端队列失败(compareAndSetTail返回false)的节点就会走这里。详情看下面
        enq(node);
        return node;
    }

private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }


private Node enq(final Node node) {
		//这个方法就是为上面其他获取不到锁的线程并且没有成功添加到双端队列的线程进行兜底添加
		//使用了自旋的方式。
		//如果是上面第一种情况,会至少进行两次自旋
        for (;;) {
        	//获取双端队列的尾部节点
            Node t = tail;
            if (t == null) { // Must initialize
            	//如果尾部节点为空,那就是上面的第一种情况,就会进入这里。
            	//compareAndSetHead是往头结点设置一个空节点,也就是没有实际封装线程的空节点。
                //进入这里后,需要进行字段第二次,第二次就会走else分支,因为在这里已经设置好了尾节点。
                if (compareAndSetHead(new Node()))
                	//成功后把尾结点执行头结点
                    tail = head;
            } else {
            	//尾结点不为空进入这里。进入这里的原因是经过了上面if分支的第一次自旋设置了尾结点,或者从上面方法的第二种情况进来。
				//把要新增的节点的前驱设置为尾节点。
                node.prev = t;
                //cas设置尾节点指向新节点
                //这里通过不断自旋,知道把获取不到锁的线程封装成节点添加到双端队列中
                if (compareAndSetTail(t, node)) {
                	//原来的尾节点的后驱引用设置为新节点。
                    t.next = node;
                    return t;
                }
            }
        }
    }

执行acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {
		
		//此时arg是1,node是添加到双端队列的新节点
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	//获取不到锁的线程就是被阻塞在这个自旋里面
            	//获取当前节点的前驱节点
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                	//前驱节点为头节点并且成功获取了锁,就进入这里
                	//进入这里代表该线程获得了锁,才能跳出自旋。
                	//设置当前节点为头节点,并把当前节点的线程、等置空。
                    setHead(node);
                    //从队列中删除前驱节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //没有获取锁,就阻塞。
                    interrupted = true;
            }
        } finally {
            if (failed)
            	//这个兜底方法是对没有获取锁,但是通过一些手段,比如出错异常等而跳出自旋。但是实际上没有获取锁,就会调用这个方法。
                cancelAcquire(node);
        }
    }


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
		//pred前驱节点
		//node当前节点
		//这个方法返回的在线程获取锁失败后,是否应该进行阻塞
		//获取前驱节点的节点状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
        	//如果当前节点的前驱节点的节点状态为SIGNAL状态,当前节点就能进行阻塞。返回true。
        	//只有这个分支返回true。
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * 前驱节点的状态大于0,也就是属于cancel状态,这些节点应该直接从队列里面去除。
             * 一直循环,知道删除到的节点不属于cancle为止。
             */
            do {
            	//这里相当于:
            	//node.prev=pred.prev  //当前节点的前驱引用执行前驱节点的前驱引用
            	//pred=pred.prev		//前驱节点=原前驱节点的前驱节点。
            	//熟悉双端链表,再画个图,这里就比较好理解了。
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 将前驱节点的节点状态设置为SIGNAL
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;

		//总体逻辑是
		//1. 如果前驱节点的状态为SIGNAL,就返回true,表示当前节点可以阻塞。
		//2. 如果前驱节点的状态为cancle,就从队列去除。返回false。
		//3. 其他情况就把前驱节点状态修改为SIGNAL
		//意味着执行后,队列节点除了最后一个节点,其他都为SIGNAL
    }

 private final boolean parkAndCheckInterrupt() {
 		//如果shouldParkAfterFailedAcquire方法返回true后,才执行这个方法,进行线程阻塞。
        LockSupport.park(this);
        return Thread.interrupted();
    }


private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

		//把线程置空
        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
        	//将当前节点的前置节点waitStatus 为cancel的节点去掉,知道遇到第一个不为cancel的节点。
            node.prev = pred = pred.prev;

        
        Node predNext = pred.next;

        //将当前节点的状态设置为CANCELLED
        node.waitStatus = Node.CANCELLED;

        // 
        if (node == tail && compareAndSetTail(node, pred)) {
        	//如果当前节点是尾部节点,就删除我们
            compareAndSetNext(pred, predNext, null);
        } else {
            
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

解锁unlock:

public void unlock() {
        sync.release(1);
    }

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            	//取消阻塞/唤醒线程
                unparkSuccessor(h);
            return true;
        }
		//返回释放锁失败,这里的失败就是没有完全释放锁(重入次数没有减为零,也有可能减了一次)
        return false;
    }

//首先调用tryRelease方法尝试释放锁
protected final boolean tryRelease(int releases) {
			//此时releases=1
			//减少重入次数
            int c = getState() - releases;
            //如果解锁的线程不等于当前占用锁的线程,就抛出异常IllegalMonitorStateException
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
            	//如果c=0,那就带表重入次数等于0,表示如果减少了重入次数后,锁就是空闲状态。
            	//就表示这次释放锁是完全释放锁。
                free = true;
                //把锁的当前占用线程设置为null
                setExclusiveOwnerThread(null);
            }
            //设置锁的重入次数(状态)
            setState(c);
            //返回是否是否成功,这里返回true的情况是经过这次释放后,锁的重入次数为0这个情况才返回true。
            return free;
        }


private void unparkSuccessor(Node node) {
        /*
         *释放锁时传入的node是头节点
         */
        int ws = node.waitStatus;
        if (ws < 0)
        	//如果头节点的状态为小于0,设置为0
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * 
         */
         //获取头节点的下一个节点,因为头节点是不存在线程的,也就是说头节点只是一个用于入口的空节点。
         //如果你从上面的上锁逻辑 用图一步一步跟着话出双端链表元素,会发现头节点是一个空节点
         //所以真正是从第二个节点开始的
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
        	//当第二个节点为空或者状态为Cancel时进入
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
            	//从尾向前遍历,找到最后一个状态小于等于0的节点,把s赋值。
            	//为什么不从头遍历找第一个状态小于等于0的节点呢,因为这样的效率更高,
            	//原因是在上锁的时候,是先链接前驱节点再链接后驱节点的,如果从头遍历,使用后驱节点,可能会出现断链的情况
                if (t.waitStatus <= 0)
                    s = t;
        }
       
        if (s != null)
        	//唤醒该线程
            LockSupport.unpark(s.thread);
	//总体逻辑是找到双端链表从第二个节点开始第一个状态小于等于0的节点的线程进行唤醒。
    }

上锁的总体逻辑是:

  • 获取锁成功的线程直接返回执行同步代码块。
  • 获取不到锁的线程从尾部加入到同步双端队列中,并在一个自旋内阻塞,等待唤醒后再自旋内获取锁成功就跳出自旋。
  • 同一个线程获取同一把锁把AQS的state(重入次数+1)。
  • 获取到锁的线程将会从双端队列中删除。

释放锁的逻辑:

  • 只有把锁的重入次数减到0才进行对同步队列中的线程进行唤醒。如果没有减到0的释放锁就仅仅把AQS的state减一。
  • 如果重入次数减到0,就把双端队列从第二个节点开始,往后找,找到第一个节点状态小于等于0的节点来唤醒去获取锁。

以上也就是AQS的大致原理,通过一个同步双端队列来控制锁的同步情况。


1.4 公平锁/非公平锁

所谓公平锁,顾名思义,意指锁的获取策略相对公平,当多个线程在获取同一个锁时,必须按照锁的申请时间来依次获得锁,排排队,不能插队;非公平锁则不同,当锁被释放时,等待中的线程均有机会获得锁。synchronized是非公平锁,ReentrantLock默认也是非公平的,但是可以通过带boolean参数的构造方法指定使用公平锁,但非公平锁的性能一般要优于公平锁

synchronized是Java原生的互斥同步锁,使用方便,对于synchronized修饰的方法或同步块,无需再显式释放锁。而ReentrantLock做为API层面的互斥锁,需要显式地去加锁解锁。采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。

class X {
    private final ReentrantLock lock = new ReentrantLock();
    public void m() {
      lock.lock();  // 加锁
      try {
        // ... 函数主题
      } finally {
        lock.unlock() //解锁
      }
    }
}

1.5 ReentrantLock 实现非公平锁

虽然AQS实现了很多功能,但是具体的获取锁、释放锁是由子类来实现的,也就是说只有子类能够决定:**"什么才是获取锁,怎么获取锁?什么才是释放锁,怎么释放锁?**继承自AQS的子类需要实现如下方法:

在这里插入图片描述

1.5.1 非公平锁的获取

先思考一下什么是非公平?在AQS分析里提到过:获取锁失败的线程会被加入到同步队列的队尾,如果线程A刚好释放了锁,而此时线程B也要争取锁,若是竞争成功了就直接获取锁了,而在同步队列的线程虽然比线程B更早地排队了,但反而被线程B窃取了革命的果实,这对它们来说是不公平的。

来看看ReentrantLock 是如何实现非公平锁的,先看看ReentrantLock的定义:

//Lock 接口里声明了获取锁、释放锁等接口。
public class ReentrantLock implements Lock, java.io.Serializable {}

在这里插入图片描述
Sync/NonfairSync/FairSync是ReentrantLock里的静态内部类,Sync继承自AbstractQueuedSynchronizer,而NonfairSync、FairSync,继承自Sync。
ReentrantLock 非公平锁的构造:

    public ReentrantLock() {
        sync = new NonfairSync();
    }

可以看出,ReentrantLock 默认实现非公平锁。
获取非公平锁:

		#ReentrantLock.java
        final void lock() {
        	//设置state=1
            if (compareAndSetState(0, 1))
            	//设置成功,记录获取锁的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
            	//尝试修改失败后走到此,这是AQS里的方法
                acquire(1);
        }

此处再说明一下compareAndSetState(0,1),典型的CAS操作,尝试将state由0修改为1,若是发现state不是0,说明已经有线程修改了state,这个修改者可能是别的线程,也可能是自己,此时CAS失败。
继续来看acquire(xx):

	#AbstractQueuedSynchronizer.java
    public final void acquire(int arg) {
        //由子类实现
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

其它方法在AQS里已经分析过,此处重点是分析tryAcquire(xx)。

		#ReentrantLock.java
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //获取同步状态
            int c = getState();
            if (c == 0) {
            	//说明此时没人占有锁
            	//尝试占用锁
                if (compareAndSetState(0, acquires)) {
                	//成功,则设置占有锁的线程为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //c!=0,说明有线程占有锁了
            else if (current == getExclusiveOwnerThread()) {
            	//若是占有锁的线程是当前线程,则判断是线程重入了锁
                //直接增加同步状态
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //设置状态
                setState(nextc);
                return true;
            }

            //都不满足,则认为抢占锁失败
            return false;
        }

可以看出,非公平锁的获取锁的关键逻辑就在nonfairTryAcquire(xx)里:

1、先判断当前是否有线程占有锁,若没有,则尝试获取锁。
2、若已有线程占有锁,判断占有的线程是不是自己,若是则增加同步状态,表示是重入。
3、若1、2步骤都没有获取锁,则表示获取锁失败。

用图表示流程如下:
在这里插入图片描述

由图可知,非公平锁获取锁时:

一上来就开始抢占锁,失败后才开始判断是否有线程占有锁,没有人占有的话又开始抢占,这些抢占操作不成功才会进入同步队列阻塞等待别的线程释放锁。
这也是非公平的特点:不管是否有线程在排队等候锁,我就不排队,直接插队,实在不行才乖乖排队。

可以看出,独占锁的核心是:

谁能够成功将state从0修改为1,谁就能够获取锁。
换句话说,state>0,表示该锁已被占用。

1.5.2 非公平锁的释放

既然有lock过程,那么当然有unlock过程:

	#ReentrantLock.java
    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
    	//释放锁
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

依然的,只分析tryRelease(xx):

	#ReentrantLock.java
    protected final boolean tryRelease(int releases) {
    		//已有的同步状态 - 待释放的同步状态
            int c = getState() - releases;
            //只有获取锁的线程才能释放锁
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
            	//释放后,同步状态为0,说明已经没有线程占有锁了
                free = true;
                //没有占有锁了,标记置null
                setExclusiveOwnerThread(null);
            }
            //对于独占锁来说,c!=0,说明是线程重入,还没释放完
            //设置释放后的同步状态值
            setState(c);
            return free;
    }

可以看出,非公平锁释放核心逻辑在tryRelease(xx)里:

将state值-1,若是最后state==0,说明已经完全释放锁了。
只有持有锁的线程才能修改state,因此修改state无需CAS。


1.6 ReentrantLock 实现公平锁

1.6.1 公平锁的获取

既然非公平锁的特点是插队,那么公平锁就需要老老实实排队,重点是如何判断队列里是否有线程等待。

	#ReentrantLock.java
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
        	//没有尝试获取锁
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            	//当前没有线程占有锁
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
            	//重入
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

当调用lock()时,并没有直接抢占锁,当判断锁没有被任何线程占有时,也没有立刻去抢占锁,而是先判断当前同步队列里是否有线程在排队等候锁:

	#AbstractQueuedSynchronizer.java
    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        //三个判断条件
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

该方法返回true,表示有(正在插入)节点在同步队列里等待。
理解上面的判断需要对AQS有一定的了解,再次来看看同步队列:
在这里插入图片描述
同步队列是由双向链表实现的,头节点不指向任何线程。
第一个条件:h != t
照理来说,h!=t 就能说明同步队列里至少有两个节点,为什么还需要后面的判断呢?
想象一种场景:线程A获取了锁,此时线程B想要获取锁但失败了,于是B就加入到了同步队列,此时同步队列里有两个节点:头节点和B线程节点(head即是头节点,尾节点指向B节点)。某个时刻,A释放了锁,并唤醒了B,B醒来后再去调用tryAcquire(xx)去获取锁(这整个逻辑是AQS里实现,和ReentrantLock没关系)。
而当B调用tryAcquire(xx)时会通过hasQueuedPredecessors(xx)判断是否有节点在同步队列里等待,此时h!=t,但是因为等待的节点是B自己,实际上B是不再需要再插入等待队列的。
因此仅仅是h!=t的判断是不够的,需要再判断等待的节点是否是当前节点本身。

第二个条件:s.thread != Thread.currentThread()
判断同步队列里的第一个等待(非头节点)的节点是否是当前节点本身,s 有可能为空,因此需要判空,于是有如下判断:

(s = h.next) == null && s.thread != Thread.currentThread()

你可能已经发现了,源码里的判断是"||“而非”&&",也就是说若h.next==null,也可作为同步队列有节点等待的依据,这是基于什么场景考虑的呢?

第三个条件:(s = h.next) == null
理解这个问题需要考虑并发场景,先看看同步队列是如何初始化的:

	#AbstractQueuedSynchronizer.java
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
            	//当前队列为空,将头节点指向新的节点
                if (compareAndSetHead(new Node()))
                	//再将尾节点指向头节点
                    tail = head;
            } else {
            	...
            }
        }
    }

初始的时候头节点(head),尾节点(tail)都为null,此时头节点指向新的节点,但是还没来得及执行tail=head。这个时候hasQueuedPredecessors(xx)被另一个线程执行了,然后判断h!=t(hNode,tnull),结果为true。
若是此时h.next==null,说明同步队列正在初始化,进一步说明有节点正在准备入队,此时整体判断就是:同步队列里有节点在等待。
于是,通过上述三个条件就可以判断同步队列里是否有节点在等待。
可以看出,公平锁的公平之处在于:

先判断有没有节点(线程)先于当前线程排队等候锁的,若有则当前线程需要排队等候。

公平锁获取锁流程如下:

在这里插入图片描述

1.6.2 公平锁的释放

与非公平锁的释放逻辑一致。

小结公平锁与非公平锁:

公平与非公平体现在获取锁时策略的不同:
1、公平锁每次都会检查队列是否有节点等待,若没有则抢占锁,否则就去排队等候。
2、非公平锁每次都会先去抢占锁,实在不行才排队。
3、公平锁、非公平锁在释放锁的逻辑上是一致的。


1.7 ReentrantLock 实现可中断锁

AQS 能够实现可中断锁与不可中断锁,ReentrantLock 只是借助了AQS完成了此功能:

	#ReentrantLock.java
    public void lockInterruptibly() throws InterruptedException {
        //核心在AQS里实现
        sync.acquireInterruptibly(1);
    }

可中断用白话一点地说:

若是线程在同步队列里等待,外界调用了Thread.interrupt,结果就是被中断的线程被唤醒,放弃获取锁,并抛出中断异常。


1.8 ReentrantLock tryLock 原理

有些时候,我们并不想一上来就去获取锁,万一锁被别的线程占有了,那么当前线程就会阻塞住。也就是说仅仅想要尝试一次获取锁,若不成功则直接退出,不去排队,这个方法能满足需求:

	#ReentrantLock.java
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

当然,排队也接受,只是需要限时,也就是说我就等待这么长时间,时间到了还是没获取锁,那么我就不再排队等候了,退出争抢锁的流程。

	#ReentrantLock.java
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

1.9 ReentrantLock 等待/通知

等待/通知机制基于等待队列,这部分也是AQS实现的,ReentrantLock 封装了相应的接口。

	#ReentrantLock.java
    public Condition newCondition() {
        return sync.newCondition();
    }

	#AbstractQueuedSynchronizer.java
    final ConditionObject newCondition() {
            return new ConditionObject();
    }

实际上就是生成了ConditionObject对象,并操作这个对象。
ConditionObject 是AQS里的非静态内部类。
注:等待通知机制需要配合独占锁使用

public class TestThread {
    static ReentrantLock reentrantLock = new ReentrantLock();
    static Condition condition = reentrantLock.newCondition();
    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //子线程等待
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        
        Thread.sleep(2000);
        //主线程通知
        condition.notify();
    }
}

以上是利用了等待/通知 实现了简单的线程间同步。


2. ReentrantLock 与synchronized 异同点

2.1 相同点

基本数据结构

都包含:volatile + CAS + 同步队列 + 等待队列(等待/通知)。
这些数据结构是AQS实现的,并非ReentrantLock.java 里实现的,只是为了方便比对,才这么写。

实现功能

1、都能实现独占锁功能。
2、都能实现非公平锁功能。
3、都能实现可重入锁功能。
4、都是悲观锁。
5、都能实现不可中断锁。

2.2 不同点

实现方式

1、synchronized是关键字,ReentrantLock是类。
2、synchronized由JVM实现(主要是C++),ReentrantLock 由Java代码实现。
3、synchronized能修饰方法和代码块,ReentrantLock 只能修饰代码块。
4、synchronized保护的方法/代码块发生异常能够自动释放锁,ReentrantLock保护的代码块发生异常不会主动释放,因此需要在finally里主动释放锁。

提供给外界功能

1、ReentrantLock 能够实现公平锁,而synchronized 不能。
2、ReentrantLock 能够实现共享锁,而synchronized 不能。
3、ReentrantLock 能够实现可中断锁,而synchronized 不能。
4、ReentrantLock 能够实现限时等待锁,而synchronized 不能。
5、ReentrantLock 能够检测当前锁是否被占用,而synchronized 不能。
6、ReentrantLock 能够绑定多个条件,而synchronized 只能绑定一个条件。
7、ReentrantLock 能够获取同步队列、等待队列长度,而synchronized 不能。

性能区别

synchronized 在jdk1.6 以后增加了偏向锁、轻量级锁、锁消除、锁粗化等技术,大大优化了synchronized 性能。
现在没有明确的数据/理论表明 ReentrantLock 比synchronized 更快,官方也仅仅是推荐使用synchronized。

很多文章说:“synchronized 使用了mutex,陷入内核态,而ReentrantLock 使用CAS,是CPU的特殊指令云云”,由此证明synchronized 更耗性能。
这种说法是有问题的,还记得我们说过jdk1.6之前synchronized 为啥是重量级锁的原因:

线程的挂起需要保存上下文,唤醒需要恢复回来,这过程耗费资源。

现在来对比一下synchronized、 ReentrantLock在高并发的场景下如何处理线程的挂起与唤醒的。
先说synchronized,当线程发现锁被占用时,处理逻辑如下:

1、将线程加入等待队列,并挂起线程。
2、挂起方式:ParkEvent.park(xx)—>NPTL.pthread_cond_wait(xx)/NPTL.pthread_cond_timedwait
NPTL是Linux glibc下实现的,用的是futex。

再说ReentrantLock,当线程发现锁被占用时,处理逻辑如下:

1、将线程加入等待队列,并挂起线程。
2、挂起方式:AQS—>LockSupport.park()—>Unsafe.park(xx)—>Parker.park(xx)—>NPTL.pthread_cond_wait(xx)/NPTL.pthread_cond_timedwait

由此可以看出,synchronized 与ReentrantLock 底层挂起线程实现方式是一致的。

接着来看所谓的:“ReentrantLock 使用CAS,而synchronized 使用底层xx东西”。
先说ReentrantLock,当抢占锁时使用CAS,CAS是一次性操作,也就是它只有两种结果:

要么成功,要么失败。
在ReentrantLock 或者AQS 里并没有一直循环使用CAS 抢占锁的实现方式,也就是说线程没有获取到锁,最终的结果还是被挂起,也即是调用上面分析的挂起方法。
CAS 调用栈:AQS.compareAndSetState(xx)—>Unsafe.compareAndSwapInt(xx)—>Atomic::cmpxchg(xx)
其中Atomic是原子操作类,也就是说cmpxchg(xx) 是原子函数,不可打断的。

而synchronized,当抢占锁时使用CAS,同样的CAS调用栈如下:

Atomic::cmpxchg_ptr(xx)
因为synchronized 本身就是C++实现的语义,因此直接调用了Atomic。

通过比对源码分析ReentrantLock 和 synchronized的CAS、线程挂起方式,发现两者底层实现是一致的。那么上面的言论就可以被证伪了。

非公平锁多了一个通过CAS获取锁的逻辑,那就意味着新来的线程可能比在同步队列中的线程更快的获得锁(插队)。
公平锁就没有这一步,意味着新来的线程必须先进入阻塞队列,排队被唤醒。

还有在tryAcquire时,公平锁多了个hasQueuedPredecessors()的判断,这个判断是判断同步队列中是否有线程在等待,如果有,当前线程就不能获取锁,也一步也是为了防止插队。

公平锁就是先进来的线程先获取锁,不能插队。

两者使用场景

ReentrantLock 在JUC 下各种并发数据结构被广泛应用者,比如LinkedBlockingQueue、DelayQueue等。
当然synchronized也不甘示弱,比如StringBuffer、MessageQueue、jdk 1.8 之后的hashMap实现等都使用了synchronized。

可以看出,ReentrantLock 提供了更灵活、更细的控制锁的方式,而synchronized 操作更简单。
如果你想要某项功能,请查看上面的异同点,找出符合自己需求的锁


3. ReentrantReadWriteLock

ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时ReentrantReadWriteLock能够提供比排他锁更好的并发性和吞吐量。

读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock实现都必须保证 writeLock操作的内存同步效果也要保持与相关 readLock的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。

ReentrantReadWriteLock支持以下功能:

1)支持公平和非公平的获取锁的方式;
2)支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
3)还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;
4)读取锁和写入锁都支持锁获取期间的中断;
5)Condition支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。


3.1 共享锁、独享锁区别

基本差别

共享锁、独占锁是在AQS里实现的,核心是"state"的值:

在这里插入图片描述
如上图,对于共享锁来说,允许多个线程对state进行有效修改。

读写锁的引入

根据上面的图,state 同时只能表示一种锁,要么独占锁,要么共享锁。而在实际的应用场景里经常会碰到多个线程读,多个线程写的情况,此时为了能够协同读、写线程,需要将state改造。
先来看AQS state 定义:

#AbstractQueuedSynchronizer.java
private volatile int state;

可以看出是int 类型的(当然也有long 类型的,在AbstractQueuedLongSynchronizer.java 里,本文以int 为例)

在这里插入图片描述
state 被分为两部分,低16位表示写锁(独占锁),高16位表示读锁(共享锁),这样一个32位的state 就可以同时表示共享锁和独占锁了。


3.2 读锁的实现原理

3.2.1 ReentrantReadWriteLock 的构造

ReentrantReadWriteLock 并没有像ReentrantLock一样直接实现Lock 接口,而是内部分别持有ReadLock、WriteLock类型的成员变量,两者均实现了Lock 接口。

	#ReentrantReadWriteLock.java
    public ReentrantReadWriteLock() {
    	//默认非公平锁
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        //构造读锁
        readerLock = new ReadLock(this);
        //构造写锁
        writerLock = new WriteLock(this);
    }

ReentrantReadWriteLock 默认实现非公平锁,读锁、写锁支持非公平锁和公平锁。
读写锁构造之后,将锁暴露出来给外部使用:

	#ReentrantReadWriteLock.java
    //获取写锁对象
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    //获取读锁对象
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

3.2.2 获取锁

在ReentrantLock 分析独占锁时有如下图:
在这里插入图片描述
与独占锁类似,AQS虽然已经实现了共享锁的基本逻辑,但是真正获取锁、释放锁的操作还是需要子类实现,共享锁需要实现方法:

tryAcquireShared & tryReleaseShared

来看看获取锁的过程:

	#ReentrantReadWriteLock.ReadLock
    public void lock() {
    		//共享锁
            sync.acquireShared(1);
        }

	#AbstractQueuedSynchronizer.java
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
        	//doAcquireShared 在AQS里实现
            doAcquireShared(arg);
    }    

重点是tryAcquireShared(xx):

		#ReentrantReadWriteLock.java
        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            //获取同步状态
            int c = getState();
            //此处exclusiveCount作用是取state 低16位,若是不等于0,说明有线程占有了写锁
            //若是有线程占有了写锁,而这个线程不是当前线程,则直接退出------------>(1)
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            //获取state 高16位,若是大于0,说明有线程占有了读锁
            int r = sharedCount(c);
            //当前线程是否应该阻塞
            if (!readerShouldBlock() &&//------------>(2)
                r < MAX_COUNT &&//若是不该阻塞,则尝试CAS修改state高16位的值
                compareAndSetState(c, c + SHARED_UNIT)) {
            	//--------记录线程/重入次数----------->(3)
            	//修改state 成功,说明成功占有了读锁
                if (r == 0) {
                	//记录第一个占有读锁的线程
                    firstReader = current;
                    //占有次数为1
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                	//第一个占有读锁的线程重入了该锁
                    firstReaderHoldCount++;
                } else {
                	//是其它线程占有锁
                	//取出缓存的HoldCounter
                    HoldCounter rh = cachedHoldCounter;
                    //若是缓存为空,或是缓存存储的不是当前的线程
                    if (rh == null || rh.tid != getThreadId(current))
                    	//从threadLocal里获取
                    	//readHolds 为ThreadLocalHoldCounter 类型,继承自ThreadLocal
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                    	//说明cachedHoldCounter 已经被移出threadLocal,
                        //重新加入即可------------>(4)
                        readHolds.set(rh);
                    //记录重入次数
                    rh.count++;
                    //--------记录线程/重入次数-----------
                }
                return 1;
            }
            //------------>(5)
            return fullTryAcquireShared(current);
        }

以上是获取读锁的核心代码,标注了5个重点,分别来分析。

  1. 此处表明了一个信息:

若是当前线程已经获取了写锁,那么它可以继续尝试获得读锁。 当它把写锁释放后,只剩读锁了。这个过程可以理解为锁的降级。

  1. 线程能否有机会获取读锁,还需要经过两个判断:

1、判定readerShouldBlock()。
2、判定读锁个数用完了没,阈值是2^16-1。

而读锁公平与否就体现在readerShouldBlock()的实现上。

先来看非公平读锁:

		#ReentrantReadWriteLock.java
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }

		#AbstractQueuedSynchronizer.java
       final boolean apparentlyFirstQueuedIsExclusive() {
       	//判断等待队列里的第二个节点是否在等待写锁
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

若等待队列里的第二个节点是在等待写锁,那么此时不能去获取读锁。
这与ReentrantLock不一样,ReentrantLock 非公平锁的实现是不管等待队列里有没有节点,都会去尝试获取锁。

再来看公平读锁

		#ReentrantReadWriteLock.java
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }

判断队列里是否有更早于当前线程排队的节点,该方法在上篇分析ReentrantLock 时有深入分析,此处不再赘述。

  1. 这部分代码看起来多,实际上就是为了记录重入次数以及为了效率考虑引入了一些缓存。

考虑到有可能始终只有一个线程获取读锁,因此定义了两个变量还记录重入次数:

		#ReentrantReadWriteLock.java
    	//记录第一个获取读锁的线程
        private transient Thread firstReader = null;
        //第一个获取读锁的线程获取读锁的个数
        private transient int firstReaderHoldCount;

再考虑到有多个线程获取锁,它们也需要记录获取锁的个数,与线程绑定的数据我们想到了ThreadLocal,于是定义了:

private transient ThreadLocalHoldCounter readHolds;

来记录HoldCounter(存储获取锁的个数及绑定的线程id)。
最后为了不用每次都去ThreadLocal里查询数据,再定义了变量来缓存HoldCounter:

#ReentrantReadWriteLock.java
private transient HoldCounter cachedHoldCounter;
  1. cachedHoldCounter.count == 0,是在tryReleaseShared(xx)里操作的,并且判断当线程已经彻底释放了读锁后,将HoldCounter 从ThreadLocal里移除,因此此处需要加回来。

  2. 走到这一步,说明之前获取锁的操作失败了,原因有三点:

1、readerShouldBlock() == true。
2、r >= MAX_COUNT。
3、中途有其它线程修改了state。

fullTryAcquireShared(xx)与tryAcquireShared(xx)很类似,目的就是为了获取锁。
针对第三点,fullTryAcquireShared(xx)里有个死循环,不断获取state值,若是符合1、2点,则退出循环,否则尝试CAS修改state,若是失败,则继续循环获取state值。

小结一下:

1、fullTryAcquireShared(xx) 获取锁失败返回-1,接下来的处理逻辑流转到AQS里,线程可能会被挂起。
2、fullTryAcquireShared(xx) 获取锁成功则返回1。

3.2.3 释放锁

释放锁的逻辑比较简单:

	#ReentrantReadWriteLock.ReadLock
    public void lock() {
            sync.acquireShared(1);
        }
	#AbstractQueuedSynchronizer.java
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
        	//在AQS里实现
            doReleaseShared();
            return true;
        }
        return false;
    }

重点是tryReleaseShared(xx):

		#ReentrantReadWriteLock.java
        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            //当前线程是之前第一个获取读锁的线程
            if (firstReader == current) {
                if (firstReaderHoldCount == 1)
                	//彻底释放完了,置空
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
            	//先从缓存里取
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                	//取不到,则需要从ThreadLocal里取
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                	//若是当前线程不再占有锁,则清除对应的ThreadLocal变量
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
            	//修改state
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                	//若是state值变为0,说明读锁、写锁都释放完了
                    return nextc == 0;
            }
        }

此处需要注意的是:
tryReleaseShared(xx)释放读锁时候,若是没有完全释放读锁、写锁,那么将会返回false。
而在AQS里释放共享锁流程如下:

	#AbstractQueuedSynchronizer.java
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

也就是说此种情况下,doReleaseShared() 将不会被调用,也就不会唤醒同步队列里的节点。
这么做的原因是:

若只释放完读锁,还剩写锁被占用。而因为写锁是独占锁,其它线程无法获取锁,那么即使唤醒了它们也没有用。


3.3 写锁的实现原理

3.3.1 获取锁

写锁是独占锁,因此重点关注tryAcquire(xx):

		#ReentrantReadWriteLock.java
        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            //获取同步状态
            int c = getState();
            //获取当前写锁个数
            int w = exclusiveCount(c);
            if (c != 0) {
            	//1、若是w==0,而c!= 0,说明有线程占有了读锁,不能再获取写锁了
            	//2、若是写锁被占用,但是不是当前线程,则不能再获取写锁了
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //锁个数超限了
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");

                //走到此处,说明重入,直接设置,同一时刻只有一个线程能走到这
                setState(c + acquires);
                return true;
            }
            //若c==0,此时读锁、写锁都没线程占用
           	//判断线程是否应该被阻塞,否则尝试获取写锁------->(1)
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            //独占锁需要关联线程
            setExclusiveOwnerThread(current);
            return true;
        }

来看看writerShouldBlock(),写锁公平/非公平就在此处实现的。

先来看非公平写锁:

		#ReentrantReadWriteLock.java
        final boolean writerShouldBlock() {
        	//不阻塞
            return false; // writers can always barge
        }

非公平写锁不应该阻塞。

再来看公平写锁:

		#ReentrantReadWriteLock.java
        final boolean writerShouldBlock() {
        	//判断队列是否有有效节点等待
            return hasQueuedPredecessors();
        }

和公平读锁一样的判断条件。

小结

1、读锁/写锁 已被其它线程占用,那么新来的线程将无法获取写锁。
2、写锁可重入。

3.3.2 释放锁

释放锁重点关注tryRelease(xx):

		#ReentrantReadWriteLock.java
        protected final boolean tryRelease(int releases) {
        	//当前线程是否持有写锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //同一时刻,只有一个线程会执行到此
            int nextc = getState() - releases;
            //判断写锁是否释放完毕
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
            	//取消关联
                setExclusiveOwnerThread(null);
            //设置状态
            setState(nextc);
            return free;
        }

若tryRelease(xx)返回true,则AQS里会唤醒等待队列的线程。


3.4 读写锁 tryLock 原理

3.4.1 读锁tryLock

		#ReentrantReadWriteLock.java
        public boolean tryLock() {
            return sync.tryReadLock();
        }

        final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
            //for 循环为了检测最新的state
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                //记录次数
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    //获得锁后退出循环
                    return true;
                }
            }
        }

可以看出tryReadLock(xx)里: 只要不是别的线程占有写锁并且读锁个数没超出限制,那么它将一直尝试获取读锁,直到得到为止。

3.4.2 写锁tryLock

        public boolean tryLock() {
            return sync.tryWriteLock();
        }
        final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

写锁只尝试一次CAS,失败就返回。
最终,用图表示读锁、写锁实现的功能:
在这里插入图片描述
读锁与写锁关系:

在这里插入图片描述


3.5 读写锁的应用

分析完原理,来看看简单应用。

public class TestThread {

    static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
    static ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

    public static void main(String args[]) {
        //读
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    String threadName = Thread.currentThread().getName();
                    try {
                        System.out.println("thread " + threadName + " acquire read lock");
                        readLock.lock();
                        System.out.println("thread " + threadName + " read locking");
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        readLock.unlock();
                        System.out.println("thread " + threadName + " release read lock remain read count:" + readWriteLock.getReadLockCount());
                    }
                }
            }, "" + i).start();
        }

        //写
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    String threadName = Thread.currentThread().getName();
                    try {
                        System.out.println("thread " + threadName + " acquire write lock");
                        writeLock.lock();
                        System.out.println("thread " + threadName + " write locking");
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        writeLock.unlock();
                        System.out.println("thread " + threadName + " release write lock remain write count:" + readWriteLock.getWriteHoldCount());
                    }
                }
            }, "" + i).start();
        }
    }
}

10个线程获取读锁,10个线程获取写锁。
读写锁应用场景:

ReentrantReadWriteLock 适用于读多写少的场景,提高多线程读的效率、吞吐量。

同一线程读锁、写锁关系:

public class TestThread {

    static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
    static ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

    public static void main(String args[]) {
//        new TestThread().testReadWriteLock();------>1、先读锁,后写锁
//        new TestThread().testWriteReadLock();------>2、先写锁、后读锁
    }

    private void testReadWriteLock() {
        System.out.println("before read lock");
        readLock.lock();
        System.out.println("before write lock");
        writeLock.lock();
        System.out.println("after write lock");
    }

    private void testWriteReadLock() {
        System.out.println("before write lock");
        writeLock.lock();
        System.out.println("before read lock");
        readLock.lock();
        System.out.println("after read lock");
    }
}

分别打开1、2 注释,发现:

1、先获取读锁,再获取写锁,则线程在写锁处挂起。
2、先获取写锁,再获取读锁,则都能正常获取锁。
这与我们上述的理论分析一致。


回到顶部


版权声明:本文为weixin_47410172原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。