Yarn Service框架分析

风流不在谈锋胜,袖手无言味最长。这篇文章主要讲述Yarn Service框架分析相关的知识,希望能为你提供帮助。
1. 前言Yarn是Hadoop最常用的资源调度组件。它的核心设计理念分别是服务化(Service)和状态机(StateMachine)。其中,Service就等价于服务化。本文将介绍Yarn Servcie框架,了解Service框架的设计思路。
2. 什么是服务化服务化是指将所有Yarn组件提供的服务统一实现Service接口,用具体的Service接口表示抽象的服务概念。如下图所示,Serive接口内部结构如下:

Yarn Service框架分析

文章图片

2.1 Service状态表示
Service接口有一个内部枚举类STATE,他表示每个Service的状态可能为4种:NOTINITED(已构建但尚未初始化)、INITED(已初始化但还没有开始或结束)、STARTED(已初始化但还没有开始或结束)、STOPPED(已初始化但还没有开始或结束),如下所示:
public enum STATE /** 已构建但尚未初始化 */ NOTINITED(0, "NOTINITED"),/** 已初始化但还没有开始或结束 */ INITED(1, "INITED"),/** 已开始,尚未结束 */ STARTED(2, "STARTED"),/** 已结束,不允许再过度到其他状态 */ STOPPED(3, "STOPPED"); // 一个int值,用来在数组查找和JXM接口。 // 虽然Enamu的ordinal()方法有这个作用,但是随着时间推移提供更多的稳定性保证 private final int value; private final String statename; // 状态枚举类的构造方法,跟前文定义的状态序号和状态名匹配 private STATE(int value, String name) this.value = https://www.songbingjia.com/android/value; this.statename = name; public int getValue() return value; @Override public String toString() return statename;

2.2 Service关键方法
每个Service都有自身的初始化、启动、停止操作,对应init()、start()、stop()方法。在接口的实现类中:
  1. init方法需要将Service状态从 NOINITED -> INITED。
  2. start方法需要将Service状态从 INITED -> STARTED。
  3. stop方法需要将Service状态从 STARTED -> STOPPED。
上述方法在状态转换过程中,如果发生异常,统一将状态转化为STOPPED。其接口定义如下:
void init(Configuration config); void start(); void stop();

2.3 服务化概念总结
Yarn提供的每个服务都有一种状态(STATE),可对服务进行初始化/启动/停止操作。
3. 为什么要服务化Yarn组件对外提供了大量的服务,这些服务来源自不同的开发者,每位开发者的编码习惯都不一样,这样不同服务提供的接口可能大不相同,Yarn管理起来会非常麻烦。如果所有的服务都继承Service接口,每个服务的状态都是NOTINITED、INITED、STARTED、STOPPED之一;且服务都是通过同样的init()、start()、stop()方法启/停,那么Yarn的服务管理会非常清晰。
4. 服务化框架分析:AbstractService为了方便开发者开发Service服务,Yarn使用抽象类AbstractService实现Service接口的init()、start()、stop()等方法。AbstractService在这些方法中完成了Service状态的转换,这样开发者之需要写具体的服务逻辑即可,不需要重复写状态转换代码。
4.1 AbstractService设计技巧
AbstractService类使用了模版方法设计模式,将状态转化的代码写到init()、start()、stop()中,AbstractService实现类只需要实现类其提供的serviceInit、serviceStart、serviceStop方法,填充服务逻辑即可,帮助开发者节省重复写状态转换代码的时间。
4.1.1 AbstractService#init设计方式init方法中实现了enterState(STATE.INITED),使服务从NOINITED -> INITED进行状态转换。init方法内部调用serviceInit方法,serviceInit方法是AbstractService实现类中服务初始化逻辑的方法。
public void init(Configuration conf) if (conf == null) throw new ServiceStateException("Cannot initialize service " + getName() + ": null configuration"); if (isInState(STATE.INITED)) return; synchronized (stateChangeLock) //状态转换NOINITED -> INITED if (enterState(STATE.INITED) != STATE.INITED) setConfig(conf); try //AbstractService实现类中服务初始化逻辑 serviceInit(config); if (isInState(STATE.INITED)) //if the service ended up here during init, //notify the listeners notifyListeners(); catch (Exception e) noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e);

抽象类AbstractService提供空的serviceInit的实现,如果AbstractService实现类没有重写serviceInit方法,默认会调用AbstractService#serviceInit方法。
protected void serviceInit(Configuration conf) throws Exception if (conf != config) LOG.debug("Config has been overridden during init"); setConfig(conf);

