当筵意气临九霄,星离雨散不终朝。这篇文章主要讲述#yyds干货盘点# Phaser详解相关的知识,希望能为你提供帮助。
Phaser详解
Phaser源码详解
?核心参数
private volatile long state;
/**
* The parent of this phaser, or null if none
*/
private final Phaser parent;
/**
* The root of phaser tree. Equals this if not in a tree.
*/
private final Phaser root;
//等待线程的栈顶元素,根据phase取模定义为一个奇数header和一个偶数header
private final AtomicReference< QNode> evenQ;
private final AtomicReference< QNode> oddQ;
state状态说明:
Phaser使用一个long型state值来标识内部状态:
低0-15位表示未到达parties数;
中16-31位表示等待的parties数;
中32-62位表示phase当前代;
高63位表示当前phaser的终止状态。
注意: 子Phaser的phase在没有被真正使用之前,允许滞后于它的root节点。这里在后面源码分析的reconcileState方法里会讲解。Qnode是Phaser定义的内部等待队列,用于在阻塞时记录等待线程及相关信息。实现了ForkJoinPool的一个内部接口ManagedBlocker,上面已经说过,Phaser也可能被ForkJoinPool中的任务使用,这样在其他任务阻塞等待一个phase时可以保证足够的并行度来执行任务(通过内部实现方法isReleasable和block)。
函数列表
//构造方法
public Phaser()
this(null, 0);
public Phaser(int parties)
this(null, parties);
public Phaser(Phaser parent)
this(parent, 0);
public Phaser(Phaser parent, int parties)
//注册一个新的party
public int register()
//批量注册
public int bulkRegister(int parties)
//使当前线程到达phaser,不等待其他任务到达。返回arrival phase number
public int arrive()
//使当前线程到达phaser并撤销注册,返回arrival phase number
public int arriveAndDeregister()
/*
* 使当前线程到达phaser并等待其他任务到达,等价于awaitAdvance(arrive())。
* 如果需要等待中断或超时,可以使用awaitAdvance方法完成一个类似的构造。
* 如果需要在到达后取消注册,可以使用awaitAdvance(arriveAndDeregister())。
*/
public int arriveAndAwaitAdvance()
//等待给定phase数,返回下一个 arrival phase number
public int awaitAdvance(int phase)
//阻塞等待,直到phase前进到下一代,返回下一代的phase number
public int awaitAdvance(int phase)
//响应中断版awaitAdvance
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException
//使当前phaser进入终止状态,已注册的parties不受影响,如果是分层结构,则终止所有phaser
public void forceTermination()
方法 - register()
//注册一个新的party
public int register()
return doRegister(1);
private int doRegister(int registrations)
// adjustment to state
long adjust = ((long)registrations < < PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (; ; )
long s = (parent == null) ? state : reconcileState();
int counts = (int)s;
int parties = counts > > > PARTIES_SHIFT; //获取已注册parties数
int unarrived = counts & UNARRIVED_MASK; //未到达数
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
phase = (int)(s > > > PHASE_SHIFT); //获取当前代
if (phase < 0)
break;
if (counts != EMPTY)// not 1st registration
if (parent == null || reconcileState() == s)
if (unarrived == 0)// wait out advance
root.internalAwaitAdvance(phase, null); //等待其他任务到达
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adjust))//更新注册的parties数
break;
else if (parent == null)// 1st root registration
long next = ((long)phase < < PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))//更新phase
break;
else
//分层结构,子phaser首次注册用父节点管理
synchronized (this)// 1st sub registration
if (state == s)// recheck under lock
phase = parent.doRegister(1); //分层结构,使用父节点注册
if (phase < 0)
break;
// finish registration whenever parent registration
// succeeded, even when racing with termination,
// since these are part of the same "transaction".
//由于在同一个事务里,即使phaser已终止,也会完成注册
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase < < PHASE_SHIFT) | adjust)) //更新phase
s = state;
phase = (int)(root.state > > > PHASE_SHIFT);
// assert (int)s == EMPTY;
break;
return phase;
【#yyds干货盘点# Phaser详解】说明:register方法为phaser添加一个新的party,如果onAdvance正在运行,那么这个方法会等待它运行结束再返回结果。如果当前phaser有父节点,并且当前phaser上没有已注册的party,那么就会交给父节点注册。
register和bulkRegister都由doRegister实现,大概流程如下:
如果当前操作不是首次注册,那么直接在当前phaser上更新注册parties数
如果是首次注册,并且当前phaser没有父节点,说明是root节点注册,直接更新phase
如果当前操作是首次注册,并且当前phaser由父节点,则注册操作交由父节点,并更新当前phaser的phase
上面说过,子Phaser的phase在没有被真正使用之前,允许滞后于它的root节点。非首次注册时,如果Phaser有父节点,则调用reconcileState()方法解决root节点的phase延迟传递问题, 源码如下:
private long reconcileState()
final Phaser root = this.root;
long s = state;
if (root != this)
int phase, p;
// CAS to root phase with current parties, tripping unarrived
while ((phase = (int)(root.state > > > PHASE_SHIFT)) !=
(int)(s > > > PHASE_SHIFT) & &
!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
s = (((long)phase < < PHASE_SHIFT) |
((phase < 0) ? (s & COUNTS_MASK) :
(((p = (int)s > > > PARTIES_SHIFT) == 0) ? EMPTY :
((s & PARTIES_MASK) | p))))))
s = state;
return s;
推荐阅读
- nginx实现反向代理
- appache优化
- 好的测试数据管理,到底要怎么做()
- 卡牌大师(玩转“洗牌算法”,幸运女神在微笑 (*^_^*))
- Java8 判空新写法!
- 技术分享 | 黑盒测试方法论—边界值
- Kubernetes出于什么考虑,放弃DNS轮询,而依赖代理模式将入站流量转发到后端呢()
- #yyds干货盘点# 解决华为机试(求最小公倍数)
- pg快速入门--初步使用