ReentrantReadWriteLock
ReentrantReadWriteLock
与 ReentrantLock
一样都是可重入锁,不同的是
ReentrantReadWriteLock
其实是两个锁:读锁是共享锁,写锁是独占锁,也因此常在读的吞吐高于写时使用。与 ReentrantLock
类似,ReentrantReadWriteLock
也是先实现了自己的 Sync
,再衍生出公平锁与非公平锁。
Sync
我们知道 AQS 的 state
是 int
型,ReentrantReadWriteLock
中的 Sync
用低位 16 位表示写锁,高位 16 位表示读锁,代码中定义了一些掩码用于快速计算:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 持有读锁的数量,取高位 16 位
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 持有写锁的数量,取低位 16 位
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// ...
}
同时由于有多个线程可获取读锁,且每个线程都是可重入的,因此需要为每个线程记录获取读锁的个数,如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
// ...
// 用于记录哪个线程占了多少个读锁
static final class HoldCounter {
int count; // initially 0
// Use id, not reference, to avoid garbage retention
final long tid = LockSupport.getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 使用 ThreadLocal 来分线程记录持有多少读锁
private transient ThreadLocalHoldCounter readHolds;
// 优化:保存上一个获取读锁的线程的 HoldCounter
private transient HoldCounter cachedHoldCounter;
// ...
}
tryAcquire
tryAcquire
用于获取写锁,源码的注释比较清晰:
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
// 流程如下:
// 1. 如果读锁不为 0 或写锁不为零且持锁线程不是当前线程,失败
// 2. 如果计数器饱和了,失败
// 3. 否则通过 writerShouldBlock 检测公平性,看情况获取锁
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 说明有线程持有锁(读锁还是写锁不确定)
// 注意如果 c != 0 且 w == 0,则读锁 != 0
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 计数器饱和
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 可重入抢锁
setState(c + acquires);
return true;
}
if (writerShouldBlock() || // 公平性检测
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current); // 和 ReentrantLock 一样,设置持锁线程
return true;
}
tryRelease
tryRelease
用来释放写锁,与 ReentrantLock
中的释放锁操作几乎一样:
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
tryAcquireShared
tryAcquireShared
用于获取读锁,可以获取读锁的条件有:
- 没有线程持有锁
- 当前线程或其它线程持有读锁
- 当前线程持有写锁
@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
// 流程如下:
// 1. 如果有其它线程持有写锁,失败
// 2. 否则检测公平性并尝试抢锁
// 3. 抢锁失败,使用 fullTryAcquireShared 更全面地尝试
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && // 有线程持有写锁
getExclusiveOwnerThread() != current) // 且不是当前线程
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() && // 公平性检测
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 尝试抢锁
if (r == 0) { // 当前线程是第一个抢锁的线程,做记录
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else { // 更新线程读锁数量,过程中优先使用缓存
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 抢锁失败,使用完整版抢锁逻辑
return fullTryAcquireShared(current);
}
fullTryAcquireShared
fullTryAcquireShared
看起来很冗长,它与 tryAcquireShared
的逻辑几乎没啥区别,只是一个自旋版本。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
// 先检测是否有资格抢锁
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 否则当前线程持有写锁,有资格抢写锁
} else if (readerShouldBlock()) { // 公平性原因要求当前线程阻塞,说明当前线程需要进入队列等待
// 只有当前抢锁操作为可重入操作,才认为抢锁失败
// 换句话说,可重入优先于公平性,否则容易造成死锁
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT) // 计数器饱和
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // 抢锁成功则更新状态
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
tryReleaseShared
tryReleaseShared
用于释放读锁,主要功能是修改线程的读锁计数,完成后需要自旋调用 CAS 完成对 state
的修改:
@ReservedStackAccess
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0) // 释放了过多的锁,属于代码逻辑错误
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
非公平锁:NonfairSync
对于非公平锁来说,写锁永远都不需要阻塞(因为不公平,不需要等待)。理论上读锁也一样,但是为了防止大最线程抢读锁,导致写锁饥饿死锁,于是使用保护机制:如果等待队列的队首在等待写锁,则当前抢读锁的线程选择退让。实现如下:
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
apparentlyFirstQueuedIsExclusive
实现如下:
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
公平锁:FairSync
抢锁时需要先检查是否有其它线程等待,与 ReentrantLock
一样使用
hasQueuedPredecessors
判断:
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
小结
ReentrantReadWriteLock
是 JUC 中最复杂的锁实现,好在有 AQS 的帮助,整体的逻辑不复杂。
代码中的亮点有:
- 使用 int 的高低位分别表示读写锁,变向帮助了锁降级功能的实现
- 一些性能的优化,如
firstReader
,cachedHoldCounter
,低并发度时能减少很多访问次数 - 避免写锁饿死,采取的启发式的“公平”逻辑