Java AbstractQueuedSynchronizer源码分析

AbstractQueuedSynchronizer相关数据结构和领域概念 AbstractQueuedSynchronizer是JUC中提供的一种同步队列的机制,可以说是JUC中实现锁和其它的同步机制的基础类,它表达了锁的抽象,本质上就是维护了一个状态变量state和一个同步队列,这是一个CLH队列(三个人的人名),这个队列表示获取锁的线程的队列,它是一种先入先出队列,这个队列有以下特点:

  • 队列的头代表已经持有锁的线程
  • 队列有head和tail两个指针,分别指向队头和队尾,如果队列为空,即还没有线程正在持锁和等待,这两个指针都指向null,如果只有头,即只有一个线程持有锁,无等待锁的线程,则头和尾都指向一个节点。
  • 队列的节点数据是线程,队头的数据为null,其它节点的数据即正在等待锁的线程。
这个队列的节点的两个表达领域特性的重要属性:
  • waitStatus:表示节点的等待状态,初始值为0,为正是表示该线程被取消,说明下一个节点不需要通知,表示取消状态,所以大部分情况下只需要判断符号。
  • nextWaiter:主要有两种用途,通过这个字段表示条件队列的下一个节点,或者通过固定节点表示是共享还是互斥节点,共享节点固定是SHARED,互斥节点固定是NULL
waitStatus有以下几个取值:
  • SIGNAL:表示后继节点需要被通知,一般如果需要通知后续节的线程,要么把状态置为这个,要么直接唤醒后继节点的线程。
  • CANCELLED:表示一个线程在等待足够长时间后,被中断了从而取消对锁的获取,会将状态置为这个。
  • CONDITION:主要是用于条件队列。
  • PROPAGATE:表示需要传播,这个状态的节点,会将状态置为SIGNAL,从而会唤醒后继节点的线程。
  • 0:初始值
主要实现 AQS作为抽象的同步队列实现,在设计模式上,主要是用模板方法模式,主要是提供了众多的tryXXX方法供具体的子类实现其加锁和解锁逻辑,公用的acquireXXX和releaseXXX方法作为模板方法的编排,规范获取/释放锁的行为,而且都是final方法。
AQS供子类扩展的protected方法:
//尝试以互斥方法获取锁 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }//尝试释放互斥锁 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }//尝试以共享方式获取锁 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }//尝试释放共享锁 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }//当前线程是否获取互斥锁 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }

AQS提供的公用方法主要是分为两大类:互斥和共享相关的方法,获取互斥锁的方法有acquire、acquireInterruptibly、tryAcquireNanos和release,分别是获取互斥锁、考虑可中断方式和超时的实现,以及释放。共享有acquireShared、acquireSharedInterruptibly、tryAcquireSharedNanos和releaseShared,也分别对应共享锁的获取、考虑中断和超时的实现以及释放共享锁。
AQS的实现其实是围绕state变量和等待队列来展开的,如下面这个图所描述的那样:
Java AbstractQueuedSynchronizer源码分析
文章图片

AQS中是通过一个int型的变量state去维护同步状态的,子类的实现是通过getState、setState和compareAndSetState来获取和修改状态,从而实现对锁的获取和释放。等待列队上的节点代表在竞争锁而并未持有锁的线程,它们中的线程处于自旋或者挂起状态,如果等待时间过长,实际上会进入挂起状态,有一个特殊的队列是条件队列,队列中线程都是因为某些条件而进行等待,条件队列是通过next连接起来的,它们被唤醒之后,又会转移到等待队列中去参与锁的竞争,下面会分别分析互斥、共享和条件对象的实现。
互斥锁的实现
这里我只分析互斥锁的主体的实现逻辑,其它的细节主要是涉及可中断、超时等机制,主体的流程是相通的,在具体查看源码之前,可通过下面这张图了解获取互斥型锁的实现:
Java AbstractQueuedSynchronizer源码分析
文章图片

再从acquire方法入手,通过源码去理解:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

通过tryAcquire尝试获取锁,若不成功,则先通过addWaiter,将当前线程加入到等待列队,然后调用acquireQueued方法,这是获取互斥锁的关键流程实现:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (; ; ) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

这里可以看到,只有队头,即head的后继节点,才会尝试获取锁,若成功,当前节点需要成为head指向的节点,这样等待队列的节点随着获取成功,不断往head移动,若不成功,则会调用shouldParkAfterFailedAcquire方法决定要不要将当前线程挂起,这个方法的逻辑其实也非常简单,如果前面节点的等待状态为SIGNAL,则放心地挂起,因为后面释放锁的时候,一定会唤醒当前线程,若不是,则会剔除被取消的节点,然后将前面节点的等待状态置为SIGNAL,当前线程也会变成可挂起状态而被挂起,所以,等待队列上的线程,最终都会被挂起,这个线程在当前锁被释放时,会被唤醒。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE.Indicate that we * need a signal, but don't park yet.Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

