Yarn Active ResourceManager Startup Framework Analysis

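1. Introduction
The first article in this series, "Yarn Service Design Ideas", showed how YARN handles its many functions: each function is wrapped as a Service, and CompositeService implementations drive the init/start/stop lifecycle of their child services uniformly. ResourceManager is a CompositeService implementation and follows the same lifecycle. More importantly, when ResourceManager starts, an Active ResourceManager is elected through ZooKeeper. Building on the ResourceManager startup process, this article examines the framework that elects the Active ResourceManager.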
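
To make the composite lifecycle concrete, here is a minimal Java sketch that uses nothing beyond the JDK. SimpleService and SimpleCompositeService are hypothetical classes, not Hadoop's Service/CompositeService API. They only mimic the ordering: children are initialized and started in the order they were added, and stopped in reverse order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a YARN Service: each lifecycle call just records itself.
class SimpleService {
    final String name;
    final List<String> log;

    SimpleService(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    void init()  { log.add(name + ".init"); }
    void start() { log.add(name + ".start"); }
    void stop()  { log.add(name + ".stop"); }
}

// Composite in the spirit of CompositeService: children are initialized and started
// in registration order, then stopped in reverse order.
class SimpleCompositeService extends SimpleService {
    private final List<SimpleService> children = new ArrayList<>();

    SimpleCompositeService(String name, List<String> log) { super(name, log); }

    void addService(SimpleService child) { children.add(child); }

    @Override
    void init() {
        for (SimpleService child : children) child.init();
        super.init();
    }

    @Override
    void start() {
        for (SimpleService child : children) child.start();
        super.start();
    }

    @Override
    void stop() {
        for (int i = children.size() - 1; i >= 0; i--) children.get(i).stop();
        super.stop();
    }
}
```

With two children a and b, calling init(), start() and stop() on the composite logs a.init, b.init, then a.start, b.start, and finally b.stop, a.stop.
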
2. Which services ResourceManager starts
ResourceManager's services fall into two groups: always-on services and "active" services (Active Services):

  • Always-on services run on both ResourceManagers. They are:
[Figure: list of always-on ResourceManager services]

  • Active Services run only on the single Active ResourceManager. They are:
[Figure: list of Active Services]

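Here is the conclusion up front on how ActiveServices is initialized, started, and managed. The detailed analysis follows:
[Figure: summary of how ActiveServices is initialized, started, and managed]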

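3. Why ActiveServices is not a child service of ResourceManager
ActiveServices may run on only one ResourceManager, so its start logic must differ from ResourceManager's. ResourceManager starts all of its child services unconditionally, and at that point it is not yet known which ResourceManager will become active. If ActiveServices were a child service, it would be started on both ResourceManagers, which violates the requirement that ActiveServices run in exactly one place.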
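4. When ActiveServices is started
To elect the Active ResourceManager, and so guarantee that only one ActiveServices is running, ResourceManager provides an always-on child service called EmbeddedElector. EmbeddedElector connects to ZooKeeper internally. When it starts, the ResourceManagers compete for a single shared lock. The ResourceManager that acquires the lock switches to the Active state and starts ActiveServices. The other switches to the Standby state and runs only the always-on services. EmbeddedElector also registers a watcher with ZooKeeper, so when the lock changes, for example because the Active ResourceManager goes away, the watcher callback switches the ResourceManager's state immediately.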
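
The lock race can be illustrated with a toy, in-memory model of ephemeral znodes. ToyZk and ToyRM are hypothetical. They only mimic the semantics described above: create-if-absent, one-shot watches, and node removal on session expiry. They are not the ZooKeeper client API, and the lock path uses a made-up cluster id, cluster1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory stand-in for ZooKeeper (hypothetical, not the ZooKeeper client API).
class ToyZk {
    private final Map<String, Long> ephemeralOwner = new HashMap<>();
    private final List<Runnable> deleteWatchers = new ArrayList<>();

    // Create-if-absent: returns true only for the caller that created the node.
    boolean createEphemeral(String path, long sessionId) {
        return ephemeralOwner.putIfAbsent(path, sessionId) == null;
    }

    // One-shot watch that fires when an ephemeral node is removed.
    void watchDelete(Runnable callback) {
        deleteWatchers.add(callback);
    }

    // Session expiry deletes the session's ephemeral nodes and fires pending watches once.
    void expireSession(long sessionId) {
        boolean removed = ephemeralOwner.values().removeIf(owner -> owner == sessionId);
        if (removed) {
            List<Runnable> fired = new ArrayList<>(deleteWatchers);
            deleteWatchers.clear();
            fired.forEach(Runnable::run);
        }
    }
}

class ToyRM {
    static final String LOCK = "/yarn-leader-election/cluster1/ActiveStandbyElectorLock";
    private final long sessionId;
    private final ToyZk zk;
    String state = "INIT";

    ToyRM(long sessionId, ToyZk zk) {
        this.sessionId = sessionId;
        this.zk = zk;
    }

    void joinElection() {
        if (zk.createEphemeral(LOCK, sessionId)) {
            state = "ACTIVE";                   // won the lock: would start ActiveServices
        } else {
            state = "STANDBY";                  // lost: keep only the always-on services
            zk.watchDelete(this::joinElection); // try again when the lock disappears
        }
    }
}
```

Suppose rm1 and rm2 both call joinElection(). rm1 becomes ACTIVE and rm2 becomes STANDBY. After zk.expireSession(1L), the delete watch fires and rm2 takes over. The sketch leaves rm1's own state untouched. In the real code, rm1 would receive an Expired event and enter neutral mode.
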
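5. ResourceManager child-service initialization and startup
For bringing up the Active ResourceManager, the key initialization and startup steps are:
  • Create the EmbeddedElector election service and register it as a child service
  • Initialize ActiveServices
  • Start the EmbeddedElector election service
  • Start ActiveServices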
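
The four steps above, and the reason ActiveServices is kept out of the child list, can be sketched as a toy model under stated assumptions. SketchRM and Elector are hypothetical classes standing in for ResourceManager and EmbeddedElector. A boolean field replaces RMActiveServices. The ZooKeeper race is fixed in the constructor, and the elector calls transitionToActive directly instead of going through the asynchronous ZooKeeper callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EmbeddedElector, registered as a child of SketchRM.
class Elector {
    private final SketchRM rm;
    private final boolean winsLock; // assumption: the ZooKeeper race outcome is fixed in advance

    Elector(SketchRM rm, boolean winsLock) {
        this.rm = rm;
        this.winsLock = winsLock;
        rm.log.add("elector.create");
    }

    void start() {
        rm.log.add("elector.start");
        if (winsLock) {
            rm.transitionToActive(); // only the lock winner starts ActiveServices
        }
    }
}

// Hypothetical stand-in for ResourceManager; ActiveServices is NOT in the children list.
class SketchRM {
    final List<String> log = new ArrayList<>();
    private final List<Elector> children = new ArrayList<>();
    private final boolean winsLock;
    boolean activeServicesStarted;

    SketchRM(boolean winsLock) { this.winsLock = winsLock; }

    void init() {
        children.add(new Elector(this, winsLock)); // like addIfService(elector)
        log.add("active.init");                    // like createAndInitActiveServices(false)
    }

    void start() {
        for (Elector child : children) {
            child.start(); // starts children only; ActiveServices is started by the elector
        }
    }

    void transitionToActive() {
        if (!activeServicesStarted) {
            activeServicesStarted = true;
            log.add("active.start");
        }
    }
}
```

Create two instances, one with winsLock = true and one with false, and run init() and start() on each. Both log the same first three entries, and only the winner logs active.start.
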
The corresponding code is:
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean {
  public static void main(String argv[]) {
    // omitted
    ResourceManager resourceManager = new ResourceManager();
    // Calls AbstractService#init, which then calls ResourceManager#serviceInit
    resourceManager.init(conf);
    // Likewise, calls AbstractService#start, which then calls ResourceManager#serviceStart
    resourceManager.start();
    // omitted
  }
}

5.1 ResourceManager initialization
ResourceManager#init is inherited from AbstractService#init, which ultimately calls ResourceManager#serviceInit:
public abstract class AbstractService implements Service {
  public void init(Configuration conf) {
    if (conf == null) {
      throw new ServiceStateException("Cannot initialize service "
          + getName() + ": null configuration");
    }
    if (isInState(STATE.INITED)) {
      return;
    }
    synchronized (stateChangeLock) {
      if (enterState(STATE.INITED) != STATE.INITED) {
        setConfig(conf);
        try {
          serviceInit(config);
          if (isInState(STATE.INITED)) {
            // if the service ended up here during init,
            // notify the listeners
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }
}
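
This is a template method. init() is the public entry point that guards the lifecycle state, and serviceInit() is the hook that subclasses override. The sketch below reproduces that guard, assuming a Map in place of Hadoop's Configuration and plain runtime exceptions in place of ServiceStateException. MiniService and CountingService are hypothetical names.

```java
import java.util.Map;

// Hypothetical miniature of AbstractService#init: a final entry point that guards the
// lifecycle state and delegates the real work to the serviceInit() hook.
abstract class MiniService {
    enum State { NOTINITED, INITED, STARTED, STOPPED }

    private final Object stateChangeLock = new Object();
    private volatile State state = State.NOTINITED;

    public final void init(Map<String, String> conf) {
        if (conf == null) {
            throw new IllegalArgumentException("Cannot initialize service: null configuration");
        }
        if (state == State.INITED) {
            return;                        // repeated init() is a no-op
        }
        synchronized (stateChangeLock) {
            if (state == State.INITED) {
                return;                    // another thread initialized it first
            }
            if (state != State.NOTINITED) {
                throw new IllegalStateException("Cannot init from state " + state);
            }
            state = State.INITED;          // enter the new state before running the hook
            try {
                serviceInit(conf);         // e.g. ResourceManager#serviceInit
            } catch (Exception e) {
                state = State.STOPPED;     // loosely mirrors ServiceOperations.stopQuietly
                throw new IllegalStateException("init failed", e);
            }
        }
    }

    public State getState() { return state; }

    protected abstract void serviceInit(Map<String, String> conf) throws Exception;
}

// Example subclass that only counts how often its hook runs.
class CountingService extends MiniService {
    int initCalls;

    @Override
    protected void serviceInit(Map<String, String> conf) {
        initCalls++;
    }
}
```

Calling init twice runs serviceInit once, and init(null) is rejected before any state changes.
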

ResourceManager#serviceInit creates the EmbeddedElector service and registers it as a child service. It then calls ResourceManager#createAndInitActiveServices to create and initialize the separate ActiveServices:
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean {
  protected void serviceInit(Configuration conf) throws Exception {
    // omitted
    // register always-on services, e.g. AdminService
    adminService = createAdminService();
    addService(adminService);
    rmContext.setRMAdminService(adminService);
    // omitted
    if (this.rmContext.isHAEnabled()) {
      // If the RM is configured to use an embedded leader elector,
      // initialize the leader elector.
      if (HAUtil.isAutomaticFailoverEnabled(conf)
          && HAUtil.isAutomaticFailoverEmbedded(conf)) {
        EmbeddedElector elector = createEmbeddedElector();
        addIfService(elector);
        rmContext.setLeaderElectorService(elector);
      }
    }
    // omitted
    createAndInitActiveServices(false);
    // omitted
  }
}

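5.2 Creating the EmbeddedElector election service
ResourceManager chooses the EmbeddedElector implementation based on the yarn.resourcemanager.ha.curator-leader-elector.enabled setting. If it is true, the implementation is CuratorBasedElectorService, which is built on Curator, a ZooKeeper client framework that wraps the native ZooKeeper API. The default implementation is ActiveStandbyElectorBasedElectorService, which uses the native ZooKeeper API directly: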
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean {
  protected EmbeddedElector createEmbeddedElector() throws IOException {
    EmbeddedElector elector;
    curatorEnabled =
        conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
            YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
    if (curatorEnabled) {
      this.zkManager = createAndStartZKManager(conf);
      elector = new CuratorBasedElectorService(this);
    } else {
      elector = new ActiveStandbyElectorBasedElectorService(this);
    }
    return elector;
  }
}
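
Combined with the HA checks in serviceInit, choosing an elector comes down to a few boolean settings. The sketch restates that decision over a plain Map. The key names and defaults are my understanding of YarnConfiguration and should be checked against your Hadoop version: HA off, automatic failover on, embedded elector on, Curator elector off. ElectorChoice is a hypothetical helper that returns class names as strings instead of constructing services.

```java
import java.util.Map;

// Hypothetical restatement of the elector choice made across
// ResourceManager#serviceInit and ResourceManager#createEmbeddedElector.
final class ElectorChoice {
    // Simplification: unrecognized values parse as false, unlike Configuration#getBoolean.
    static boolean flag(Map<String, String> conf, String key, boolean dflt) {
        String v = conf.get(key);
        return v == null ? dflt : Boolean.parseBoolean(v.trim());
    }

    /** Returns the elector class name the RM would create, or null if none is embedded. */
    static String choose(Map<String, String> conf) {
        boolean ha = flag(conf, "yarn.resourcemanager.ha.enabled", false);
        boolean autoFailover = flag(conf, "yarn.resourcemanager.ha.automatic-failover.enabled", true);
        boolean embedded = flag(conf, "yarn.resourcemanager.ha.automatic-failover.embedded", true);
        if (!(ha && autoFailover && embedded)) {
            return null; // no embedded elector: failover is manual or handled elsewhere
        }
        boolean curator = flag(conf, "yarn.resourcemanager.ha.curator-leader-elector.enabled", false);
        return curator ? "CuratorBasedElectorService" : "ActiveStandbyElectorBasedElectorService";
    }
}
```

With HA enabled and everything else left at its default, the result is ActiveStandbyElectorBasedElectorService, which is the case this article follows.
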

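5.3 Initializing the ActiveStandbyElectorBasedElectorService election service
This article uses ActiveStandbyElectorBasedElectorService to walk through the Active ResourceManager election. During initialization it derives the election znode, /yarn-leader-election/{clusterId} by default. The lock znode ActiveStandbyElectorLock lives under it, giving /yarn-leader-election/{clusterId}/ActiveStandbyElectorLock. The ElectorService on every ResourceManager tries to create this lock as an ephemeral znode, and the ResourceManager whose create succeeds is eventually elected Active ResourceManager.
Most importantly, ActiveStandbyElectorBasedElectorService creates its ActiveStandbyElector member during initialization: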
public class ActiveStandbyElectorBasedElectorService extends AbstractService
    implements EmbeddedElector, ActiveStandbyElector.ActiveStandbyElectorCallback {
  protected void serviceInit(Configuration conf) throws Exception {
    // omitted
    String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
    // omitted
    String rmId = HAUtil.getRMHAId(conf);
    String clusterId = YarnConfiguration.getClusterId(conf);
    localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);

    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
    String electionZNode = zkBasePath + "/" + clusterId;

    zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);

    List<ACL> zkAcls = ZKCuratorManager.getZKAcls(conf);
    List<ZKUtil.ZKAuthInfo> zkAuths = ZKCuratorManager.getZKAuths(conf);
    // omitted
    // create the elector object
    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
        electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);

    elector.ensureParentZNode();
    // omitted
    super.serviceInit(conf);
  }
}
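
The znode layout follows from the code above and from the ActiveStandbyElector constructor in 5.3.1. The sketch assumes the default base path /yarn-leader-election. It also assumes LOCK_FILENAME and BREADCRUMB_FILENAME hold ActiveStandbyElectorLock and ActiveBreadCrumb. ElectionPaths is a hypothetical helper.

```java
// Hypothetical helper that derives the election paths the same way the code above does.
final class ElectionPaths {
    static final String DEFAULT_ZK_BASE_PATH = "/yarn-leader-election"; // assumed default
    static final String LOCK_FILENAME = "ActiveStandbyElectorLock";
    static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb";

    // zkBasePath + "/" + clusterId, falling back to the default base path.
    static String electionZNode(String zkBasePath, String clusterId) {
        String base = (zkBasePath == null) ? DEFAULT_ZK_BASE_PATH : zkBasePath;
        return base + "/" + clusterId;
    }

    // znodeWorkingDir + "/" + LOCK_FILENAME: the ephemeral lock every RM tries to create.
    static String lockPath(String electionZNode) {
        return electionZNode + "/" + LOCK_FILENAME;
    }

    // znodeWorkingDir + "/" + BREADCRUMB_FILENAME
    static String breadCrumbPath(String electionZNode) {
        return electionZNode + "/" + BREADCRUMB_FILENAME;
    }
}
```

For a hypothetical cluster id cluster1, the lock path comes out as /yarn-leader-election/cluster1/ActiveStandbyElectorLock.
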

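ActiveStandbyElector connects to the ZooKeeper server and maintains the watcher. It monitors the state of the lock znode /yarn-leader-election/{clusterId}/ActiveStandbyElectorLock and switches the ResourceManager's state accordingly.
5.3.1 ZooKeeper connection
ActiveStandbyElector creates its connection to ZooKeeper in its constructor: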
public ActiveStandbyElector(String zookeeperHostPorts,
    int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
    List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
    int maxRetryNum, boolean failFast) throws IOException,
    HadoopIllegalArgumentException, KeeperException {
  if (app == null || acl == null || parentZnodeName == null
      || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
    throw new HadoopIllegalArgumentException("Invalid argument");
  }
  zkHostPort = zookeeperHostPorts;
  zkSessionTimeout = zookeeperSessionTimeout;
  zkAcl = acl;
  zkAuthInfo = authInfo;
  appClient = app;
  znodeWorkingDir = parentZnodeName;
  zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
  zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
  this.maxRetryNum = maxRetryNum;

  // establish the ZK Connection for future API calls
  if (failFast) {
    createConnection();
  } else {
    reEstablishSession();
  }
}

reEstablishSession wraps createConnection in a retry loop. Here we look at ActiveStandbyElector#createConnection directly:
private void createConnection() throws IOException, KeeperException {
  // omitted
  zkClient = connectToZooKeeper();
  // omitted
}
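
The retry wrapper mentioned above can be sketched as a bounded loop. Connector is a hypothetical interface standing in for createConnection. As far as I know, the real reEstablishSession also catches KeeperException and sleeps a fixed interval (5 seconds) between attempts. The sleepMs parameter models that interval.

```java
import java.io.IOException;

// Hypothetical sketch of a bounded connect-and-retry loop like reEstablishSession.
class SessionRetry {
    interface Connector {
        void connect() throws IOException; // stands in for createConnection()
    }

    /** Returns true once a connect attempt succeeds, false after maxRetryNum failures. */
    static boolean reEstablishSession(Connector connector, int maxRetryNum, long sleepMs) {
        for (int attempt = 0; attempt < maxRetryNum; attempt++) {
            try {
                connector.connect();
                return true;                       // connected, stop retrying
            } catch (IOException e) {
                try {
                    Thread.sleep(sleepMs);         // back off before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;                              // every attempt failed
    }
}
```

A connector that fails twice and then succeeds is called three times and the method returns true. One that always fails is called exactly maxRetryNum times.
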

ActiveStandbyElector#connectToZooKeeper creates the Watcher object that listens to ZooKeeper:
protected synchronized ZooKeeper connectToZooKeeper() throws IOException,
    KeeperException {
  watcher = new WatcherWithClientRef();
  // the watcher is registered with ZooKeeper when the client is created
  ZooKeeper zk = createZooKeeper();
  watcher.setZooKeeperRef(zk);
  // omitted
  watcher.waitForZKConnectionEvent(zkSessionTimeout);
  // omitted
  return zk;
}

WatcherWithClientRef#process receives ZooKeeper events, and ActiveStandbyElector#processWatchEvent does the actual handling:
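private final class WatcherWithClientRef implements Watcher {
  private ZooKeeper zk;

  // counted down when the first event arrives from the server;
  // waitForZKConnectionEvent waits on it to confirm the connection
  private CountDownLatch hasReceivedEvent = new CountDownLatch(1);

  // counted down once the watcher holds the ZooKeeper reference;
  // events are processed only after that
  private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);

  // ordinary methods omitted

  // process() is the watcher's entry point for ZooKeeper events
  @Override
  public void process(WatchedEvent event) {
    // omitted
    ActiveStandbyElector.this.processWatchEvent(zk, event);
    // omitted
  }
}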
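
The two latches form a small handshake. The connecting thread waits until some event proves the server answered. The event thread holds back processing until the client reference has been stored. The sketch reproduces this handshake with plain JDK types. GatedWatcher is hypothetical and uses a String event and an Object client in place of WatchedEvent and ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the two-latch handshake used by WatcherWithClientRef.
class GatedWatcher {
    private final CountDownLatch hasReceivedEvent = new CountDownLatch(1);
    private final CountDownLatch hasSetClient = new CountDownLatch(1);
    private volatile Object client;
    final List<String> handled = new ArrayList<>(); // stands in for processWatchEvent

    void setClientRef(Object c) {
        client = c;
        hasSetClient.countDown();                   // events may now use the client
    }

    // Event thread: record that the server answered, then wait for the client reference.
    void process(String event) {
        hasReceivedEvent.countDown();
        try {
            hasSetClient.await(5, TimeUnit.SECONDS); // proceeds even on timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        synchronized (handled) {
            handled.add(event + "@" + client);
        }
    }

    // Connecting thread: false means no event arrived within the timeout.
    boolean waitForConnectionEvent(long timeoutMs) {
        try {
            return hasReceivedEvent.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Usage: an event that arrives before the client reference is set is held back.
    static List<String> demo() {
        GatedWatcher w = new GatedWatcher();
        Thread eventThread = new Thread(() -> w.process("SyncConnected"));
        eventThread.start();
        boolean connected = w.waitForConnectionEvent(5000);
        int handledBeforeRef;
        synchronized (w.handled) { handledBeforeRef = w.handled.size(); }
        w.setClientRef("zk1");
        try {
            eventThread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> result = new ArrayList<>();
        result.add("connected=" + connected);
        result.add("handledBeforeRef=" + handledBeforeRef);
        synchronized (w.handled) { result.addAll(w.handled); }
        return result;
    }
}
```

demo() returns [connected=true, handledBeforeRef=0, SyncConnected@zk1]. The event is received early but is processed only after setClientRef.
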

5.3.2 Handling ZooKeeper watch events
ActiveStandbyElector#processWatchEvent handles watch events. The mapping between ZooKeeper states and event types is:
[Figure: mapping between ZooKeeper connection states and event types]

The ResourceManager's state is adjusted differently depending on the ZooKeeper state and event type. The handling logic is:
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private ConnectionState zkConnectionState = ConnectionState.TERMINATED;

  synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    Event.EventType eventType = event.getType();
    // connection-state events
    if (eventType == Event.EventType.None) {
      // the connection state has changed
      switch (event.getState()) {
      case SyncConnected:
        LOG.info("Session connected.");
        ConnectionState prevConnectionState = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        if (prevConnectionState == ConnectionState.DISCONNECTED
            && wantToBeInElection) {
          monitorActiveStatus();
        }
        break;
      case Disconnected:
        LOG.info("Session disconnected. Entering neutral mode...");
        // ask the app to move to safe state because zookeeper connection
        // is not active and we dont know our state
        zkConnectionState = ConnectionState.DISCONNECTED;
        enterNeutralMode();
        break;
      case Expired:
        // the connection got terminated because of session timeout
        // call listener to reconnect
        LOG.info("Session expired. Entering neutral mode and rejoining...");
        enterNeutralMode();
        reJoinElection(0);
        break;
      case SaslAuthenticated:
        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
        break;
      default:
        fatalError("Unexpected Zookeeper watch event state: "
            + event.getState());
        break;
      }
      return;
    }

    // a watched node has changed
    String path = event.getPath();
    if (path != null) {
      switch (eventType) {
      case NodeDeleted:
        if (state == State.ACTIVE) {
          enterNeutralMode();
        }
        joinElectionInternal();
        break;
      case NodeDataChanged:
        monitorActiveStatus();
        break;
      default:
        if (LOG.isDebugEnabled()) {
          LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
        }
        monitorActiveStatus();
      }
      return;
    }

    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
  }
}
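
The branches depend only on the event type, the connection state, and whether this RM is active. That means they can be restated as a pure function that summarizes the state/event mapping. The names WatchDispatch and Action are hypothetical. The real method performs these actions and updates zkConnectionState instead of returning a label.

```java
// Hypothetical pure-function restatement of processWatchEvent's branches.
final class WatchDispatch {
    enum Action { NONE, MONITOR_LOCK, ENTER_NEUTRAL, NEUTRAL_AND_RECONNECT, NEUTRAL_AND_JOIN, JOIN, FATAL }

    /** Connection-level events (event type None). */
    static Action onConnectionEvent(String keeperState, boolean wasDisconnected,
                                    boolean wantToBeInElection) {
        switch (keeperState) {
            case "SyncConnected":
                // re-check the lock only when recovering from a disconnect
                return (wasDisconnected && wantToBeInElection) ? Action.MONITOR_LOCK : Action.NONE;
            case "Disconnected":
                return Action.ENTER_NEUTRAL;            // state unknown until reconnected
            case "Expired":
                return Action.NEUTRAL_AND_RECONNECT;    // enterNeutralMode + reJoinElection(0)
            case "SaslAuthenticated":
                return Action.NONE;
            default:
                return Action.FATAL;
        }
    }

    /** Node-level events on the watched lock path. */
    static Action onNodeEvent(String eventType, boolean isActive) {
        switch (eventType) {
            case "NodeDeleted":
                // an active RM goes neutral first; every RM then tries to recreate the lock
                return isActive ? Action.NEUTRAL_AND_JOIN : Action.JOIN;
            default:
                // NodeDataChanged and unexpected events: re-read the lock and re-arm the watch
                return Action.MONITOR_LOCK;
        }
    }
}
```

Take the first connection described at the end of 5.3.2, where the previous state is TERMINATED rather than DISCONNECTED. There, onConnectionEvent("SyncConnected", false, true) returns NONE.
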

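ActiveStandbyElector's event handling does one thing: it moves the ResourceManager into the active, standby, or neutral state. The transitions are discussed below.
5.3.2.1 Competing for the active state
monitorLockNodeAsync calls exists on the lock znode. The call checks whether the lock exists and re-registers the watcher at the same time; the callback then decides the state: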
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void monitorLockNodeAsync() {
    // omitted
    monitorLockNodePending = true;
    monitorLockNodeClient = zkClient;
    // check the lock znode and (re)register the watcher on it in one call
    zkClient.exists(zkLockFilePath, watcher, this, zkClient);
  }
}

ActiveStandbyElector implements the exists callback. It switches the ResourceManager between active and standby depending on who holds the distributed lock:
public class ActiveStandbyElector implements StatCallback, StringCallback {
  public synchronized void processResult(int rc, String path, Object ctx,
      Stat stat) {
    // omitted
    Code code = Code.get(rc);
    if (isSuccess(code)) {
      // the lock znode exists: if our session owns it, become active; otherwise standby
      if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
        // become active
        if (!becomeActive()) {
          reJoinElectionAfterFailureToBecomeActive();
        }
      } else {
        // become standby
        becomeStandby();
      }
      return;
    }

    // the lock znode does not exist: enter neutral mode and try to create it
    if (isNodeDoesNotExist(code)) {
      enterNeutralMode();
      // try to create the lock znode again
      joinElectionInternal();
      return;
    }
    // omitted
  }
}
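
The callback's decision depends only on the result code and on which session owns the lock. The sketch isolates it as a function, assuming a reduced three-value result code. LockCheck and its enums are hypothetical, and the real method's retry and fatal-error handling is left out.

```java
// Hypothetical sketch of the StatCallback decision after monitorLockNodeAsync.
final class LockCheck {
    enum Result { OK, NO_NODE, OTHER_ERROR }
    enum Decision { BECOME_ACTIVE, BECOME_STANDBY, NEUTRAL_AND_JOIN, HANDLE_ERROR }

    static Decision decide(Result rc, long ephemeralOwner, long mySessionId) {
        switch (rc) {
            case OK:
                // the lock exists: the session that owns it belongs to the active RM
                return ephemeralOwner == mySessionId ? Decision.BECOME_ACTIVE : Decision.BECOME_STANDBY;
            case NO_NODE:
                // the lock vanished: go neutral and try to create it
                return Decision.NEUTRAL_AND_JOIN;
            default:
                // retry / fatal-error handling is omitted in this sketch
                return Decision.HANDLE_ERROR;
        }
    }
}
```

Comparing the ephemeral owner with the caller's own session id is what lets the same exists callback serve both the winner and the loser of the race.
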

5.3.2.2 Neutral state
When the connection is lost, the elector enters the NEUTRAL state:
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void enterNeutralMode() {
    if (state != State.NEUTRAL) {
      state = State.NEUTRAL;
      appClient.enterNeutralMode();
    }
  }
}

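ActiveStandbyElector#enterNeutralMode calls enterNeutralMode on its appClient member. appClient is actually the ActiveStandbyElectorBasedElectorService instance, so ActiveStandbyElectorBasedElectorService#enterNeutralMode is what runs. In the neutral state the ResourceManager has lost contact with ZooKeeper, so this method starts a timer. If the connection is not restored within zkSessionTimeout, the ResourceManager transitions to standby: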
public class ActiveStandbyElectorBasedElectorService extends AbstractService
    implements EmbeddedElector, ActiveStandbyElector.ActiveStandbyElectorCallback {
  public void enterNeutralMode() {
    // omitted
    zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
    zkDisconnectTimer.schedule(new TimerTask() {
      @Override
      public void run() {
        synchronized (zkDisconnectLock) {
          // omitted
          becomeStandby();
        }
      }
    }, zkSessionTimeout);
    // omitted
  }
}
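
The grace period is a cancellable countdown. The sketch uses java.util.Timer, as the original does. DisconnectGuard is hypothetical, and its standby flag stands in for becomeStandby(). The timer thread is made a daemon only so the sketch cannot keep the JVM alive.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical sketch of the neutral-mode grace period.
class DisconnectGuard {
    private final Object lock = new Object();
    private final long timeoutMs;
    private Timer timer;                  // non-null while a countdown is pending
    private volatile boolean standby;

    DisconnectGuard(long timeoutMs) { this.timeoutMs = timeoutMs; }

    // Called when the ZooKeeper connection is lost.
    void enterNeutralMode() {
        synchronized (lock) {
            if (timer != null) {
                return;                   // a countdown is already running
            }
            timer = new Timer("Zookeeper disconnect timer", true); // daemon: sketch only
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    synchronized (lock) {
                        if (timer != null) {  // skip if cancelled in the meantime
                            standby = true;   // stands in for becomeStandby()
                            timer.cancel();
                            timer = null;
                        }
                    }
                }
            }, timeoutMs);
        }
    }

    // Called on reconnect or becomeActive(): abort the pending demotion.
    void cancelDisconnectTimer() {
        synchronized (lock) {
            if (timer != null) {
                timer.cancel();
                timer = null;
            }
        }
    }

    boolean isStandby() { return standby; }

    // Usage: returns {demoted without reconnect, demoted after cancel}.
    static boolean[] demo() {
        DisconnectGuard lost = new DisconnectGuard(50);
        lost.enterNeutralMode();              // never reconnects
        DisconnectGuard recovered = new DisconnectGuard(200);
        recovered.enterNeutralMode();
        recovered.cancelDisconnectTimer();    // reconnected in time
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { lost.isStandby(), recovered.isStandby() };
    }
}
```

demo() returns {true, false}. The guard that never reconnects is demoted after its 50 ms timeout, while the one cancelled in time keeps its state. This mirrors why becomeActive cancels the disconnect timer first.
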

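5.3.2.3 Session expiry
If the session expires, the elector drops the old connection and rejoins the election, which may make it active again: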
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void reJoinElection(int sleepTime) {
    sessionReestablishLockForTests.lock();
    try {
      terminateConnection();
      sleepFor(sleepTime);
      if (appData != null) {
        joinElectionInternal();
      }
    } finally {
      sessionReestablishLockForTests.unlock();
    }
  }
}

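Now consider the first connection to ZooKeeper. The initial state is ConnectionState.TERMINATED. When the client successfully establishes a session, the server reports the state SyncConnected with event type Event.EventType.None. In processWatchEvent, zkConnectionState is set to CONNECTED. Because the previous state was not DISCONNECTED, monitorActiveStatus is not called and the switch simply breaks. In other words, once the initial connection is established the client takes no further action here. The election itself begins when the elector service starts and calls joinElection (see 5.5).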
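5.4 Initializing Active Services
During ResourceManager initialization, a separate method call initializes ActiveServices. Active Services is not a child service of ResourceManager, so ResourceManager's own init/start/stop lifecycle does not cover it: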
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean {
  protected void serviceInit(Configuration conf) throws Exception {
    // omitted
    createAndInitActiveServices(false);
    // omitted
  }
}

ResourceManager#createAndInitActiveServices creates activeServices and runs its initialization:
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean {
  protected void createAndInitActiveServices(boolean fromActive) {
    activeServices = new RMActiveServices(this);
    activeServices.fromActive = fromActive;
    activeServices.init(conf);
  }
}

activeServices is an RMActiveServices instance. Its initialization creates the child services and adds them:
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean {
  public class RMActiveServices extends CompositeService {
    protected void serviceInit(Configuration configuration) throws Exception {
      standByTransitionRunnable = new StandByTransitionRunnable();
      // omitted
      xxxService = createXxxService();
      addService(xxxService);
      // omitted
      super.serviceInit(conf);
    }
  }
}

5.5 Starting the election service
Once the ActiveStandbyElectorBasedElectorService object has been created, every ResourceManager has a zkClient connected to the ZooKeeper server. Starting EmbeddedElector follows this call chain:
ActiveStandbyElectorBasedElectorService#serviceStart -> ActiveStandbyElector#joinElection -> ActiveStandbyElector#joinElectionInternal -> ActiveStandbyElector#createLockNodeAsync. ActiveStandbyElector#createLockNodeAsync tries to acquire the Active ResourceManager lock:
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void createLockNodeAsync() {
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
        this, zkClient);
  }
}

createLockNodeAsync calls ZooKeeper#create to try to acquire the distributed lock and so become active:
public class ZooKeeper {
  public void create(final String path, byte data[], List<ACL> acl,
      CreateMode createMode, StringCallback cb, Object ctx) {
    // omitted
  }
}

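StringCallback is an asynchronous callback. The client sends the create request, the server responds asynchronously, and the client handles the response in StringCallback#processResult.
ActiveStandbyElector implements this ZooKeeper callback interface, so ActiveStandbyElector#processResult runs asynchronously once the create call completes: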
public class ActiveStandbyElector implements StatCallback, StringCallback {
  public synchronized void processResult(int rc, String path, Object ctx,
      String name) {
    // omitted
    Code code = Code.get(rc);
    if (isSuccess(code)) {
      // we successfully created the znode. we are the leader. start monitoring
      // try to become active
      if (becomeActive()) {
        // keep monitoring the lock znode
        monitorActiveStatus();
      } else {
        // otherwise rejoin the election and try to create the znode again
        reJoinElectionAfterFailureToBecomeActive();
      }
      return;
    }

    // creation failed because the znode already exists: become standby
    if (isNodeExists(code)) {
      if (createRetryCount == 0) {
        becomeStandby();
      }
      // after a retry the znode may be our own; the owner check in the exists callback settles it
      monitorActiveStatus();
      return;
    }

    // creation failed with a retryable error (e.g. connection loss): retry
    if (shouldRetry(code)) {
      if (createRetryCount < maxRetryNum) {
        ++createRetryCount;
        createLockNodeAsync();
        return;
      }
      // omitted
    }
    // omitted
  }
}
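
Like the exists callback, the create callback's branches can be summarized as a decision function. The names (CreateLockOutcome and its enums) are hypothetical. RETRYABLE stands for codes such as connection loss. The real method's special handling of session expiry is omitted, so OTHER maps straight to FATAL here.

```java
// Hypothetical sketch of the StringCallback decision after createLockNodeAsync.
final class CreateLockOutcome {
    enum Result { OK, NODE_EXISTS, RETRYABLE, OTHER }
    enum Decision { BECOME_ACTIVE, STANDBY_AND_MONITOR, MONITOR_ONLY, RETRY_CREATE, FATAL }

    static Decision decide(Result rc, int createRetryCount, int maxRetryNum) {
        switch (rc) {
            case OK:
                return Decision.BECOME_ACTIVE;     // we created the ephemeral lock
            case NODE_EXISTS:
                // first attempt: another RM holds the lock. After a retry the node may be
                // from our own lost attempt, so only monitor and let the owner check decide.
                return createRetryCount == 0 ? Decision.STANDBY_AND_MONITOR : Decision.MONITOR_ONLY;
            case RETRYABLE:
                return createRetryCount < maxRetryNum ? Decision.RETRY_CREATE : Decision.FATAL;
            default:
                return Decision.FATAL;
        }
    }
}
```

The createRetryCount check exists because a retried create can report "node exists" for a znode that this very session created on an earlier attempt whose response was lost.
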

5.6 Starting Active Services
Normally, ActiveStandbyElector#becomeActive is called to move the ResourceManager into the active state:
public class ActiveStandbyElector implements StatCallback, StringCallback {
  private boolean becomeActive() {
    // omitted
    appClient.becomeActive();
    // omitted
  }
}

appClient is the ActiveStandbyElectorBasedElectorService instance that was passed in when the ActiveStandbyElector object was constructed:
public class ActiveStandbyElectorBasedElectorService extends AbstractService
    implements EmbeddedElector, ActiveStandbyElector.ActiveStandbyElectorCallback {
  public void becomeActive() throws ServiceFailedException {
    cancelDisconnectTimer();
    try {
      rm.getRMContext().getRMAdminService().transitionToActive(req);
    } catch (Exception e) {
      throw new ServiceFailedException("RM could not transition to Active", e);
    }
  }
}

AdminService#transitionToActive is then called to move the current ResourceManager into the active state:
public class AdminService extends CompositeService
    implements HAServiceProtocol, ResourceManagerAdministrationProtocol {
  public synchronized void transitionToActive(
      HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
    // omitted
    rm.transitionToActive();
    // omitted
  }
}

Internally, AdminService calls ResourceManager#transitionToActive, which in turn calls ResourceManager#startActiveServices:
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean {
  synchronized void transitionToActive() throws Exception {
    if (rmContext.getHAServiceState()
        == HAServiceProtocol.HAServiceState.ACTIVE) {
      LOG.info("Already in active state");
      return;
    }
    LOG.info("Transitioning to active state");

    this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        try {
          startActiveServices();
          return null;
        } catch (Exception e) {
          reinitialize(true);
          throw e;
        }
      }
    });

    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
    LOG.info("Transitioned to active state");
  }
}

ResourceManager#startActiveServices is what actually starts the active services:
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean {
  void startActiveServices() throws Exception {
    if (activeServices != null) {
      clusterTimeStamp = System.currentTimeMillis();
      activeServices.start();
    }
  }
}

5.7 Switching to standby
When a ResourceManager loses the election it enters the standby state. If it was active at that point, RMActiveServices is stopped:
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean {
  synchronized void transitionToStandby(boolean initialize)
      throws Exception {
    if (rmContext.getHAServiceState()
        == HAServiceProtocol.HAServiceState.STANDBY) {
      LOG.info("Already in standby state");
      return;
    }

    LOG.info("Transitioning to standby state");
    HAServiceState state = rmContext.getHAServiceState();
    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
    if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
      stopActiveServices();
      reinitialize(initialize);
    }
    LOG.info("Transitioned to standby state");
  }
}
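
Together, sections 5.6 and 5.7 form a small state machine. The sketch models it with a hypothetical ActiveGroup in place of RMActiveServices. Its reinitialize() only builds a fresh instance; the real method does more, such as resetting the RM context. One design point becomes explicit: a YARN service cannot be restarted once it is stopped. Going back to standby therefore has to leave a newly initialized ActiveServices behind for the next promotion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ResourceManager's HA transitions.
class HaRm {
    enum HaState { INITIALIZING, STANDBY, ACTIVE }

    // Stand-in for RMActiveServices: once stopped, it refuses to start again.
    static class ActiveGroup {
        final int generation;
        boolean started;
        boolean stopped;

        ActiveGroup(int generation) { this.generation = generation; }

        void start() {
            if (stopped) throw new IllegalStateException("cannot restart a stopped service");
            started = true;
        }

        void stop() { stopped = true; }
    }

    HaState state = HaState.INITIALIZING;
    ActiveGroup active = new ActiveGroup(1);     // like createAndInitActiveServices(false)
    final List<String> log = new ArrayList<>();

    synchronized void transitionToActive() {
        if (state == HaState.ACTIVE) {
            log.add("already active");
            return;
        }
        try {
            active.start();                      // like startActiveServices()
        } catch (RuntimeException e) {
            reinitialize();                      // leave a clean, initialized instance
            throw e;
        }
        state = HaState.ACTIVE;
        log.add("active gen " + active.generation);
    }

    synchronized void transitionToStandby() {
        if (state == HaState.STANDBY) {
            log.add("already standby");
            return;
        }
        HaState previous = state;
        state = HaState.STANDBY;
        if (previous == HaState.ACTIVE) {
            active.stop();                       // like stopActiveServices()
            reinitialize();
        }
        log.add("standby gen " + active.generation);
    }

    private void reinitialize() {
        active = new ActiveGroup(active.generation + 1);
    }
}
```

Calling transitionToActive, transitionToActive, transitionToStandby, transitionToActive produces the log [active gen 1, already active, standby gen 2, active gen 2]. Without reinitialize(), the second promotion would try to restart a stopped instance and fail.
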

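6. Summary
RMActiveServices is not a child service of ResourceManager. Its init/start/stop lifecycle is independent of ResourceManager's child-service lifecycle:
  • Startup is driven by the ActiveStandbyElectorBasedElectorService election service.
  • State switching is driven by the ZooKeeper watcher.
  • Initialization and stopping are performed by separate method calls in ResourceManager.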