4.1.2 AbstractService#start设计方式start方法中实现了enterState(STATE.STARTED),使服务从INITED -> STARTED进行状态转换。start方法内部调用serviceStart方法,serviceStart方法是AbstractService实现类中服务启动的逻辑的方法。
public void start() if (isInState(STATE.STARTED)) return; //enter the started state synchronized (stateChangeLock) //状态转换INITED -> STARTED if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) try startTime = System.currentTimeMillis(); //AbstractService实现类中服务启动逻辑 serviceStart(); if (isInState(STATE.STARTED)) //if the service started (and isnt now in a later state), notify LOG.debug("Serviceis started", getName()); notifyListeners(); catch (Exception e) noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e);

抽象类AbstractService提供空的serviceStart的实现,如果AbstractService实现类没有重写serviceStart方法,默认会调用AbstractService#serviceStart方法。
protected void serviceStart() throws Exception

4.1.3 AbstractService#stop设计方式stop方法中实现了enterState(STATE.STOPPED),使服务从STARTED-> STOPPED进行状态转换。stop方法内部调用serviceStop方法,serviceStop方法是AbstractService实现类中服务停止的逻辑的方法。
public void stop() if (isInState(STATE.STOPPED)) return; synchronized (stateChangeLock) //状态转换STARTED -> STOPPED if (enterState(STATE.STOPPED) != STATE.STOPPED) try //AbstractService实现类中服务停止逻辑 serviceStop(); catch (Exception e) //stop-time exceptions are logged if they are the first one, noteFailure(e); throw ServiceStateException.convert(e); finally //report that the service has terminated terminationNotification.set(true); synchronized (terminationNotification) terminationNotification.notifyAll(); //notify anything listening for events notifyListeners(); else //already stopped: note it LOG.debug("Ignoring re-entrant call to stop()");

抽象类AbstractService提供空的serviceStop的实现,如果AbstractService实现类没有重写serviceStop方法,默认会调用AbstractService#serviceStop方法。
protected void serviceStop() throws Exception

4.2 AbstractService状态转换
在4.1节中提到AbstractService状态转换。init、start、stop方法中,都是通过AbstractService#enterState方法进行状态转换,本节只通过init状态转换过程为例子,研究Service状态转换过程。
4.2.1 AbstractService状态转换分析AbstractService#enterState(STATE.INITED)进入STATE.INITED状态:
//传入的STATE.INITED状态是newState,即将要转换的新状态 private STATE enterState(STATE newState) assert stateModel != null : "null state in " + name + " " + this.getClass(); //调用stateModel成员变量的enterState方法进行状态转换 STATE oldState = stateModel.enterState(newState); if (oldState != newState) LOG.debug("Service:entered state ", getName(), getServiceState()); recordLifecycleEvent(); return oldState;

stateModel是AbstractService类的成员变量,它的类型是ServiceStateModel:
public abstract class AbstractService implements Service private final ServiceStateModel stateModel;

一个服务对象包含一个ServiceStateModel成员变量,每个服务的状态由ServiceStateModel中的state成员表示。state状态流向图如下图所示:
Yarn Service框架分析

文章图片

为了防止用户随意转换服务的状态,ServiceStateModel定义了一个二维数组statemap,statemap的false表示禁止的状态转换方式;statemap的true表示允许的状态转换方式。ServiceStateModel定义如下:
Yarn Service框架分析

文章图片

横轴代表旧状态,纵轴代表新状态。他们对应的值表示能否从旧状态转化成为新状态。例如:statemap[0][1]为true,表示可以从uninited状态转化成为inited;statemap[3][1]为true,表示不能从stopped状态转化成为inited。
4.2.2 ServiceStateModel状态转换分析ServiceStateModel#enterState先检查服务能否从旧状态转换成新状态,再更新服务的状态:
public synchronized Service.STATE enterState(Service.STATE proposed) checkStateTransition(name, state, proposed); Service.STATE oldState = state; //atomic write of the new state state = proposed; return oldState;

ServiceStateModel#checkStateTransition将自身的state成员变量作为就的状态。检查服务能否从旧状态转换成新状态,如果不能转换,直接抛异常:
public static void checkStateTransition(String name, Service.STATE state, Service.STATE proposed) if (!isValidStateTransition(state, proposed)) throw new ServiceStateException(name + " cannot enter state " + proposed + " from state " + state);

ServiceStateModel#isValidStateTransition通过statemap中二维数组的值判断能否从旧状态转换成新状态。如果数组的值是true,表示可以进行转换;如果数组的值是false,表示不可以进行转换:
public static boolean isValidStateTransition(Service.STATE current, Service.STATE proposed) boolean[] row = statemap[current.getValue()]; return row[proposed.getValue()];

