ReentrantLock加锁原理与源码分析

ReentrantLock加锁原理与源码分析

在实际代码开发过程中,只要用到了多线程就会存在数据的安全性问题,实现锁的方式很多;例如:synchronized关键字、Lock实现类、Redis锁,分布式锁等。本文主要讲ReentrantLock(可重入锁)的实现原理。

另外,本文只对公平锁说明,非公平锁与公平锁的区别仅在于在放入队列之前尝试通过改变state的值,如果改变成功则加锁成功,否则加锁失败。

一、ReentrantLock的加锁原理

先来一个逻辑图,ReentrantLock加锁的过程就是如此简单:
ReentrantLock加锁逻辑
1、通过lock()方法加锁,通过unlock()方法释放锁;

2、调用lock()之后,内部会判断当前锁的状态,只有在state == 0成立的情况下锁才是空闲的。

3、在空闲状态下,并不能立刻加锁成功,还要判断等待队列中有没有等待的线程。如果没有则直接加锁成功,如果有则优先从队列中获取线程加锁。

4、如果state > 0则说明当前锁被其他线程持有,持有该锁的线程在exclusiveOwnerThread属性中关联。

5、判断exclusiveOwnerThread == currentThread 是否成立;如果成立说明当前线程持有该锁,则对state累加1(通过这种方式实现锁重入);

6、如果该锁不是当前线程持有,则把该线程加入到队列中等待。

ReentrantLock加锁逻辑简单明了,下面则对实现源码分析。

二、源码解析

1、ReentrantLock构造函数和lock方法:

// 引入Sync对象,实际实现加锁的逻辑时sync
private final Sync sync;

// 默认使用的是非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}

// 可通过参数是否使用公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

public void lock() {
    sync.lock();
}

通过构造函数可知默认使用非公平锁,而实际加锁逻辑时Sync实现的。

2、Sync分析

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;
    
    // 加锁逻辑由Sync的子类 FairSync/NoFairSync实现公平或非公平锁
    abstract void lock();
    
    // 非公平锁实现加锁逻辑
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    // 释放锁:通过下面的逻辑可以看出,同一个线程加了几次锁就要释放几次锁,否则这个线程还将持有这把锁
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    
    // 判断当前线程是否持有该锁
    protected final boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
    }
    
    // 触发队列中的线程获取锁
    final ConditionObject newCondition() {
        return new ConditionObject();
    }
    
    // 获取持有该锁的线程
    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }
    
    // 获取该锁持有的数量
    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }
    
    // 获取锁状态
    final boolean isLocked() {
        return getState() != 0;
    }

    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0);
    }
}

Sync中实现了除加锁之外的所有功能,因为加锁有公平和非公平,所以在FairSync中实现了公平锁,NonFairSync中实现了非公平锁。Sync的功能我都在上面的代码中做了注释,每一个都很清楚。

3、FairSync中的lock

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    // 加锁逻辑
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

从上面代码中可以看出,lock方法调用了acquire(1),acquire是AbstractQueuedSynchronizer的逻辑,如下所示:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

以上代码可以看出,实际加锁逻辑时由tryAcquire实现,在tryAcquire加锁失败之后,acquireQueued将失败的线程存放到队列中。所以主要的逻辑有两块,一个是加锁,一个是将线程加入队列。

4、FairSync的tryAcquire

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread(); // @1
    int c = getState(); // @2
    if (c == 0) {
        if (!hasQueuedPredecessors() && // @3
                compareAndSetState(0, acquires)) { // @4
            setExclusiveOwnerThread(current); // @5
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) { // @6
        int nextc = c + acquires; // @7
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc); // @8
        return true;
    }
    return false;
}

(1)获取当前线程;
(2)获取该锁的状态;
(3)如果该锁未被持有,hasQueuedPredecessors()判断队列中是否有等待线程;如果没有等待线程,compareAndSetState进行CAS加锁操作;compareAndSetState加锁成功,setExclusiveOwnerThread设置当前线程为持有线程,返回true;
(4)如果锁被持有,判断持有该锁的线程是否是当前线程;如果是当前线程,对state执行加一操作;更改锁状态,返回true。
(5)否则返回false。

5、NonfairSync源码

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

非公平锁的lock可以看出,在执行acquire之前,先执行compareAndSetState,该方法就是将state数据尝试更改,如果能更改成功则加锁成功;所以公平锁与非公平锁之间的区别就在于此。下面主要看NonfairSync的加锁实现tryAcquire,从上面代码中可以看出,tryAcquire调用了nonfairTryAcquire方法,而该方法在Sync中实现,我们再返回来分析nonfairTryAcquire。

6、NonfairSync的nonfairTryAcquire

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

(1)先获取当前线程;
(2)获取锁状态;
(3)如果锁未被持有,则先尝试获取锁(公平锁中是先判断队列中有没有数据),如果获取成功则设置exclusiveOwnerThread并返回true;
(4)如果已被持有,则判断是否是当前线程持有,如果是则直接更改state的值,返回true。
(5)否则返回false。

7、AbstractQueuedSynchronizer加入队列逻辑

// tryAcquire加锁失败之后,acquireQueued放入到队列中
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 创建一个Node对象,用来标记线程队列
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
// 将addWaiter创建的Node对象加入到队列中
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Node就是一个双向链表,新创建的Node对象会被加入到队列的尾部。


版权声明:本文为Kelvin_hui原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。