Java并发学习笔记(ReentrantReadWriteLock(良心之作))

前言 上一篇博客介绍了ReentrantLock,可是ReentratLock是独占锁,遇到写少读多的情况,性能会不尽人意。JUC包还提供了一个 ReentrantReadWriteLock 锁,采用读写分离的方式,多个线程可以同时获取读锁。本文将从源码角度对 ReentrantReadWriteLock(以下简称读写锁) 的初始化,获取,释放等进行解释和记录。
锁的初始化

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private static final long serialVersionUID = -6992448646407690164L; // 读锁对象 private final ReentrantReadWriteLock.ReadLock readerLock; // 写锁对象 private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public ReentrantReadWriteLock() { this(false); }public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }

可以看出,读写锁内部有readerLock和writerLock两个属性,分别用来表示读锁和写锁;和ReentrantLock一样,同样有一个Sync类型的对象sync用来进行锁的具体操作。在初始化时,默认是用非公平方式实现(NonfairSync和FairSync都是Sync的子类,这块和ReentrantLock类似)
下面看一下readerLock和writerLock这两个属性:
// 读锁 public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; ... protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; }// 写锁 public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; ... protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; }

可见,它们都有一个Sync类型的属性sync,初始化时,会将ReentrantReadWriteLock 对象的sync赋给自己的sync属性(因为调用它们构造函数的语句为readerLock = new ReadLock(this); writerLock = new WriteLock(this); )。
public void lock() { sync.acquireShared(1); }public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

上面是ReadLock里的部分方法,可以看出,也是通过sync来实现一些功能。WriteLock同理。
读锁,写锁状态的获取和更新 读写锁有读锁和写锁两个状态,但是读写锁并没有定义什么新变量,而是沿用AQS里的state变量,用变量的高16位表示读状态,低16位表示写状态,之所以可以这么做,是因为一个int变量是4字节,32位。下面是进行读,写状态获取的代码:
// 定义了一些常量,用于读状态和写状态的操作 static final int SHARED_SHIFT= 16; static final int SHARED_UNIT= (1 << SHARED_SHIFT); static final int MAX_COUNT= (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count. */ static int sharedCount(int c){ return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count. */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

首先看读状态,代码为return c >>> SHARED_SHIFT; ,这个SHARED_SHIFT是16,也就是把c右移16位,把一个32位变量右移16位,也就是读到了此变量的高16位。
// 释放读锁的部分代码 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) ... // 获取读锁的部分代码 compareAndSetState(c, c + SHARED_UNIT))

以上是对读锁进行获取和释放的部分代码,在这部分,对读锁状态的更新主要通过对 SHARED_UNIT 这个变量的加减来操作。这个变量的值为 (1 << SHARED_SHIFT) 也就是10000000000000000(二进制,1后面跟16个0),可以看出每次加减这个 SHARED_UNIT ,对低16位完全没有影响。
接着看写锁。获取写锁状态的代码为return c & EXCLUSIVE_MASK; ,这个 EXCLUSIVE_MASK 的值为 (1 << SHARED_SHIFT) - 1; 也就是 1111111111111111(二进制,16个1),高16位全部补为0,和一个32位的变量进行&操作,得到的数高16位自然全是0,低16位不变,原来是0还是0,原来是1还是1。
// 释放写锁的部分代码 int nextc = getState() - releases; setState(nextc); ... // 释放写锁的部分代码 setState(c + acquires);

可见,写锁的状态直接进行加减操作,毕竟它操作的是低16位。
但是,这样子也有一定风险,毕竟16位表示的数字有限,所以后面经常出现这种代码:
if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); ... if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded");

