Analysis of the Yarn State Machine Framework

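1. Preface
The previous article analyzed Yarn's event-driven model and showed that Yarn handles events with a producer-consumer pattern: events are produced through GenericEventHandler#handle and consumed by custom Handler implementations. Consuming an event changes the state of objects in Yarn, and the full set of an object's state changes makes up its state machine. This article describes how the Yarn state machine framework is implemented.
2. What is a state machine
A state machine, short for finite-state automaton, is a mathematical model abstracted from the rules by which real-world things operate: given a state machine, its current state, and an input, the resulting state can be computed unambiguously. For example, for an automatic door with the initial state closed and the input "open the door", the next state can be computed. The state transition diagram of the automatic door is shown below: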

[Figure: state transition diagram of the automatic door]
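The door example can be written as a small lookup table. This is a minimal sketch that assumes only two states and two inputs; the names DoorStateMachine, State and Input are illustrative, and the original diagram may contain more transitions than shown here.

```java
import java.util.EnumMap;
import java.util.Map;

final class DoorStateMachine {
    enum State { CLOSED, OPENED }   // assumed states
    enum Input { OPEN, CLOSE }      // assumed inputs

    // current state -> input -> next state
    private static final Map<State, Map<Input, State>> TABLE = new EnumMap<>(State.class);

    static {
        Map<Input, State> fromClosed = new EnumMap<>(Input.class);
        fromClosed.put(Input.OPEN, State.OPENED);
        TABLE.put(State.CLOSED, fromClosed);

        Map<Input, State> fromOpened = new EnumMap<>(Input.class);
        fromOpened.put(Input.CLOSE, State.CLOSED);
        TABLE.put(State.OPENED, fromOpened);
    }

    // Computes the next state; inputs with no entry for the current state are rejected.
    static State next(State current, Input input) {
        State target = TABLE.get(current).get(input);
        if (target == null) {
            throw new IllegalStateException("No transition for " + input + " in state " + current);
        }
        return target;
    }

    public static void main(String[] args) {
        System.out.println(next(State.CLOSED, Input.OPEN));
    }
}
```

Given the current state and the input, the next state is fully determined by the table, which is the property the definition above relies on.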

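In Yarn, the two most important concepts in a state machine are State and Event. Take an application, RMApp: it starts in an initial state, and when an event is handled, the event type selects the matching Transition class, which moves RMApp from its current state to the target state. Each step has the form: initial state -> transition method -> target state. Taken together, all such steps form the state machine.
In Yarn, App, AppAttempt, Container and Node can all be represented by state machines. RMApp maintains the lifecycle of an Application; RMAppAttempt maintains the lifecycle of one attempt to run it; RMContainer maintains the lifecycle of an allocated Container, the smallest unit of resources; RMNode maintains the lifecycle of a NodeManager.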
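3. Why design a state machine
Any entity may have many related events, and the combinations of event type and initial state are numerous. Without sensible organization, the entity's state transition logic becomes complicated and cluttered. The Yarn state machine organizes these transitions so that the transition method for a given initial state and event type can be found quickly.
4. When the state machine is used
When a Handler consumes an event from the event queue, it uses the state machine to update the state of the corresponding object. Using the state machine takes two steps:
  1. Step one: a Service registers the Handler.
  2. Step two: the Handler uses the state machine.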
4.1 Service registers the Handler
Take the RMApp state machine as an example. The active ResourceManager services manage the lifecycle of RMApp objects, and the RMApp state machine manages RMApp state changes. In the initialization method ResourceManager$RMActiveServices#serviceInit, the Handler implementation ApplicationEventDispatcher is registered for events of type RMAppEventType, so ApplicationEventDispatcher handles those events:
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean {
  public class RMActiveServices extends CompositeService {
    protected void serviceInit(Configuration configuration) throws Exception {
      // omitted
      rmDispatcher.register(RMAppEventType.class,
          new ApplicationEventDispatcher(rmContext));
      // omitted
    }
  }
}

4.2 Handler uses the state machine
ApplicationEventDispatcher implements the EventHandler interface, but its handle method does not process the RMAppEvent itself. It delegates to the RMApp:
public static final class ApplicationEventDispatcher
    implements EventHandler<RMAppEvent> {
  private final RMContext rmContext;

  public ApplicationEventDispatcher(RMContext rmContext) {
    this.rmContext = rmContext;
  }

  @Override
  public void handle(RMAppEvent event) {
    ApplicationId appID = event.getApplicationId();
    // rmContext holds the RMApp registered for appID; the concrete type
    // behind the RMApp interface is RMAppImpl.
    RMApp rmApp = this.rmContext.getRMApps().get(appID);
    // omitted
    rmApp.handle(event);
    // omitted
  }
}

RMAppImpl#handle processes the RMAppEvent. RMAppImpl implements the RMApp interface, and RMApp in turn extends the EventHandler interface:
public interface RMApp extends EventHandler<RMAppEvent> { ... }

RMAppImpl is therefore also a Handler. While handling an event, it uses the state machine to change the state of the RMApp object:
public class RMAppImpl implements RMApp, Recoverable {
  public void handle(RMAppEvent event) {
    // omitted
    this.stateMachine.doTransition(event.getType(), event);
    // omitted
  }
}

As RMAppImpl#handle shows, RMAppImpl keeps a member variable named stateMachine. This is the state machine: events are processed through it, and it changes the state of the RMAppImpl object.
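The delegation chain described in this section (dispatcher looks up the target object, the object handles its own event) can be sketched without any Hadoop dependency. Everything below is hypothetical: EventHandler is a local stand-in for the Yarn interface, App stands in for RMAppImpl, and the if/else inside App#handle is only a placeholder for the real stateMachine.doTransition call.

```java
import java.util.HashMap;
import java.util.Map;

final class DelegatingHandlerSketch {
    // Local stand-in for Yarn's EventHandler interface.
    interface EventHandler<E> {
        void handle(E event);
    }

    enum AppEventType { START, KILL }

    static final class AppEvent {
        final String appId;
        final AppEventType type;

        AppEvent(String appId, AppEventType type) {
            this.appId = appId;
            this.type = type;
        }
    }

    // The managed object is itself a handler, like RMAppImpl.
    static final class App implements EventHandler<AppEvent> {
        private String state = "NEW";

        @Override
        public synchronized void handle(AppEvent event) {
            // Placeholder for stateMachine.doTransition(event.getType(), event).
            if (event.type == AppEventType.START && state.equals("NEW")) {
                state = "SUBMITTED";
            } else if (event.type == AppEventType.KILL) {
                state = "KILLED";
            }
        }

        synchronized String getState() {
            return state;
        }
    }

    // The dispatcher only routes the event to the object it belongs to.
    static final class AppEventDispatcher implements EventHandler<AppEvent> {
        private final Map<String, App> apps;

        AppEventDispatcher(Map<String, App> apps) {
            this.apps = apps;
        }

        @Override
        public void handle(AppEvent event) {
            App app = apps.get(event.appId);
            if (app != null) {
                app.handle(event);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, App> apps = new HashMap<>();
        apps.put("app_1", new App());
        new AppEventDispatcher(apps).handle(new AppEvent("app_1", AppEventType.START));
        System.out.println(apps.get("app_1").getState());
    }
}
```

Keeping the dispatcher free of state logic means that all knowledge about legal transitions lives in one place, the object's own state machine.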
5. State machine initialization
The class of the object being maintained is normally responsible for initializing the state machine. RMAppImpl, for example, represents a managed application and holds a StateMachineFactory member that builds the state machine. As shown below, StateMachineFactory#addTransition adds state transition methods and StateMachineFactory#installTopology creates the state machine:
public class RMAppImpl implements RMApp, Recoverable {
  private static final StateMachineFactory<RMAppImpl, RMAppState,
      RMAppEventType, RMAppEvent> stateMachineFactory
      = new StateMachineFactory<RMAppImpl, RMAppState,
          RMAppEventType, RMAppEvent>(RMAppState.NEW)
      .addTransition(RMAppState.NEW, RMAppState.NEW,
          RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
      .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
          RMAppEventType.START, new RMAppNewlySavingTransition())
      .addTransition(RMAppState.NEW,
          EnumSet.of(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
              RMAppState.FINISHED, RMAppState.FAILED,
              RMAppState.KILLED, RMAppState.FINAL_SAVING),
          RMAppEventType.RECOVER, new RMAppRecoveredTransition())
      // omitted
      .installTopology();
}

StateMachineFactory is where the concrete state machine is actually stored. Its two key members are transitionsListNode and stateMachineTable. transitionsListNode holds Transition methods temporarily; in the end they are popped from it and stored in stateMachineTable. StateMachineFactory is defined as follows:
final public class StateMachineFactory<OPERAND, STATE extends Enum<STATE>,
    EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  private final TransitionsListNode transitionsListNode;
  private Map<STATE, Map<EVENTTYPE,
      Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> stateMachineTable;
  // The declarations of defaultInitialState and optimized are not shown
  // in this excerpt.

  public StateMachineFactory(STATE defaultInitialState) {
    this.transitionsListNode = null;
    this.defaultInitialState = defaultInitialState;
    this.optimized = false;
    this.stateMachineTable = null;
  }
}

5.1 How TransitionsListNode temporarily stores Transitions
TransitionsListNode is a linked list. Each node stores a state transition method of type ApplicableTransition:
private class TransitionsListNode {
  final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
  final TransitionsListNode next;

  TransitionsListNode(
      ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition,
      TransitionsListNode next) {
    this.transition = transition;
    this.next = next;
  }
}

ApplicableTransition is an interface with four type parameters: OPERAND, STATE, EVENTTYPE and EVENT. OPERAND is the object being operated on, STATE is the state type, EVENTTYPE is the event type, and EVENT is the event. The interface defines an apply method, which is called on each entry taken from the TransitionsListNode list and places that entry into the final state machine mapping table.
private interface ApplicableTransition<OPERAND, STATE extends Enum<STATE>,
    EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject);
}

The concrete implementation of ApplicableTransition is ApplicableSingleOrMultipleTransition. It stores the actual Transition object, and its preState and eventType fields record the pre-state and event type that the transition applies to. In other words, when the object is in preState and an event of eventType arrives, this Transition handles the event. The apply method puts these three pieces into the state machine mapping table. ApplicableSingleOrMultipleTransition is defined as follows:
static private class ApplicableSingleOrMultipleTransition
    <OPERAND, STATE extends Enum<STATE>,
     EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
    implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
  final STATE preState;
  final EVENTTYPE eventType;
  final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;

  ApplicableSingleOrMultipleTransition(STATE preState, EVENTTYPE eventType,
      Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
    this.preState = preState;
    this.eventType = eventType;
    this.transition = transition;
  }

  @Override
  public void apply(
      StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
    // Look up the event-type map registered for preState.
    Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
        = subject.stateMachineTable.get(preState);
    if (transitionMap == null) {
      // I use HashMap here because I would expect most EVENTTYPEs to not
      //  apply out of a particular state, so FSM sizes would be
      //  quadratic if I use EnumMaps here as I do at the top level.
      transitionMap = new HashMap<EVENTTYPE,
          Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
      subject.stateMachineTable.put(preState, transitionMap);
    }
    // Store the transition under its event type.
    transitionMap.put(eventType, transition);
  }
}

The Transition interface defines the method that actually performs the state transition:
private interface Transition<OPERAND, STATE extends Enum<STATE>,
    EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  STATE doTransition(OPERAND operand, STATE oldState,
      EVENT event, EVENTTYPE eventType);
}

The first Transition implementation is SingleInternalArc. It covers the case where, after the transition method runs, there is exactly one possible end state:
private class SingleInternalArc
    implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {
  private STATE postState;
  private SingleArcTransition<OPERAND, EVENT> hook; // transition hook

  SingleInternalArc(STATE postState,
      SingleArcTransition<OPERAND, EVENT> hook) {
    this.postState = postState;
    this.hook = hook;
  }

  @Override
  public STATE doTransition(OPERAND operand, STATE oldState,
      EVENT event, EVENTTYPE eventType) {
    if (hook != null) {
      hook.transition(operand, event);
    }
    return postState;
  }
}

The second Transition implementation is MultipleInternalArc. It covers the case where the end state depends on the result of running the transition method, and it checks that the returned state is one of the allowed end states:
private class MultipleInternalArc
    implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {
  // Fields
  private Set<STATE> validPostStates;
  private MultipleArcTransition<OPERAND, EVENT, STATE> hook; // transition hook

  MultipleInternalArc(Set<STATE> postStates,
      MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
    this.validPostStates = postStates;
    this.hook = hook;
  }

  @Override
  public STATE doTransition(OPERAND operand, STATE oldState,
      EVENT event, EVENTTYPE eventType)
      throws InvalidStateTransitionException {
    STATE postState = hook.transition(operand, event);
    if (!validPostStates.contains(postState)) {
      throw new InvalidStateTransitionException(oldState, eventType);
    }
    return postState;
  }
}

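The two Transition implementations above wrap SingleArcTransition and MultipleArcTransition respectively, and these are the types that actually encapsulate the transition logic. The transition objects passed to StateMachineFactory#addTransition are implementations of these two interfaces.
SingleArcTransition is defined as follows:
public interface SingleArcTransition<OPERAND, EVENT> {
  // The end state is fixed, so the method returns void.
  public void transition(OPERAND operand, EVENT event);
}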

MultipleArcTransition is defined as follows:
public interface MultipleArcTransition<OPERAND, EVENT,
    STATE extends Enum<STATE>> {
  // The end state is not known in advance; it is decided by the result
  // of the transition.
  public STATE transition(OPERAND operand, EVENT event);
}

They have many implementation classes, for example:
[Figure: list of SingleArcTransition and MultipleArcTransition implementation classes]
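To make the difference between the two hook types concrete, here is a minimal sketch with one implementation of each. The two interfaces are local copies of the signatures quoted above; Job, JobEvent, JobState, StartTransition and FinishTransition are hypothetical names invented for this illustration and do not exist in Hadoop.

```java
final class ArcTransitionSketch {
    // Local copies of the two hook interfaces quoted above.
    interface SingleArcTransition<OPERAND, EVENT> {
        void transition(OPERAND operand, EVENT event);
    }

    interface MultipleArcTransition<OPERAND, EVENT, STATE extends Enum<STATE>> {
        STATE transition(OPERAND operand, EVENT event);
    }

    enum JobState { NEW, RUNNING, SUCCEEDED, FAILED }

    // Hypothetical operand type.
    static final class Job {
        long startTime;
    }

    // Hypothetical event type.
    static final class JobEvent {
        final long timestamp;
        final int exitCode;

        JobEvent(long timestamp, int exitCode) {
            this.timestamp = timestamp;
            this.exitCode = exitCode;
        }
    }

    // Single arc: the end state is chosen at registration time,
    // so the hook only performs side effects and returns nothing.
    static final class StartTransition implements SingleArcTransition<Job, JobEvent> {
        @Override
        public void transition(Job job, JobEvent event) {
            job.startTime = event.timestamp;
        }
    }

    // Multiple arc: the hook inspects the event and decides the end state itself.
    static final class FinishTransition
            implements MultipleArcTransition<Job, JobEvent, JobState> {
        @Override
        public JobState transition(Job job, JobEvent event) {
            return event.exitCode == 0 ? JobState.SUCCEEDED : JobState.FAILED;
        }
    }

    public static void main(String[] args) {
        Job job = new Job();
        new StartTransition().transition(job, new JobEvent(100L, 0));
        System.out.println(new FinishTransition().transition(job, new JobEvent(200L, 1)));
    }
}
```

In the real framework, a hook like FinishTransition would be registered together with the set of states it is allowed to return, so that MultipleInternalArc can validate the result.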

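5.2 stateMachineTable: the state machine
stateMachineTable is the state machine itself. Its type is a two-level Map: Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>. The key of the outer Map is the old state, the key of the inner Map is the event type, and the value of the inner Map is a Transition<OPERAND, STATE, EVENTTYPE, EVENT>, where OPERAND is the object being operated on, STATE is the state type, EVENTTYPE is the event type, and EVENT is the event. The table works like this: RMAppImpl can be in many old states, and each old state can accept several event types. Given the old state and the type of the event being handled, the table yields the transition method for that situation and, through it, the target state. The transition method also contains the handling of the event itself.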
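The two-level lookup can be sketched as follows. This is a simplified illustration, not the Hadoop code: the Transition interface here takes fewer parameters than the real one, a StringBuilder stands in for the operand, and State, EventType, register and lookupAndRun are made-up names. It keeps the same shape as the table described above (an EnumMap keyed by state on the outside, a HashMap keyed by event type on the inside).

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

final class TransitionTableSketch {
    enum State { NEW, SUBMITTED, KILLED }
    enum EventType { START, KILL }

    // Simplified transition: runs its logic on the operand and returns the new state.
    interface Transition {
        State doTransition(StringBuilder operand, State oldState);
    }

    // old state -> event type -> transition
    static final Map<State, Map<EventType, Transition>> TABLE = new EnumMap<>(State.class);

    static void register(State preState, EventType type, Transition transition) {
        TABLE.computeIfAbsent(preState, s -> new HashMap<>()).put(type, transition);
    }

    static {
        register(State.NEW, EventType.START, (log, old) -> {
            log.append("started;");
            return State.SUBMITTED;
        });
        register(State.NEW, EventType.KILL, (log, old) -> {
            log.append("killed before start;");
            return State.KILLED;
        });
        register(State.SUBMITTED, EventType.KILL, (log, old) -> {
            log.append("killed;");
            return State.KILLED;
        });
    }

    // Finds the transition for (oldState, type), runs it, and returns the new state.
    static State lookupAndRun(StringBuilder operand, State oldState, EventType type) {
        Map<EventType, Transition> byEventType = TABLE.get(oldState);
        Transition transition = byEventType == null ? null : byEventType.get(type);
        if (transition == null) {
            throw new IllegalStateException("Invalid event " + type + " at " + oldState);
        }
        return transition.doTransition(operand, oldState);
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        State state = lookupAndRun(log, State.NEW, EventType.START);
        state = lookupAndRun(log, state, EventType.KILL);
        System.out.println(state + " " + log);
    }
}
```

A state with no registered transitions, such as KILLED in this sketch, simply has no inner map, so any event arriving in that state is rejected.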
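5.3 How StateMachineFactory builds the state machine
With the StateMachineFactory members transitionsListNode and stateMachineTable understood, the construction process can be examined in detail. The Handler implementation RMAppImpl registers state transition methods through StateMachineFactory#addTransition. Implementations of SingleArcTransition, which have a single fixed end state, are registered with the first kind of addTransition method below; implementations of MultipleArcTransition, whose end state is not fixed, are registered with the second kind.
5.3.1 Registering a SingleArcTransition implementation
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(
    STATE preState, STATE postState, Set<EVENTTYPE> eventTypes,
    SingleArcTransition<OPERAND, EVENT> hook) {
  StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> factory = null;
  // Register the same hook once per event type by delegating to the
  // single-event-type overload.
  for (EVENTTYPE event : eventTypes) {
    if (factory == null) {
      factory = addTransition(preState, postState, event, hook);
    } else {
      factory = factory.addTransition(preState, postState, event, hook);
    }
  }
  return factory;
}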

5.3.2 Registering a MultipleArcTransition implementation
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(
    STATE preState, Set<STATE> postStates, EVENTTYPE eventType,
    MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
  return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(
      this,
      new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>(
          preState, eventType, new MultipleInternalArc(postStates, hook)));
}

The addTransition method above calls a private StateMachineFactory constructor. It creates a new factory whose transitionsListNode places the new transition, keyed by the pre-state preState and the event type, in front of the old factory's list:
private StateMachineFactory(
    StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
    ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) {
  this.defaultInitialState = that.defaultInitialState;
  this.transitionsListNode
      = new TransitionsListNode(t, that.transitionsListNode);
  this.optimized = false;
  this.stateMachineTable = null;
}
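The constructor above never modifies the old factory: each addTransition call returns a new factory whose list head points at the previous list. A minimal sketch of that pattern, assuming plain strings in place of ApplicableTransition objects and using the invented class name ImmutableBuilderSketch:

```java
import java.util.ArrayList;
import java.util.List;

final class ImmutableBuilderSketch {
    // Immutable singly linked list node; newer entries point at older ones.
    private static final class Node {
        final String transition;
        final Node next;

        Node(String transition, Node next) {
            this.transition = transition;
            this.next = next;
        }
    }

    private final Node head;

    ImmutableBuilderSketch() {
        this.head = null;
    }

    // Copy constructor that prepends one entry and shares the rest of the list.
    private ImmutableBuilderSketch(ImmutableBuilderSketch that, String transition) {
        this.head = new Node(transition, that.head);
    }

    // Returns a new builder; the receiver is left unchanged.
    ImmutableBuilderSketch addTransition(String transition) {
        return new ImmutableBuilderSketch(this, transition);
    }

    // Lists pending entries from the head, that is, newest first.
    List<String> pending() {
        List<String> out = new ArrayList<>();
        for (Node cursor = head; cursor != null; cursor = cursor.next) {
            out.add(cursor.transition);
        }
        return out;
    }

    public static void main(String[] args) {
        ImmutableBuilderSketch base = new ImmutableBuilderSketch().addTransition("A");
        ImmutableBuilderSketch extended = base.addTransition("B");
        System.out.println(base.pending() + " " + extended.pending());
    }
}
```

Because nodes are immutable and shared, chaining many addTransition calls costs one small node per call and an intermediate factory can be reused safely as a starting point for variants.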

5.3.3 Building the state machine mapping table
StateMachineFactory#addTransition completes the construction of the TransitionsListNode list. That list only stores the transition methods temporarily; StateMachineFactory#installTopology is what actually builds the state machine mapping table:
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> installTopology() {
  return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);
}

In this StateMachineFactory constructor, passing true for the optimized parameter means the mapping table should be built from transitionsListNode, which is done by StateMachineFactory#makeStateMachineTable:
private StateMachineFactory(
    StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
    boolean optimized) {
  this.defaultInitialState = that.defaultInitialState;
  this.transitionsListNode = that.transitionsListNode;
  this.optimized = optimized;
  if (optimized) {
    makeStateMachineTable();
  } else {
    stateMachineTable = null;
  }
}

StateMachineFactory#makeStateMachineTable copies the linked list onto a stack, then pops each entry and calls the apply method of its ApplicableTransition implementation to register the Transition in the state machine mapping table:
private void makeStateMachineTable() {
  Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack
      = new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();

  Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>
      prototype = new HashMap<STATE, Map<EVENTTYPE,
          Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();

  prototype.put(defaultInitialState, null);

  // I use EnumMap here because it'll be faster and denser.  I would
  //  expect most of the states to have at least one transition.
  stateMachineTable = new EnumMap<STATE, Map<EVENTTYPE,
      Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);

  for (TransitionsListNode cursor = transitionsListNode;
       cursor != null;
       cursor = cursor.next) {
    stack.push(cursor.transition);
  }

  while (!stack.isEmpty()) {
    stack.pop().apply(this);
  }
}

The apply method of ApplicableSingleOrMultipleTransition, the ApplicableTransition implementation, is what registers each Transition in the state machine mapping table:
static private class ApplicableSingleOrMultipleTransition
    <OPERAND, STATE extends Enum<STATE>,
     EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
    implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
  final STATE preState;
  final EVENTTYPE eventType;
  final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;

  ApplicableSingleOrMultipleTransition(STATE preState, EVENTTYPE eventType,
      Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
    this.preState = preState;
    this.eventType = eventType;
    this.transition = transition;
  }

  @Override
  public void apply(
      StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
    Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
        = subject.stateMachineTable.get(preState);
    if (transitionMap == null) {
      // I use HashMap here because I would expect most EVENTTYPEs to not
      //  apply out of a particular state, so FSM sizes would be
      //  quadratic if I use EnumMaps here as I do at the top level.
      transitionMap = new HashMap<EVENTTYPE,
          Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
      subject.stateMachineTable.put(preState, transitionMap);
    }
    transitionMap.put(eventType, transition);
  }
}

Once apply has been called on every element of the stack, the state machine is fully built, and its transition methods can be looked up and used.
6. Using the state machine (performing a state transition)
For an EventHandler, calling handle leads to a call to StateMachine#doTransition, which performs the state transition. RMAppImpl is one such EventHandler:
public class RMAppImpl implements RMApp, Recoverable {
  public void handle(RMAppEvent event) {
    this.writeLock.lock();
    try {
      ApplicationId appID = event.getApplicationId();
      final RMAppState oldState = getState();
      // omitted
      this.stateMachine.doTransition(event.getType(), event);
      // omitted
    } finally {
      this.writeLock.unlock();
    }
  }
}

StateMachineFactory$InternalStateMachine#doTransition is an intermediate method that adds listener logic around the transition, somewhat like AOP. It calls StateMachineFactory#doTransition to perform the state transition and returns the resulting state:
public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
    throws InvalidStateTransitionException {
  listener.preTransition(operand, currentState, event);
  STATE oldState = currentState;
  currentState = StateMachineFactory.this.doTransition(
      operand, currentState, eventType, event);
  listener.postTransition(operand, oldState, currentState, event);
  return currentState;
}
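The before/after wrapping can be shown in isolation. In this hypothetical sketch, TransitionListener is a simplified stand-in with fewer parameters than the listener used above, Machine plays the role of the internal state machine, and a BiFunction replaces the table lookup. None of these names come from Hadoop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class ListenerSketch {
    // Simplified listener: notified before and after every transition.
    interface TransitionListener<S, E> {
        void preTransition(S current, E event);
        void postTransition(S before, S after, E event);
    }

    static final class Machine<S, E> {
        private final BiFunction<S, E, S> core; // stand-in for the table lookup
        private final TransitionListener<S, E> listener;
        private S currentState;

        Machine(S initial, BiFunction<S, E, S> core, TransitionListener<S, E> listener) {
            this.currentState = initial;
            this.core = core;
            this.listener = listener;
        }

        // Wraps the core transition with listener callbacks.
        synchronized S doTransition(E event) {
            listener.preTransition(currentState, event);
            S oldState = currentState;
            currentState = core.apply(currentState, event);
            listener.postTransition(oldState, currentState, event);
            return currentState;
        }
    }

    // Listener that records each callback as a line of text.
    static TransitionListener<String, String> recordingListener(List<String> log) {
        return new TransitionListener<String, String>() {
            @Override
            public void preTransition(String current, String event) {
                log.add("pre " + current + " " + event);
            }

            @Override
            public void postTransition(String before, String after, String event) {
                log.add("post " + before + "->" + after + " " + event);
            }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Machine<String, String> machine = new Machine<String, String>(
            "NEW", (state, event) -> "START".equals(event) ? "RUNNING" : state,
            recordingListener(log));
        machine.doTransition("START");
        System.out.println(log);
    }
}
```

The old state has to be captured before the core call, because the post callback needs both the state before and the state after the transition.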

StateMachineFactory#doTransition uses the pre-state and the event type to find the matching Transition implementation, which is either SingleInternalArc or MultipleInternalArc. These wrap a SingleArcTransition or a MultipleArcTransition respectively, whose implementations are the transition methods registered by the user. The state transition is then performed:
private STATE doTransition(OPERAND operand, STATE oldState,
    EVENTTYPE eventType, EVENT event)
    throws InvalidStateTransitionException {
  Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
      = stateMachineTable.get(oldState);
  if (transitionMap != null) {
    Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition
        = transitionMap.get(eventType);
    if (transition != null) {
      return transition.doTransition(operand, oldState, event, eventType);
    }
  }
  throw new InvalidStateTransitionException(oldState, eventType);
}

What finally runs is the custom Transition registered by the user, for example RMAppRecoveredTransition. After handling the event, it returns the resulting application state:
private static final class RMAppRecoveredTransition implements
    MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {

  @Override
  public RMAppState transition(RMAppImpl app, RMAppEvent event) {
    RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
    app.recover(recoverEvent.getRMState());
    // The app has completed.
    if (app.recoveredFinalState != null) {
      app.recoverAppAttempts();
      new FinalTransition(app.recoveredFinalState).transition(app, event);
      return app.recoveredFinalState;
    }

    if (UserGroupInformation.isSecurityEnabled()) {
      // asynchronously renew delegation token on recovery.
      try {
        app.rmContext.getDelegationTokenRenewer()
            .addApplicationAsyncDuringRecovery(app.getApplicationId(),
                BuilderUtils.parseCredentials(app.submissionContext),
                app.submissionContext.getCancelTokensWhenComplete(),
                app.getUser(),
                BuilderUtils.parseTokensConf(app.submissionContext));
      } catch (Exception e) {
        String msg = "Failed to fetch user credentials from application:"
            + e.getMessage();
        app.diagnostics.append(msg);
        LOG.error(msg, e);
      }
    }

    for (Map.Entry<ApplicationTimeoutType, Long> timeout :
        app.applicationTimeouts.entrySet()) {
      app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
          timeout.getKey(), timeout.getValue());
      if (LOG.isDebugEnabled()) {
        long remainingTime = timeout.getValue() - app.systemClock.getTime();
        LOG.debug("Application " + app.applicationId
            + " is registered for timeout monitor, type=" + timeout.getKey()
            + " remaining timeout="
            + (remainingTime > 0 ? remainingTime / 1000 : 0) + " seconds");
      }
    }

    // No existent attempts means the attempt associated with this app was not
    // started or started but not yet saved.
    if (app.attempts.isEmpty()) {
      app.scheduler.handle(
          new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
              app.applicationPriority, app.placementContext));
      return RMAppState.SUBMITTED;
    }

    // Add application to scheduler synchronously to guarantee scheduler
    // knows applications before AM or NM re-registers.
    app.scheduler.handle(
        new AppAddedSchedulerEvent(app.user, app.submissionContext, true,
            app.applicationPriority, app.placementContext));

    // recover attempts
    app.recoverAppAttempts();

    // YARN-1507 is saving the application state after the application is
    // accepted. So after YARN-1507, an app is saved meaning it is accepted.
    // Thus we return ACCEPTED state on recovery.
    return RMAppState.ACCEPTED;
  }
}

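7. Summary
StateMachineFactory builds a lookup table of the form pre-state preState -> event type eventType -> state transition method Transition, which organizes the many different state transition methods in a sensible way. This lookup table is the state machine.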
