风流不在谈锋胜,袖手无言味最长。这篇文章主要讲述Yarn Service框架分析相关的知识,希望能为你提供帮助。
1. 前言Yarn是Hadoop最常用的资源调度组件。它的核心设计理念分别是服务化(Service)和状态机(StateMachine)。其中,Service就等价于服务化。本文将介绍Yarn Servcie框架,了解Service框架的设计思路。
2. 什么是服务化服务化是指将所有Yarn组件提供的服务统一实现Service接口,用具体的Service接口表示抽象的服务概念。如下图所示,Serive接口内部结构如下:
文章图片
2.1 Service状态表示
Service接口有一个内部枚举类STATE,他表示每个Service的状态可能为4种:NOTINITED(已构建但尚未初始化)、INITED(已初始化但还没有开始或结束)、STARTED(已初始化但还没有开始或结束)、STOPPED(已初始化但还没有开始或结束),如下所示:
public enum STATE
/** 已构建但尚未初始化 */
NOTINITED(0, "NOTINITED"),/** 已初始化但还没有开始或结束 */
INITED(1, "INITED"),/** 已开始,尚未结束 */
STARTED(2, "STARTED"),/** 已结束,不允许再过度到其他状态 */
STOPPED(3, "STOPPED");
// 一个int值,用来在数组查找和JXM接口。
// 虽然Enamu的ordinal()方法有这个作用,但是随着时间推移提供更多的稳定性保证
private final int value;
private final String statename;
// 状态枚举类的构造方法,跟前文定义的状态序号和状态名匹配
private STATE(int value, String name)
this.value = https://www.songbingjia.com/android/value;
this.statename = name;
public int getValue()
return value;
@Override
public String toString()
return statename;
2.2 Service关键方法
每个Service都有自身的初始化、启动、停止操作,对应init()、start()、stop()方法。在接口的实现类中:
- init方法需要将Service状态从 NOINITED -> INITED。
- start方法需要将Service状态从 INITED -> STARTED。
- stop方法需要将Service状态从 STARTED -> STOPPED。
void init(Configuration config);
void start();
void stop();
2.3 服务化概念总结
Yarn提供的每个服务都有一种状态(STATE),可对服务进行初始化/启动/停止操作。
3. 为什么要服务化Yarn组件对外提供了大量的服务,这些服务来源自不同的开发者,每位开发者的编码习惯都不一样,这样不同服务提供的接口可能大不相同,Yarn管理起来会非常麻烦。如果所有的服务都继承Service接口,每个服务的状态都是NOTINITED、INITED、STARTED、STOPPED之一;且服务都是通过同样的init()、start()、stop()方法启/停,那么Yarn的服务管理会非常清晰。
4. 服务化框架分析:AbstractService为了方便开发者开发Service服务,Yarn使用抽象类AbstractService实现Service接口的init()、start()、stop()等方法。AbstractService在这些方法中完成了Service状态的转换,这样开发者之需要写具体的服务逻辑即可,不需要重复写状态转换代码。
4.1 AbstractService设计技巧
AbstractService类使用了模版方法设计模式,将状态转化的代码写到init()、start()、stop()中,AbstractService实现类只需要实现类其提供的serviceInit、serviceStart、serviceStop方法,填充服务逻辑即可,帮助开发者节省重复写状态转换代码的时间。
4.1.1 AbstractService#init设计方式init方法中实现了enterState(STATE.INITED),使服务从NOINITED -> INITED进行状态转换。init方法内部调用serviceInit方法,serviceInit方法是AbstractService实现类中服务初始化逻辑的方法。
public void init(Configuration conf)
if (conf == null)
throw new ServiceStateException("Cannot initialize service "
+ getName() + ": null configuration");
if (isInState(STATE.INITED))
return;
synchronized (stateChangeLock)
//状态转换NOINITED ->
INITED
if (enterState(STATE.INITED) != STATE.INITED)
setConfig(conf);
try
//AbstractService实现类中服务初始化逻辑
serviceInit(config);
if (isInState(STATE.INITED))
//if the service ended up here during init,
//notify the listeners
notifyListeners();
catch (Exception e)
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
抽象类AbstractService提供空的serviceInit的实现,如果AbstractService实现类没有重写serviceInit方法,默认会调用AbstractService#serviceInit方法。
protected void serviceInit(Configuration conf) throws Exception
if (conf != config)
LOG.debug("Config has been overridden during init");
setConfig(conf);
4.1.2 AbstractService#start设计方式start方法中实现了enterState(STATE.STARTED),使服务从INITED -> STARTED进行状态转换。start方法内部调用serviceStart方法,serviceStart方法是AbstractService实现类中服务启动的逻辑的方法。
public void start()
if (isInState(STATE.STARTED))
return;
//enter the started state
synchronized (stateChangeLock)
//状态转换INITED ->
STARTED
if (stateModel.enterState(STATE.STARTED) != STATE.STARTED)
try
startTime = System.currentTimeMillis();
//AbstractService实现类中服务启动逻辑
serviceStart();
if (isInState(STATE.STARTED))
//if the service started (and isnt now in a later state), notify
LOG.debug("Serviceis started", getName());
notifyListeners();
catch (Exception e)
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
抽象类AbstractService提供空的serviceStart的实现,如果AbstractService实现类没有重写serviceStart方法,默认会调用AbstractService#serviceStart方法。
protected void serviceStart() throws Exception
4.1.3 AbstractService#stop设计方式stop方法中实现了enterState(STATE.STOPPED),使服务从STARTED-> STOPPED进行状态转换。stop方法内部调用serviceStop方法,serviceStop方法是AbstractService实现类中服务停止的逻辑的方法。
public void stop()
if (isInState(STATE.STOPPED))
return;
synchronized (stateChangeLock)
//状态转换STARTED ->
STOPPED
if (enterState(STATE.STOPPED) != STATE.STOPPED)
try
//AbstractService实现类中服务停止逻辑
serviceStop();
catch (Exception e)
//stop-time exceptions are logged if they are the first one,
noteFailure(e);
throw ServiceStateException.convert(e);
finally
//report that the service has terminated
terminationNotification.set(true);
synchronized (terminationNotification)
terminationNotification.notifyAll();
//notify anything listening for events
notifyListeners();
else
//already stopped: note it
LOG.debug("Ignoring re-entrant call to stop()");
抽象类AbstractService提供空的serviceStop的实现,如果AbstractService实现类没有重写serviceStop方法,默认会调用AbstractService#serviceStop方法。
protected void serviceStop() throws Exception
4.2 AbstractService状态转换
在4.1节中提到AbstractService状态转换。init、start、stop方法中,都是通过AbstractService#enterState方法进行状态转换,本节只通过init状态转换过程为例子,研究Service状态转换过程。
4.2.1 AbstractService状态转换分析AbstractService#enterState(STATE.INITED)进入STATE.INITED状态:
//传入的STATE.INITED状态是newState,即将要转换的新状态
private STATE enterState(STATE newState)
assert stateModel != null : "null state in " + name + " " + this.getClass();
//调用stateModel成员变量的enterState方法进行状态转换
STATE oldState = stateModel.enterState(newState);
if (oldState != newState)
LOG.debug("Service:entered state ", getName(), getServiceState());
recordLifecycleEvent();
return oldState;
stateModel是AbstractService类的成员变量,它的类型是ServiceStateModel:
public abstract class AbstractService implements Service
private final ServiceStateModel stateModel;
一个服务对象包含一个ServiceStateModel成员变量,每个服务的状态由ServiceStateModel中的state成员表示。state状态流向图如下图所示:
文章图片
为了防止用户随意转换服务的状态,ServiceStateModel定义了一个二维数组statemap,statemap的false表示禁止的状态转换方式;statemap的true表示允许的状态转换方式。ServiceStateModel定义如下:
文章图片
横轴代表旧状态,纵轴代表新状态。他们对应的值表示能否从旧状态转化成为新状态。例如:statemap[0][1]为true,表示可以从uninited状态转化成为inited;statemap[3][1]为true,表示不能从stopped状态转化成为inited。
4.2.2 ServiceStateModel状态转换分析ServiceStateModel#enterState先检查服务能否从旧状态转换成新状态,再更新服务的状态:
public synchronized Service.STATE enterState(Service.STATE proposed)
checkStateTransition(name, state, proposed);
Service.STATE oldState = state;
//atomic write of the new state
state = proposed;
return oldState;
ServiceStateModel#checkStateTransition将自身的state成员变量作为就的状态。检查服务能否从旧状态转换成新状态,如果不能转换,直接抛异常:
public static void checkStateTransition(String name,
Service.STATE state,
Service.STATE proposed)
if (!isValidStateTransition(state, proposed))
throw new ServiceStateException(name + " cannot enter state "
+ proposed + " from state " + state);
ServiceStateModel#isValidStateTransition通过statemap中二维数组的值判断能否从旧状态转换成新状态。如果数组的值是true,表示可以进行转换;如果数组的值是false,表示不可以进行转换:
public static boolean isValidStateTransition(Service.STATE current,
Service.STATE proposed)
boolean[] row = statemap[current.getValue()];
return row[proposed.getValue()];
4.2.3状态转换总结调用AbstractService#enterState(目的状态)即可从旧状态转换为目的状态。其中,ServiceStateModel的二维数组成员变量statemap用于判断转换是否符合规范。
5. 群组服务:CompositeService
AbstractService类只能表示一个独立的Service,并不能表示所有服务。例如ResourceManager本身就是一个服务,但是它包含一系列子服务,例如管理员服务、系统指标发布服务等等。初始化ResourceManager时,需要初始化ResourceManager的所有子服务。启动和停止ResourceManager服务同理。Yarn提供了CompositeService类,用于群组服务场景。
5.1 群组服务定义CompositeService继承了AbstractService类,意味着CompositeService服务本身会调用AbstractService#init、AbstractService#start、AbstractService#stop进行服务初始化/服务启动/服务停止。CompositeService的子服务由其成员变量serviceList表示。如下所示:
public class CompositeService extends AbstractService
private final List<
Service>
serviceList = new ArrayList<
Service>
();
5.2 群组服务初始化流程由于群组服务本身就是一个服务,初始化群组服务时,会先调用AbstractService#init方法。AbstractService#init方法中,在转换了服务状态后,调用其实现类#serviceInit。对于群组服务CompositeService,会调用CompositeService#serviceInit方法。在CompositeService#serviceInit方法中,会调用每个子服务的init方法。如果子服务也继承于AbstractService,那么这些子服务初始化时,同样会调用AbstractService#init方法,进行状态转换并调用子服务实现的serviceInit执行初始化逻辑。如果子服务还包含子服务,会递归地执行初始化流程...
protected void serviceInit(Configuration conf) throws Exception
//获取serviceList成员变量,即所有子服务
List<
Service>
services = getServices();
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": initing services, size=" + services.size());
for (Service service : services)
service.init(conf);
//通用一个conf配置文件//CompositeService的super是AbstractService,AbstractService#serviceInit为空代码块,没什么用
super.serviceInit(conf);
5.3 群组服务启动流程与初始化流程相似,在启动群组服务时,会先调用AbstractService#start方法。AbstractService#start方法中,在转换了服务状态后,调用其实现类#serviceStart。对于群组服务CompositeService,会调用CompositeService#serviceStart方法。在CompositeService#serviceStart方法中,会调用每个子服务的start方法。如果子服务也继承于AbstractService,那么这些子服务启动时,同样会调用AbstractService#start方法,进行状态转换并调用子服务实现的serviceStart执行初始化逻辑。如果子服务还包含子服务,会递归地执行启动流程...
protected void serviceStart() throws Exception
List<
Service>
services = getServices();
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": starting services, size=" + services.size());
for (Service service : services)
// start the service. If this fails that service
// will be stopped and an exception raised
service.start();
//CompositeService的super是AbstractService,AbstractService#serviceStart为空代码块,没什么用
super.serviceStart();
5.4 群组服务停止流程与启动流程相似,在停止群组服务时,会先调用AbstractService#stop方法。AbstractService#stop方法中,在转换了服务状态后,调用其实现类#serviceStop。对于群组服务CompositeService,会调用CompositeService#serviceStop方法。在CompositeService#serviceStop方法中,会调用每个子服务的stop方法。如果子服务也继承于AbstractService,那么这些子服务停止时,同样会调用AbstractService#stop方法,进行状态转换并调用子服务实现的serviceStop执行初始化逻辑。如果子服务还包含子服务,会递归地执行停止流程...
protected void serviceStop() throws Exception
//stop all services that were started
int numOfServicesToStop = serviceList.size();
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": stopping services, size=" + numOfServicesToStop);
stop(numOfServicesToStop, STOP_ONLY_STARTED_SERVICES);
//CompositeService的super是AbstractService,AbstractService#serviceStart为空代码块,没什么用
super.serviceStop();
//可以控制停止的服务数量,并且能够控制只停止已经启动的服务
private void stop(int numOfServicesStarted, boolean stopOnlyStartedServices)
// stop in reverse order of start
Exception firstException = null;
List<
Service>
services = getServices();
//停止所有子Service
for (int i = numOfServicesStarted - 1;
i >
= 0;
i--)
Service service = services.get(i);
if (LOG.isDebugEnabled())
LOG.debug("Stopping service #" + i + ": " + service);
STATE state = service.getServiceState();
//depending on the stop police
//最终调用service.stop()停止service
if (state == STATE.STARTED || (!stopOnlyStartedServices &
&
state == STATE.INITED))
Exception ex = ServiceOperations.stopQuietly(LOG, service);
if (ex != null &
&
firstException == null)
firstException = ex;
//after stopping all services, rethrow the first exception raised
if (firstException != null)
throw ServiceStateException.convert(firstException);
5.5 群组服务总结对于包含子服务的服务来说,使用CompositeService初始化/启动/停止服务时,会初始化/启动/停止其所有子服务。
总结【Yarn Service框架分析】Yarn通过Service-> AbstractService-> CompositeService类的完善,实现了从简单到复杂的服务设计。实现了所有场景下服务的初始化/启动/停止流程。
推荐阅读
- 大话速学python做个飞机大战
- Linux的bash shell与man查看手册
- Nginx的TCP反向代理设置
- DNS服务
- Linux重置root密码
- Keepalived 高可用群集
- C++标准模板库STL最全总结收藏方便使用
- (服务运维)负载均衡LVS实战(NAT和DR模式)
- Samba在Centos6.5中的安装和应用