4.2.3状态转换总结调用AbstractService#enterState(目的状态)即可从旧状态转换为目的状态。其中,ServiceStateModel的二维数组成员变量statemap用于判断转换是否符合规范。
5. 群组服务:CompositeService
AbstractService类只能表示一个独立的Service,并不能表示所有服务。例如ResourceManager本身就是一个服务,但是它包含一系列子服务,例如管理员服务、系统指标发布服务等等。初始化ResourceManager时,需要初始化ResourceManager的所有子服务。启动和停止ResourceManager服务同理。Yarn提供了CompositeService类,用于群组服务场景。
5.1 群组服务定义CompositeService继承了AbstractService类,意味着CompositeService服务本身会调用AbstractService#init、AbstractService#start、AbstractService#stop进行服务初始化/服务启动/服务停止。CompositeService的子服务由其成员变量serviceList表示。如下所示:
public class CompositeService extends AbstractService private final List< Service> serviceList = new ArrayList< Service> ();

5.2 群组服务初始化流程由于群组服务本身就是一个服务,初始化群组服务时,会先调用AbstractService#init方法。AbstractService#init方法中,在转换了服务状态后,调用其实现类#serviceInit。对于群组服务CompositeService,会调用CompositeService#serviceInit方法。在CompositeService#serviceInit方法中,会调用每个子服务的init方法。如果子服务也继承于AbstractService,那么这些子服务初始化时,同样会调用AbstractService#init方法,进行状态转换并调用子服务实现的serviceInit执行初始化逻辑。如果子服务还包含子服务,会递归地执行初始化流程...
protected void serviceInit(Configuration conf) throws Exception //获取serviceList成员变量,即所有子服务 List< Service> services = getServices(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": initing services, size=" + services.size()); for (Service service : services) service.init(conf); //通用一个conf配置文件//CompositeService的super是AbstractService,AbstractService#serviceInit为空代码块,没什么用 super.serviceInit(conf);

5.3 群组服务启动流程与初始化流程相似,在启动群组服务时,会先调用AbstractService#start方法。AbstractService#start方法中,在转换了服务状态后,调用其实现类#serviceStart。对于群组服务CompositeService,会调用CompositeService#serviceStart方法。在CompositeService#serviceStart方法中,会调用每个子服务的start方法。如果子服务也继承于AbstractService,那么这些子服务启动时,同样会调用AbstractService#start方法,进行状态转换并调用子服务实现的serviceStart执行初始化逻辑。如果子服务还包含子服务,会递归地执行启动流程...
protected void serviceStart() throws Exception List< Service> services = getServices(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting services, size=" + services.size()); for (Service service : services) // start the service. If this fails that service // will be stopped and an exception raised service.start(); //CompositeService的super是AbstractService,AbstractService#serviceStart为空代码块,没什么用 super.serviceStart();

5.4 群组服务停止流程与启动流程相似,在停止群组服务时,会先调用AbstractService#stop方法。AbstractService#stop方法中,在转换了服务状态后,调用其实现类#serviceStop。对于群组服务CompositeService,会调用CompositeService#serviceStop方法。在CompositeService#serviceStop方法中,会调用每个子服务的stop方法。如果子服务也继承于AbstractService,那么这些子服务停止时,同样会调用AbstractService#stop方法,进行状态转换并调用子服务实现的serviceStop执行初始化逻辑。如果子服务还包含子服务,会递归地执行停止流程...
protected void serviceStop() throws Exception //stop all services that were started int numOfServicesToStop = serviceList.size(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopping services, size=" + numOfServicesToStop); stop(numOfServicesToStop, STOP_ONLY_STARTED_SERVICES); //CompositeService的super是AbstractService,AbstractService#serviceStart为空代码块,没什么用 super.serviceStop(); //可以控制停止的服务数量,并且能够控制只停止已经启动的服务 private void stop(int numOfServicesStarted, boolean stopOnlyStartedServices) // stop in reverse order of start Exception firstException = null; List< Service> services = getServices(); //停止所有子Service for (int i = numOfServicesStarted - 1; i > = 0; i--) Service service = services.get(i); if (LOG.isDebugEnabled()) LOG.debug("Stopping service #" + i + ": " + service); STATE state = service.getServiceState(); //depending on the stop police //最终调用service.stop()停止service if (state == STATE.STARTED || (!stopOnlyStartedServices & & state == STATE.INITED)) Exception ex = ServiceOperations.stopQuietly(LOG, service); if (ex != null & & firstException == null) firstException = ex; //after stopping all services, rethrow the first exception raised if (firstException != null) throw ServiceStateException.convert(firstException);

5.5 群组服务总结对于包含子服务的服务来说,使用CompositeService初始化/启动/停止服务时,会初始化/启动/停止其所有子服务。
总结【Yarn Service框架分析】Yarn通过Service-> AbstractService-> CompositeService类的完善,实现了从简单到复杂的服务设计。实现了所有场景下服务的初始化/启动/停止流程。

    推荐阅读