ReentrantLock独占锁加锁流程

1.前言

注:先放上一个我自己画的流程图,不着急看哈,可以先扫一眼

https://www.processon.com/view/link/61025230637689719d2f1f71

(1)什么时候用锁?

        当有多个线程需要对同一个共享变量进行操作的时候就需要考虑上锁了,但是如果说这些线程是交替执行的,例如T1执行完T2执行,T2执行完T3执行,如果线程操作是按照这种顺序执行的那其实不需要上锁,既然业务上无法保证,那就需要通过锁进行控制了,简单点说,加锁就是为了保证多线程并发下在经过某一个代码块对某一个共享变量进行操作的时候,保证串行。

(2) 实现一把自己的锁都需要什么?

        a.既然是面向对象编程那就可以先想一下,一把锁,大概都会有什么属性,肯定会有一个锁状态state来确定当前有没有锁。

        b.多个线程并发执行只有一个线程可以执行成功那么其他线程就需要进行自旋、阻塞等待、睡眠任一操作,如果是自旋,java里面提供的自旋概念就是死循环,如果高并发下N多线程做死循环肯定会影响CPU的性能,睡眠又不确定需要多长时间,那大概率需要使用park操作来阻塞线程。

        c.那线程阻塞后需要一个地方暂存它等待被唤醒,这里需要一种数据结构进行暂存get和set。

        所以实现一把简单的锁大概需要锁状态、阻塞操作、暂存结构。

(3)公平锁和非公平锁:在java中公平锁和非公平锁只有在抢锁的时候才有区分,公平锁会先看有没有其他线程排队,如果有则加入排队,如果没有抢锁,非公平锁则是直接抢锁,并不是说公平锁是排队等待唤醒,非公平锁是随机唤醒,只要进了等待队列,那就是一朝排队,永久排队。

2.ReentrantLock的基本概念

(1)概念:ReentrantLock是基于AQS进行的实现,AQS的全程AbstractQueuedSynchronizer,是阻塞式锁和同步器工具的框架,用一个state来表示锁状态,通过子类CAS去操作,内部维护了一个等待队列进行排队,使用条件变量来实现等待、唤醒,支持多个条件变量功能。

AQS有两种模式一种独占,一种共享(读写锁),今天只谈独占锁。

3.AQS的实现

(1)互斥锁主要分两种,公平锁和非公平锁的两个实现

公平锁:

final void lock() {
            acquire(1);
        }

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

非公平锁:

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

在这里大概能看出,非公平锁的时候是直接通过CAS操作将锁状态标识state由0改为1,修改成功后,将当前持有锁的线程改为自己,其实从这里也可以稍微看出来一点ReentrantLock的另外一个特性可重入性,这里的公平锁、非公平锁、可重入的特性还不明显,让我们继续往下看。需要注意一点的是,这里使用CAS操作,我理解有两个原因,第一个是因为CAS操作是一个原子操作,这里有必要解释一下,例如我们看到的代码int i = 0;看似是一条操作但是如果看它的字节码文件就会发现其实是多条

如果出现高并发,对值的修改就会出现问题,类似脏写,同时CAS还是CPU级别的指令不涉及到用户态切换到内核态。

继续阅读公平锁源码:

public final void acquire(int arg) {
        //!tryAcquire(arg)如果tryAcquire()返回的true取反为false不需要走后续逻
        //链表都不需要构建,加锁成功返回继续执行
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }



        protected final boolean tryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            //获取当前锁状态
            int c = getState();
            //如果c == 0,则代表当前没有锁
            if (c == 0) {
                //注意这里!hasQueuedPredecessors();在这个方法里面就是公
                //平锁的体现了,为了方便阅读我直接将代码copy到下面
                //如果!hasQueuedPredecessors()返回的是fals取反则为true继续走
                //compareAndSetState通过cas操作将锁状态修改为1,有锁
                //将当前持有锁的线程修改为自己,三步都成功返回true,这里结束就可以回到
                //acquire()方法了
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //注意这里,这里是可重入锁的体现,会判断当前持有锁的线程是不是自己,如果
            //是自己会把当前锁状态+1,这里需要注意的是加了几次锁就需要减几次锁,返回true
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }


/**
*在这个方法里首先需要明确一下,看到了Node其实就可以确认,AQS里面使用的是链表来进行等待队列
*的维护的,而且还是双向链表,看一下Node节点里面的属性来证明一下。
*volatile Node prev;
*volatile Node next;
*需要解释一下这里为什么需要加上volatile,主要是为了通过禁止指令重排来保证有序性或者也可以
*是为了保证可见性,想一下说过是并发场景多个线程来构建链表,最后这个链表的头和尾到底应该指向谁?
Node节点的其他属性当解读到对应源码的时候再说
*/
public final boolean hasQueuedPredecessors() {
        //获取头尾节点进行赋值,如果当前没有线程在排队那head和tail的默认值都会为空
        Node t = tail; 
        Node h = head;
        Node s;
        //如果都为空那第一个条件就会返回fals,不会继续走第二个判断,让我们回去看tryAcquire()方法
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }




 

 公平锁无排队逻辑加锁成功,在这段逻辑里证明了三点

