ReentrantReadWriteLock

ReentrantReadWriteLockReentrantLock 一样都是可重入锁,不同的是 ReentrantReadWriteLock 其实是两个锁:读锁是共享锁,写锁是独占锁,也因此常在读的吞吐高于写时使用。与 ReentrantLock 类似,ReentrantReadWriteLock 也是先实现了自己的 Sync,再衍生出公平锁与非公平锁。

Sync

我们知道 AQS 的 stateint 型,ReentrantReadWriteLock 中的 Sync 用低位 16 位表示写锁,高位 16 位表示读锁,代码中定义了一些掩码用于快速计算:

abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 持有读锁的数量,取高位 16 位 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 持有写锁的数量,取低位 16 位 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // ... }

同时由于有多个线程可获取读锁,且每个线程都是可重入的,因此需要为每个线程记录获取读锁的个数,如下:

abstract static class Sync extends AbstractQueuedSynchronizer { // ... // 用于记录哪个线程占了多少个读锁 static final class HoldCounter { int count; // initially 0 // Use id, not reference, to avoid garbage retention final long tid = LockSupport.getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } // 使用 ThreadLocal 来分线程记录持有多少读锁 private transient ThreadLocalHoldCounter readHolds; // 优化:保存上一个获取读锁的线程的 HoldCounter private transient HoldCounter cachedHoldCounter; // ... }

tryAcquire

tryAcquire 用于获取写锁,源码的注释比较清晰:

@ReservedStackAccess protected final boolean tryAcquire(int acquires) { // 流程如下: // 1. 如果读锁不为 0 或写锁不为零且持锁线程不是当前线程,失败 // 2. 如果计数器饱和了,失败 // 3. 否则通过 writerShouldBlock 检测公平性,看情况获取锁 Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // 说明有线程持有锁(读锁还是写锁不确定) // 注意如果 c != 0 且 w == 0,则读锁 != 0 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 计数器饱和 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 可重入抢锁 setState(c + acquires); return true; } if (writerShouldBlock() || // 公平性检测 !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); // 和 ReentrantLock 一样,设置持锁线程 return true; }

tryRelease

tryRelease 用来释放写锁,与 ReentrantLock 中的释放锁操作几乎一样:

@ReservedStackAccess protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }

tryAcquireShared

tryAcquireShared 用于获取读锁,可以获取读锁的条件有:

  1. 没有线程持有锁
  2. 当前线程或其它线程持有读锁
  3. 当前线程持有写锁
@ReservedStackAccess protected final int tryAcquireShared(int unused) { // 流程如下: // 1. 如果有其它线程持有写锁,失败 // 2. 否则检测公平性并尝试抢锁 // 3. 抢锁失败,使用 fullTryAcquireShared 更全面地尝试 Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && // 有线程持有写锁 getExclusiveOwnerThread() != current) // 且不是当前线程 return -1; int r = sharedCount(c); if (!readerShouldBlock() && // 公平性检测 r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 尝试抢锁 if (r == 0) { // 当前线程是第一个抢锁的线程,做记录 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // 更新线程读锁数量,过程中优先使用缓存 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } // 抢锁失败,使用完整版抢锁逻辑 return fullTryAcquireShared(current); }

fullTryAcquireShared

fullTryAcquireShared 看起来很冗长,它与 tryAcquireShared 的逻辑几乎没啥区别,只是一个自旋版本。

final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); // 先检测是否有资格抢锁 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // 否则当前线程持有写锁,有资格抢写锁 } else if (readerShouldBlock()) { // 公平性原因要求当前线程阻塞,说明当前线程需要进入队列等待 // 只有当前抢锁操作为可重入操作,才认为抢锁失败 // 换句话说,可重入优先于公平性,否则容易造成死锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) // 计数器饱和 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 抢锁成功则更新状态 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }

tryReleaseShared

tryReleaseShared 用于释放读锁,主要功能是修改线程的读锁计数,完成后需要自旋调用 CAS 完成对 state 的修改:

@ReservedStackAccess protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) // 释放了过多的锁,属于代码逻辑错误 throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }

非公平锁:NonfairSync

对于非公平锁来说,写锁永远都不需要阻塞(因为不公平,不需要等待)。理论上读锁也一样,但是为了防止大最线程抢读锁,导致写锁饥饿死锁,于是使用保护机制:如果等待队列的队首在等待写锁,则当前抢读锁的线程选择退让。实现如下:

static final class NonfairSync extends Sync { final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } }

apparentlyFirstQueuedIsExclusive 实现如下:

final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }

公平锁:FairSync

抢锁时需要先检查是否有其它线程等待,与 ReentrantLock 一样使用 hasQueuedPredecessors 判断:

static final class FairSync extends Sync { final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }

小结

ReentrantReadWriteLock 是 JUC 中最复杂的锁实现,好在有 AQS 的帮助,整体的逻辑不复杂。

代码中的亮点有:

  • 使用 int 的高低位分别表示读写锁,变向帮助了锁降级功能的实现
  • 一些性能的优化,如 firstReader, cachedHoldCounter,低并发度时能减少很多访问次数
  • 避免写锁饿死,采取的启发式的“公平”逻辑