MAX_COUNT 值为 (1 << SHARED_SHIFT) - 1,也就是 1111111111111111(二进制,16个1),这是16位表示的极限了。所以超过时,会抛出异常。
不得不说,这块操作还是很巧妙的。如果我自己实现,肯定是重新定义两个属性,一个表示读,一个表示写。人家这还是用state变量,用这种巧妙的操作让一个变量表示两个状态,这样子还能继续用关于state的函数,确实很厉害。而且MAX_COUNT十进制是65536,感觉应该不会有什么程序锁的计数能达到这么多吧。。。
写锁 写锁的获取
注:关于写锁的获取和释放不考虑lockInterruptibly()public boolean tryLock(long timeout, TimeUnit unit)这些,只考虑最基本情况。那些其它情况有空会单独开一篇总结,读锁同理
public void lock() { sync.acquireShared(1); } // 位于AQS中 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) acquire(null, arg, true, false, false, 0L); } // 位于Sync中 protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }

上面的调用关系很简单,和ReentrantLock类似,就不介绍了。主要看tryAcquire( )函数。首先是if (w == 0 || current != getExclusiveOwnerThread()),此时返回false。注意了,这是在if (c != 0)的前提下的,c!=0,w却等于0,说明此时有线程持有读锁,那么不能获取;或者虽然w != 0,但是写锁的持有线程不是本线程,还是不能获取。接下来if (w + exclusiveCount(acquires) > MAX_COUNT),就是判断会不会超过表示范围;如果以上分支都不满足,那么就说明这个锁正被当前线程持有着写锁,并且不会溢出,那么直接设置state即可,由于是重入获取锁,也不需要设置 ExclusiveOwnerThread 。而且此时调用的是setState( ),因为这种情况下,锁被本线程持有,不存在多线程竞争的情况,不需要CAS操作。
如果c不是0,说明锁没被任何线程持有。那么首先调用writerShouldBlock()查看本线程是否需要阻塞,接着再CAS方式设置锁的状态,如果成功,再调用 setExclusiveOwnerThread(current); 来设置锁的持有者。这里之所以用CAS操作,是因为此时锁没被任何线程持有,故需要竞争。
看一下writerShouldBlock()
// 非公平锁 final boolean writerShouldBlock() { return false; // writers can always barge } // 公平锁 final boolean writerShouldBlock() { return hasQueuedPredecessors(); }

非公平锁直接返回false,公平锁调用AQS里的hasQueuedPredecessors(); 判断当前线程是否有前驱节点。这是一个逻辑表达式的短路操作。如果是非公平锁,if (writerShouldBlock() || !compareAndSetState(c, c + acquires))前一个条件返回false,那么要进行第二个条件的判断,尝试CAS设置锁;如果是公平锁,调用hasQueuedPredecessors(); 如果返回true,由于是||操作,后一个条件不用判断了,这个逻辑表达式直接返回true,否则,才会走下一个条件。如果把||改为|就不行了。这个功能如果让我写,我肯定是if if if,源代码的作者巧妙利用短路的操作,精简了代码,水平确实高啊。
public boolean tryLock() { return sync.tryWriteLock(); } ... final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; }

还有tryLock( )。和普通的lock( )类似。不同在于:
  • 当c=0时,并没有调用writerShouldBlock()函数,直接进行了CAS设置锁的状态
  • 调用之后直接返回true or false,不会进入阻塞队列
写锁的释放
public void unlock() { sync.release(1); } // 位于AQS中 public final boolean release(int arg) { if (tryRelease(arg)) { signalNext(head); return true; } return false; } // 位于Sync中 protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }

这块和ReentrantLock类似。在tryRelease里先是查看当前线程是否真正持有锁,如果都没有持有,那还释放个啥。接着用free = exclusiveCount(nextc) == 0; 来表示锁是不是释放干净了,如果是,在AQS里会signalNext(head); 唤起下一个线程。
总的来说,写锁这部分和ReentrantLock类似,没什么太难的地方。
读锁 读锁的获取
尝试获取
public void lock() { sync.acquireShared(1); } // 位于AQS中 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) acquire(null, arg, true, false, false, 0L); } // 位于Sync中 protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 本线程是让读锁从0到1的线程 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; // 本线程重入方式获取,并且是第一个获取的线程 } else if (firstReader == current) { firstReaderHoldCount++; // 本线程不是第一个获取读锁的线程 } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }

调用关系比较简单,不用说了。主要关注tryAcquireShared函数。首先是进行逻辑判断if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) 如果有其它线程持有写锁,那么返回 -1。自己持有写锁是没问题的,可以往下走。
接下来又是一个短路操作:if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)),当!readerShouldBlock()r < MAX_COUNT均为true,就会进行第三个判断,也就是CAS设置锁。当三个条件都为真,那么意味着锁设置成功了,会执行代码块里那段看起来不明觉厉的代码。当然三个条件都满足还是挺难的,所以如果这个逻辑表达式不成立,会调用fullTryAcquireShared(current)进行进一步获取。可见,tryAcquireShared只是进行一次尝试。
接下来看逻辑表达式为真的那一段代码。
private transient Thread firstReader; private transient int firstReaderHoldCount; static final class HoldCounter { int count; // initially 0 // Use id, not reference, to avoid garbage retention final long tid = LockSupport.getThreadId(Thread.currentThread()); }static final class ThreadLocalHoldCounter extends ThreadLocal { public HoldCounter initialValue() { return new HoldCounter(); } }private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter;

这是那段代码中出现的变量的定义。fistReader是用来记录第一个获取到读锁的线程,fitstReaderHoldCount是记录此线程的持有数(ReentrantReadWriteLock也是可重入的);readHolds是ThreadLocalHoldCounter的对象,而ThreadLocalHoldCounter是ThreadLocal的子类。这个ThreadLocal里装的是HoldCounter类的对象,这个HoldCounter类里分别是持有数量和持有线程的id。看起来真是挺晕的(((φ(◎ロ◎; )φ)))。
接下来分析这段代码,先是r=0分支,此时,本线程是第一个让此读锁计数从0到1的线程,所以进行firstReader和firstReaderHoldCount的设置;否则,如果这个读锁的第一个持有的线程就是本线程,那么直接++firstReaderCount即可,也很合理。这两个地方的代码也没有进行同步处理,因为r是之前的读锁值,在进入r=0分支时,CAS设置读锁状态已经成功,所以其它线程再进来读也肯定到不了r=0这个分支了;对于else if (firstReader == current) 分支,肯定也只有本线程=firstReader时,才能触发,这两个分支不存在和其它线程的竞争。
如果以上两个条件都不满足,那么这个线程就是第二个及以后获得读锁的线程。这个时候,这个线程的读锁计数就由它自己维护了。这个分支里的代码就是对此线程的读锁计数进行一番操作。首先是HoldCounter rh = cachedHoldCounter; ,有些书上说cachedHoldCounter是记录最后一个获取读锁的线程。我感觉也未必吧,毕竟这个变量也不是volatile的,无法保证可见性,你读到的未必就是真正最后一个获取的。先进行if (rh == null || rh.tid != LockSupport.getThreadId(current))判断,如果不满足,也就是从cachedHoldCounter获取到的rh正好就是本线程的;如果rh不是本线程的,经过 cachedHoldCounter = rh = readHolds.get(); 设置之后,rh也成为了本线程的HoldCounter变量。
之后是else if (rh.count == 0)分支,如果可以进入这个分支,也就意味着cachedHoldCounter保存的HoldCounter对象确实是本线程的,但是对象里对应的count却为0。那么为什么会出现这种情况呢?因为读锁的释放过程并没有清除cachedHoldCounter的代码。所以是cachedHoldCounter对应的线程之前的读锁被释放过一次,这个线程又再次来获取读锁,所以把这个本来就属于它的HoldCounter变量再赋给它。
总之不管怎样,当执行到rh.count++; 这条语句时,rh对应的一定是本线程的HoldCount对象。把它的计数自增一个。
这块代码我看的时候属实难受啊,看了很久才看明白。其实没有firstReader,firstReaderHoldCount,cachedHoldCounter 也不是不行。反正HoldCounter是ThreadLocal的,每个线程都有,从自己线程读也可以。但是可能那样读取效率有些低,所以这里设置了一点相当于缓存的变量,如果这些变量命中了,就不需要去自己线程读了。你看它命名也能看出来:cachedHoldCounter
完全获取
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (; ; ) { int c = getState(); // 检查写锁是否被持有 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. // 检查此线程是否已获得过读锁 } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { // 尝试先以cachedHoldCounter方式获取线程的HolderCounter对象 rh = cachedHoldCounter; // 如果cachedHoldCounter没有获取到,再从ThreadLocal里获取 if (rh == null || rh.tid != LockSupport.getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 进行CAS设置读锁计数 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }

代码首先检查此锁的写锁是否被持有。从代码可以看出来,当写锁被本线程持有时,是可以再获取读锁的;如果是其它线程持有写锁,则返回-1。
接下来进入到else if (readerShouldBlock()) {分支。进入此分支说明写锁没有被其它线程持有,但是这个线程获取读锁需要被阻塞。
// 非公平锁实现 final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } // 公平锁实现 final boolean readerShouldBlock() { return hasQueuedPredecessors(); }

这都是AQS类里定义的函数,这里不细说了。那为啥这种情况还有一堆代码呢?为啥不直接返回-1?
if (rh.count == 0) return -1;

这块的关键代码其实在这。这部分是判断本线程是否已经持有了读锁,从源代码来看,Java的开发者认为如果是重入方式获取读锁,即使readerShouldBlock()为真,也可以去下一部分获取。如果if (firstReader == current)为真,那肯定是重入获取的,可以进行下一步;否则又是用 cachedHoldCounter 来尝试命中缓存,没有命中,就从自己线程本地读取 HoldCounter 对象,这块之前已经解释了。
如果这一段代码都没有return,那么说明这个线程可以允许获取读锁,于是进行CAS操作来设置读锁的状态。如果可以进入到if (compareAndSetState(c, c + SHARED_UNIT)) 分支,说明已经获取成功了,和尝试获取类似,把线程对应 的HoldCounter 的计数自增一个。否则,注意到整个代码包在一个for (; ; )里,线程会不断尝试CAS操作。
读锁的获取还有一个tryReadLock(),就是不断循环获取,代码基本一样。
public boolean tryLock() { return sync.tryReadLock(); } // 位于Sync中 final boolean tryReadLock() { Thread current = Thread.currentThread(); for (; ; ) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }

读锁的释放
public void unlock() { sync.releaseShared(1); } // 位于AQS中 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { signalNext(head); return true; } return false; } 位于Sync中 protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (; ; ) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }

首先是if (firstReader == current) 分支,进入到这个分支说明本线程是第一个获得读锁的线程,直接操作firstReader和firstReaderHoldCounter即可。
if (firstReader == current) 对应的else分支又进行了一番操作,其实就是把线程对应的读锁计数-1。在此过程中,如果发现if (count <= 1),说明,这个锁计数即将为0,需要释放,所以进行了readHolds.remove(); 操作,把线程的HoldCounter对象清除。如果发现if (count <= 0),此时根本没有可释放的东西,抛出了异常。
之后是循环CAS设置读锁状态。
其它方法 到此为止,读写锁里比较困难的代码已经解释完了,还有一些很简单的方法诸如以下:
public final boolean isFair() { return sync instanceof FairSync; } ... protected Thread getOwner() { return sync.getOwner(); } // other methods

这些都很简单,没什么可说的。
总结 【Java并发学习笔记(ReentrantReadWriteLock(良心之作))】本文对 ReentrantReadWriteLock 进行了源码级别的学习和记录,感觉 ReentrantReadWriteLock 主要的难点在于读锁的获取和释放。这是个人学习过程中的心得和体会,如果有理解的不全面的地方,欢迎评论区交流。

    推荐阅读