互斥锁的释放逻辑就相对简单了,先看下面的图:
Java AbstractQueuedSynchronizer源码分析
文章图片

调用尝试释放的方法,若成功,则唤醒等待队列的第一个线程,在上面获取锁的流程中,我们知道,等待队列的线程会被挂起,就是在这儿被唤醒的。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }...//若存在后继节点,则唤醒上面的线程 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling.It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node.But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

我们可以发现,释放后,也只唤醒后面的一个线程,这样会避免多个线程带来的羊群效应,并且对于公平的实现有极大的便利。
共享锁的实现
同样的,我们也以共享方式的主体流程为例进行分析,对于可中断和超时机制只是有了一些额的处理而已,共享方式获取流程如下图:
Java AbstractQueuedSynchronizer源码分析
文章图片

对源码的解读从acquireShared方法展开:
//以共享方式获取 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }...//共享获取的主体流程private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (; ; ) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }//设置头节点且传播 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }

首先调用tryAcquireShared方法,尝试共享方式获取,若不成功,则进入获取共享锁的主体流程doAcquireShared,这里和前面的互斥锁的流程比较相似,只是对于共享锁,需要考虑传播的问题,会调用setHeadAndPropagate,若等待队列的头节点尝试获取共享锁成功,则将当前线程的节点设置为head,对于后继节点是共享节点时,会调用doReleaseShared方法,这个方法是释放共享锁的主要逻辑,这里会考虑传播的问题,稍会我们介绍释放共享锁的流程。
释放共享锁的流程如下图描述:
Java AbstractQueuedSynchronizer源码分析
文章图片

共享释放是在releaseShared方法中实现的:
//获取共享锁 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }//释放共享锁的主要流程 private void doReleaseShared() { for (; ; ) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head)// loop if head changed break; } }

我们注意到,释放共享锁和互斥的区别是,这里当waitStatus为0时,会设置为PROPAGATE状态,在shouldParkAfterFailedAcquire中,对于waitStatus<0时,会设置为SIGNAL状态,而这个状态的节点,一定会唤醒后继节点的线程,这个就是传播的实现。
条件对象实现
条件对象是配合AQS一起使用的,表示在某些条件下需要等待,这里不是等待持有锁,而在满足某些条件的情况下,还需要阻塞,等待其它线程通知它,等待条件的线程都放在一个条件队列上,这个是和同步队列不同的,同步队列是等待锁。
条件对象实现了Conditionr接口,其实就是条件相关的一些语义:
void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll();

这么多方法,其实就是分为两类,等待(await)表示在某些条件下必须等待,通知(signal)通知其它线程可以退出等待,awaitXXX方法的变种就是实现了超时、可中断相关的逻辑,我们先从典型的await方法分析等待的流程:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

调用await后,首先在条件队列中添加一个节点,并且释放当前持有的锁,使用过条件变量的时候,我们应该知道,在等待之前,当前线程其实是已经获取锁的,后面其实就会通过挂起或者自旋,当前线程被阻塞了,我们简化一个这个模型,等待就是将当前线程加入一个条件队列,条件队列中的节点可以向同步队列转移,有两个种情况下会发生转移:
  • 取消了等待,取消发生的原因有:1 因为超时,线程取消了等待,2 线程被其它线程给中断了,也要取消等待,会调用transferAfterCancelledWait方法。
  • 被其它线程通知了,调用transferForSignal方法。
我们可以看检查中断的方法checkInterruptWhileWaiting,这里会检查取消的条件之一,有没有被其它线程中断,若被中断,则会调用transferAfterCancelledWait,将当前节点waitStatus变为0,且将节点转移到同步队列。
//在等待时检查中断状态 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }//取消等待后转移 final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq().Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ while (!isOnSyncQueue(node)) Thread.yield(); return false; }

再来看signal方法,这里的逻辑比较单纯,就是会通知第一个节点,也就是用调用transferForSignal方法转移条件队列节点到同步队列,这里就对应我们上面说到转移的另一种情况,通知的时候。看了这两种情况,我们知道,条件队列的节点,会向同步队列转移,之后也会去参与锁的竞争。
//通知一个线程退出条件等待 public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }//通知第一个节点 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }//通知后的节点转移 final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

【Java AbstractQueuedSynchronizer源码分析】另一种通知的情况signalAll方法就是通知所有的条件队列中的线程,退出等待,将节点转移到同步队列中去参与锁的竞争。

    推荐阅读