1.ReentrantLock的独占锁有公平锁和非公平锁两种实现

2.ReentrantLock是可重入锁

3.AQS加锁是通过锁标识状态和cas进行操作的

而且从这一部分代码中可以看出ReentrantLock速度快是有原因的

1.加锁实现完全通过java,没调用操作系统的锁

2.没有线程排队的时候连链表都不需要构建

3.逻辑运算符使用的出神入化,不该写的代码一点也没加

掌握了这些继续看看当有线程在排队的时候是如何处理的

如果有线程排队肯定会进入到

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)判断,Node.EXCLUSIVE表示节点是独占模式的等待标记,默认值为空,这个属性主要是为了区分是独占锁还是共享锁,在独占锁里面暂时没用,后续阅读读写锁源码再进行解释
private Node addWaiter(Node mode) {
        //将当前线程和是否独占标识构建出一个节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

 if (pred != null)这一步操作需要单独拿出来说,因为这里面有点东西,初一看这个判断是无用判断,因为如果没有线程排队那么队列不会构建不会走到这段代码,而如果有线程在排队那肯定是不会为空的,但是这里有一种情况,就是队列中只有一个线程T2在排队,T1获取到了锁在执行,假如说就在这一瞬间T1执行完了,T2获得了锁,等待队列为空,那么T3加在哪里?所以这一步判断是很有必要的,所以并发编程思想真的很重要。

下面就来看一下这种情况,


private Node enq(final Node node) {
        //这里为什么是个死循环?因为在这里才是真正的构建排队队列
        for (;;) {
            //在这里之前说过,如果按照现在的假设场景那么tail节点应该是null
            Node t = tail;
            //所以第一次循环会进入这个判断
            if (t == null) { // Must initialize
                //创建一个head节点
                if (compareAndSetHead(new Node()))
                    //tail指向head
                    tail = head;
            } else {
                //第二次循环的时候会进到这里,node节点是当前想要获取的线程的包装节点
                //将当前线程节点的前驱指针指向t,因为tail = head是互相指向的,所以在
                //这里相当于将head——>node
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))内部addWaiter执行完毕,获得了一个排队队列,返回的是head节点继续执行acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //在这里获取pre节点,其实也就是相当于头结点
                final Node p = node.predecessor();
                //如果P和head相等证明前面没有排队的了,这里其实就是相当于我要进队列睡眠
                //之前再去抢一次锁看看能不能拿到,因为睡眠再唤醒是有性能损耗的
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

再继续解释之前需要想一个问题,一个线程睡眠了那肯定是需要唤醒继续执行的,不然这个任务不就丢了么,加入说现在等待队列里面已经存在一个线程T2正在睡眠,那当T1执行结束的时候肯定会去队列里面叫醒T2,但是如果我现在有一个T3也进来排队呢?那T3应该由T2进行唤醒,但是T2已经在排队睡眠了,我不知道你进来了啊,所以T3进来的时候需要告诉T2,哥们,我和你一起睡觉呢,你退房的时候叫我一声别把我丢在这,那继续看源码是如何处理的。

进入shouldParkAfterFailedAcquire(p,node)传递头结点和当前节点,

 
在这里需要先了解一下四种状态,翻译翻译
         /** waitStatus value to indicate thread has cancelled */
        //表示当前线程已被取消
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        //表示当前线程需要唤醒下一个节点
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //首先获取上一个节点的ws,可以理解为是否需要唤醒下一个线程的标识
        int ws = pred.waitStatus;
        //如果是-1就相当T2已经知道了我退房(解锁)的时候需要去叫醒我后面的哥们
        if (ws == Node.SIGNAL)
            return true;
        //如果是>0表示当前线程可能已经被打断取消了,所以需要继续往前面找,找到
        //一个正常的节点进行唤醒通知,那同样的唤醒的时候不需要唤醒已取消的,依次向后找到正常节点
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
           //如果是其他状态条件唤醒等,直接进行设置
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

现在准备睡觉的线程已经加到了队列里面,并且已经告诉了前面线程解锁的时候叫醒它,所以下一步做的那就是睡眠了,

这个里面就不需要详细解释了,调用了native本地方法,进行线程睡眠,至此加锁流程结束。

总结一下:

ReentrantLock的特点:

1.公平和非公平:只有在抢锁的时候才会体现,只要进入等待队列那就是一朝排队永久排队,我将它理解为乱序抢锁,顺序执行。

 2.可重入性:会判断当前持有锁的线程是不是自己,如果是自己那就将锁状态+1.每次解锁-1.

 3.轻量级锁(加锁都是通过共享变量state来处理的)

注意熬,通过lock.lock的方式加的锁说是可以打断,但是其实他是一个假打断,如果想使用可打断的要用r.lockInterruptibly();

end


版权声明:本文为qq_32505833原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。