JAVA里的锁之四读写锁

很多场景下读是多于写的,我们通过将读写锁分离可以很大程度提高性能。
ReadWriteLock

public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }

ReadWriteLock接口只定义了读锁和写锁两个方法,ReentrantReadWriteLock对其进行了实现,我们重点看下ReentrantReadWriteLock这个类。
ReentrantReadWriteLock里重要属性
/** Inner class providing readlock */ private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; /** Performs all synchronization mechanics */ final Sync sync;

ReentrantReadWriteLock里重要内部类
//同步器 abstract static class Sync extends AbstractQueuedSynchronizer {...} //非公平同步器 static final class NonfairSync extends Sync{...} //公平同步器 static final class FairSync extends Sync{...} //读锁,持有同步器Sync public static class ReadLock implements Lock, java.io.Serializable{ private final Sync sync; ... } //写锁,持有同步器Sync public static class WriteLock implements Lock, java.io.Serializable{ private final Sync sync; ... }

JAVA里的锁之四读写锁
文章图片

读写状态的设计
与读写状态相关联的有
  • Sync内部类里与的相关属性与方法:
static final int SHARED_SHIFT= 16; static final int SHARED_UNIT= (1 << SHARED_SHIFT); //65536 static final int MAX_COUNT= (1 << SHARED_SHIFT) - 1; //65535,对应十六进制里的0000FFFF static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //65535/** 读锁获取的次数*/ static int sharedCount(int c){ return c >>> SHARED_SHIFT; } /** 写锁获取的次数*/ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

  • AbstractQueuedSynchronizer类里的 private volatile int state;
AQS就是在一个整型变量上维护了读写两种状态,高16位表示读,低16位表示写。通过位运算可以知道一个线程获取读写锁的次数,读锁获取次数就是上面sharedCount方法计算的c >>> SHARED_SHIFT,而写锁获取次数就是exclusiveCount方法计算的c & EXCLUSIVE_MASK(将十六位抹去)。写锁状态增加1次为state+1,读状态增加1次为state+(1<<16)。当state不等于0,当写状态c & EXCLUSIVE_MASK等于0,读状态c >>> SHARED_SHIFT大于0时,表示读锁被获取。
构造方法
可以构造公平与非公平两种锁,默认为非公平。
public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }

读锁获取的实现
读锁是一个支持重入的共享锁,能够同时被多个线程获取,获取读锁主要是按照共享模式来获取锁的,大体流程差不多,tryAcquireShared略有差异。
/** * private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); * private Lock readLock = readWriteLock.readLock(); * readLock.lock(); */public void lock() { sync.acquireShared(1); }public final void acquireShared(int arg) { //成功为1,失败为-1 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //其它线程获取了写锁,需要加入等待队列 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); //读锁获取次数 //当前读状态获取不需阻塞且获取次数小于65535,则CAS修改状态获取读锁 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //第一个获取读锁线程与当前线程相同,则读锁线程持有器累加 firstReaderHoldCount++; } else { //其它线程获取的读锁,使用其它线程的threadlocal累加获取次数 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //当前线程获取读锁需要被阻塞;获取次数超次;CAS失败 这三种情况会 //进入到fullTryAcquireShared方法重新获取一次读锁 return fullTryAcquireShared(current); }

关于readerShouldBlock()方法在公平锁与非公平锁有不同的实现,返回true(表示需要阻塞)。
  • 公平锁下需要阻塞:头节点的下一节点对应的线程不是当前线程,说明已经早有其它线程在排队了,按照FIFO的顺序,当前线程需要排队以示公平。这个可以避免饥饿。
  • 非公平锁下需要阻塞:写状态已被获取,且头节点下一节点对应的排队线程是要获取写锁,那当前线程也得排队。
读锁释放的实现
/** * private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); * private Lock readLock = readWriteLock.readLock(); * readLock.unlock(); */ public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //下面的判断是已获取读锁线程是否当前线程,结果都将获取次数递减 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) //线程没锁却去释放,需要抛异常 throw unmatchedUnlockException(); } --rh.count; } for (; ; ) { int c = getState(); int nextc = c - SHARED_UNIT; //CAS修改读数状态,只有所有读锁都释放了(为0)才返回true //返回true才会唤醒后继节点 if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }private void doReleaseShared() { for (; ; ) { Node h = head; //同步队列不为空 if (h != null && h != tail) { int ws = h.waitStatus; //判断后续节点是否需要唤醒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); //唤醒 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }

好了,上面写了读锁的获取与释放,这里进行下总结:
读锁的获取:
尝试使用tryAcquireShared进行读锁获取
  • 成功则执行业务逻辑;
  • 失败:
    1. 如果其它线程获取了写锁,返回失败。
    2. 如果是公平锁且排在前面的线程是其它线程,因为不能插队,返回失败;如果是非公平锁且排在前面的线程是想获取写锁,因为写锁的排它性,也返回失败。
    3. 获取次数超过65535,返回失败。
    4. CAS设置状态没成功,返回失败。
    5. 在fullTryAcquireShared()再次获取锁,如果不成功,返回失败。
    6. 上面几步都没成功的话,在doAcquireShared还会进行获取,如果没成功,加入到等待队列里。
读锁的释放:
  1. 计数器减。
  2. CAS设置状态。
  3. 唤醒后继节点。
写锁的获取
写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经获取或者是其它线程获取的写锁,则当前线程进入等待状态。
/** * private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); * private Lock writeLock = readWriteLock.writeLock(); * writeLock.lock(); */ public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero *and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only *happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if *it is either a reentrant acquire or *queue policy allows it. If so, update state *and set owner. */ Thread current = Thread.currentThread(); int c = getState(); //锁状态 int w = exclusiveCount(c); //写锁获取次数 if (c != 0) {//有线程获取了锁,可能是读锁也可能是写锁 // (Note: if c != 0 and w == 0 then shared count != 0) //有线程获取了读锁,或当前线程不是获取写锁的线程,返回失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT)//超次 throw new Error("Maximum lock count exceeded"); // Reentrant acquire 其它情形则视为获取写锁成功 setState(c + acquires); return true; } //尝试获取写锁的线程是否需要阻塞,也分公平与非公平两种实现: //公平:判断是否有前驱节点(没判断是获取读锁还是写锁),有的话自己就需要阻塞排队,返回false //非公平:总是返回false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }

写锁的释放
/** * private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); * private Lock writeLock = readWriteLock.writeLock(); * writeLock.unlock(); */public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); //唤醒后继节点 return true; } return false; }protected final boolean tryRelease(int releases) { if (!isHeldExclusively())//获取写锁的线程是否是当前线程,不是抛异常 throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); //将获取写锁的线程设置为null setState(nextc); return free; }

【JAVA里的锁之四读写锁】StampedLock
https://www.cnblogs.com/admol...
https://segmentfault.com/a/11...

    推荐阅读