近期一个项目接触到了并发读写,需要对Android同步机制有一定了解。
首先说下synchronized和Lock类的区别吧。
首先是存在层次,synchronized是java的关键字,而Lock是一个java的类,synchronized是jvm自动释放的,而lock必须在finally中手动释放。
synchronized不能在等待过程中响应中断,lock可以。synchronized不能知道是否获取到了锁,lock可以。lock可以提高多个线程并发读写时的访问效率。
接下来以ReentrantLock为例进行锁特性的分析。
首先是可重入锁,代表着获取到锁的线程可以再次访问。
demo代码如下:
public class Main {
static ReentrantLock lock = new ReentrantLock();
static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
public static void main(String[] args) {
executorService.execute(() -> printTest1());
}
private static void printTest1() {
lock.lock();
try {
System.out.println("printTest1");
printTest2();
} finally {
lock.unlock();
}
}
private static void printTest2() {
lock.lock();
try {
System.out.println("printTest2");
} finally {
lock.unlock();
}
}
}
默认构造的ReentrantLock 使用的是NonfairSync来获取锁:
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
如果之前线程已经获取到锁了,compareAndSetState会失败,接下来会继续调用acquire(1):
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先会调用tryAcquire来尝试获取锁:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
首先先判断当前的状态是否为0,如果不为0的话,判断当前线程是否为获取到锁的线程,在档当前线程第一次获取到所得时候,会调用setExclusiveOwnerThread来将exclusiveOwnerThread设置为当前线程。
如果相符的话,接下来会将当前状态+1后返回,因此ReentrantLock是可重入的。
但是有一种场景就是多个线程去读数据,这样的话就会造成系统吞吐量下降,用户体验下降等,ReentrantReadWriteLock就是针对于这个场景出现的:,首先依旧是一段demo代码:
public class Main {
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
if (i == 5) {
executorService.execute(new WriteRunnable());
} else {
executorService.execute(new ReadRunnable());
}
}
}
static class ReadRunnable implements Runnable {
@Override
public void run() {
lock.readLock().lock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
}
System.out.println(Thread.currentThread().getName() + "读线程执行完成.");
}
}
static class WriteRunnable implements Runnable {
@Override
public void run() {
lock.writeLock().lock();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();
}
System.out.println(Thread.currentThread().getName() + "写线程执行完成.");
}
}
}
接下来从源码角度分析如何实现读并发不冲突,写并发冲突的。
首先分析读锁:
/**
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
*/
public void lock() {
sync.acquireShared(1);
}
注释写的非常明显,当有写锁占用时,读锁不能访问,实现该功能的核心类是Sync:
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
该类用一个4字节的变量记录当前ReentrantReadWriteLock 锁的状态,其中高16位代表读锁的状态,低16位代表写锁的状态。
acquireShared的实现如下:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
首先通过exclusiveCount将当前状态与0xFFFF进行与操作来判断写锁的状态,如果写锁被占用,而且占用写锁的进程还不是当前的线程的话,直接返回-1。
否则的话进行调用sharedCount函数将状态位向右移16位来进行读锁状态的判断。
接下来先调用readerShouldBlock进行队列状态判断,这里之前分析时略过了,公平锁和非公平锁的实现不同的,首先看公平锁:
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
简单来说就是判断等待队列中是否有在当前线程之前的线程,有的话返回true,否则返回false。
非公平锁的readerShouldBlock如下:
return apparentlyFirstQueuedIsExclusive();
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
如果当前等待队列中的第一个线程是排他模式占用的锁则返回true,这是为了防止独占线程也就是写线程饥饿等待。
如果符合竞争策略的话,就会调用compareAndSetState(c, c + SHARED_UNIT),将状态变量的高16位加一。接下来进行一些读线程状态的记录。
如果上述过程失败会调用fullTryAcquireShared来处理CAS的遗漏,这里不做冗余分析。
如果获取读锁失败则会调用doAcquireShared将当前线程加入等待队列,并以死循环的形式获取锁,直到线程被中断:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
接下来分析读锁的获取:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
可以通过上述代码判断出来,同一线程,如果先获取到读锁,写锁是不能重入的,但是写锁与写锁之间是可以重入的。同样的,写锁和写锁之间的竞争,公平锁和非公平的处理策略也不同,但基本同上。
目前对并发的分析先分析这两个类,之后这篇博客会一直更新,尽可能多的补充内容。