Spark|Spark Core源码精读计划#8(SparkEnv中RPC环境的基础构建)

目录

  • 前言
  • RPC端点及其引用
    • RpcEndpoint
    • RpcEndpoint继承体系
    • RpcEndpointRef
  • NettyRpcEnv概况
    • 创建NettyRpcEnv
    • NettyRpcEnv中的属性成员
  • 总结
前言
在之前的文章中,我们由SparkContext的初始化提到了事件总线LiveListenerBus与执行环境SparkEnv。在讲解SparkEnv的过程中,RPC环境RpcEnv又是首先被初始化的重要组件。做个不怎么恰当的比较,SparkEnv之于SparkContext,正如RpcEnv之于SparkEnv。
由于RPC环境负责着Spark体系内几乎所有内部及外部通信,内容很多,所以一篇文章必然讲不完。本文还是从基础开始看起。
RPC端点及其引用
RpcEnv抽象类是Spark RPC环境的通用表示,它其中定义的setupEndpoint()方法用来向RPC环境注册一个RPC端点(RpcEndpoint),并返回其引用(RpcEndpointRef)。如果客户端要对一个RpcEndpoint发送消息,那么必须首先获得其对应的RpcEndpointRef。它们之间的关系可以用如下简图表示。

Spark|Spark Core源码精读计划#8(SparkEnv中RPC环境的基础构建)
文章图片
图#8.1 - RPC环境与RPC端点 既然RpcEndpoint和RpcEndpointRef是RPC环境中的基础组件,我们先来研究它们的源码。
RpcEndpoint RpcEndpoint是一个特征,其代码如下。
【Spark|Spark Core源码精读计划#8(SparkEnv中RPC环境的基础构建)】代码#8.1 - o.a.s.rpc.RpcEndpoint特征
private[spark] trait RpcEndpoint { val rpcEnv: RpcEnvfinal def self: RpcEndpointRef = { require(rpcEnv != null, "rpcEnv has not been initialized") rpcEnv.endpointRef(this) }def receive: PartialFunction[Any, Unit] = { case _ => throw new SparkException(self + " does not implement 'receive'") }def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case _ => context.sendFailure(new SparkException(self + " won't reply anything")) }def onError(cause: Throwable): Unit = { throw cause }def onConnected(remoteAddress: RpcAddress): Unit = { }def onDisconnected(remoteAddress: RpcAddress): Unit = { }def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = { }def onStart(): Unit = { }def onStop(): Unit = { }final def stop(): Unit = { val _self = self if (_self != null) { rpcEnv.stop(_self) } } }

其中定义了如下方法,这些相当于是RPC端点在RPC环境中的“行为准则”。
  • self():取得当前RpcEndpoint对应的RpcEndpointRef。
  • receive()/receiveAndReply():接收其他RpcEndpointRef传来的消息并进行处理,receiveAndReply()方法还会发送回复。
  • onError():消息处理出现异常时调用的方法。
  • onConnected()/onDisconnected():当前RPC端点建立连接或断开连接时调用的方法。
  • onNetworkError():RPC端点的连接出现网络错误时调用的方法。
  • onStart()/onStop():RPC端点初始化与关闭时调用的方法。
  • stop():停止当前RpcEndpoint。
RpcEndpoint继承体系 RpcEndpoint的主要继承体系如下图所示。
Spark|Spark Core源码精读计划#8(SparkEnv中RPC环境的基础构建)
文章图片
#图8.2 - RpcEndpoint的主要继承体系 图中可以看到不少之前出现过的RPC端点,如文章#2中的HeartbeatReceiver,文章#7中的MapOutputTrackerMasterEndpoint、BlockManagerMasterEndpoint等。在今后涉及到它们时,会专门进行讲解。
另外,图中的ThreadSafeRpcEndpoint是直接继承自RpcEndpoint的特征。顾名思义,它要求RPC端点对消息的处理必须是线程安全的,用文档中的话说,线程安全RPC端点处理消息必须满足happens-before原则。
RpcEndpointRef RpcEndpointRef是一个抽象类,其代码如下。
代码#8.2 - o.a.s.rpc.RpcEndpointRef抽象类
private[spark] abstract class RpcEndpointRef(conf: SparkConf) extends Serializable with Logging { private[this] val maxRetries = RpcUtils.numRetries(conf) private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf) private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)def address: RpcAddressdef name: Stringdef send(message: Any): Unitdef ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = { val future = ask[T](message, timeout) timeout.awaitResult(future) } }

这个抽象类的开头有三个属性,都是通过RpcUtils工具类从Spark配置项中取出来的,如下。
  • maxRetries:最大重连次数,对应配置项为spark.rpc.numRetries,默认值3次。
  • retryWaitMs:每次重连之前等待的时长,对应配置项为spark.rpc.retry.wait,默认值3秒。
  • defaultAskTimeout:对RPC端点进行ask()操作(下面会讲到)的默认超时时长,对应配置项为spark.rpc.askTimeout与spark.network.timeout(前者优先级高于后者),默认值120秒。
值得注意的是,maxRetries与retryWaitMs两个属性在当前的2.3.3版本中都没有用到,而在之前的版本中还是有用到的,证明Spark官方取消了RPC重试机制,也就是统一为消息传递语义中的at most once语义了。当然,我们也可以自己实现带有重试机制的RPC端点引用。
address和name方法分别返回RPC端点引用对应的地址和名称,不必多讲。下面几个方法的含义如下。
  • send()方法:异步发送一条单向的消息,并且“发送即忘记”(fire-and-forget),不需要回复。
  • ask()方法:异步发送一条消息,并在规定的超时时间内等待RPC端点的回复。RPC端点会调用receiveAndReply()方法来处理。
  • askSync()方法:是ask()方法的同步实现。由于它是阻塞操作,有可能会消耗大量时间,因此必须慎用。
RpcEndpointRef只有一个子类,即NettyRpcEndpointRef。它对send()和ask()两个方法的实现如下。
代码#8.3 - o.a.s.rpc.netty.NettyRpcEndPointRef.send()与ask()方法
override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = { nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout) }override def send(message: Any): Unit = { require(message != null, "Message is null") nettyEnv.send(new RequestMessage(nettyEnv.address, this, message)) }

可见是依赖于NettyRpcEnv类的,下面来看一下它是如何创建出来的。
NettyRpcEnv概况
创建NettyRpcEnv 在文章#7的代码#7.4~#7.5中,通过工厂类NettyRpcEnvFactory的create()方法创建出了NettyRpcEnv,它是目前Spark官方提供的RPC环境的唯一实现。该方法的代码如下。
代码#8.4 - o.a.s.rpc.netty.NettyRpcEnvFactory.create()方法
def create(config: RpcEnvConfig): RpcEnv = { val sparkConf = config.conf val javaSerializerInstance = new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance] val nettyEnv = new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress, config.securityManager, config.numUsableCores) if (!config.clientMode) { val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => nettyEnv.startServer(config.bindAddress, actualPort) (nettyEnv, nettyEnv.address.port) } try { Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1 } catch { case NonFatal(e) => nettyEnv.shutdown() throw e } } nettyEnv }

可见,这个方法先创建了JavaSerializer序列化器,用于RPC传输中的序列化。然后通过NettyRpcEnv的构造方法创建NettyRpcEnv,这其中也会涉及到一些RPC基础组件的初始化,后面会讲解到。最后定义偏函数startNettyRpcEnv,并调用通用工具类Utils中的startServiceOnPort()方法来启动NettyRpcEnv。
NettyRpcEnv中的属性成员 我们暂时先不看NettyRpcEnv类的细节,而是先来看看它内部包含了哪些组件。
代码#8.5 - o.a.s.rpc.netty.NettyRpcEnv中的属性成员
private[netty] val transportConf = SparkTransportConf.fromSparkConf( conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"), "rpc", conf.getInt("spark.rpc.io.threads", 0))private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)private val streamManager = new NettyStreamManager(this)private val transportContext = new TransportContext(transportConf, new NettyRpcHandler(dispatcher, this, streamManager))private val clientFactory = transportContext.createClientFactory(createClientBootstraps())@volatile private var fileDownloadFactory: TransportClientFactory = _val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool( "netty-rpc-connection", conf.getInt("spark.rpc.connect.threads", 64))@volatile private var server: TransportServer = _private val stopped = new AtomicBoolean(false)private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

  • TransportConf:传输配置,作用在RPC环境中类似于SparkConf,负责管理与RPC相关的各种配置。
  • Dispatcher:调度器,或者叫分发器,用于将消息路由到正确的RPC端点。
  • NettyStreamManager:流式管理器,用于处理RPC环境中的文件,如自定义的配置文件或者JAR包。
  • TransportContext:传输上下文,作用在RPC环境中类似于SparkContext,负责管理RPC的服务端(TransportServer)与客户端(TransportClient),与它们之间的Netty传输管道。
  • TransportClientFactory:创建RPC客户端TransportClient的工厂。
  • TransportServer:RPC环境中的服务端,负责提供基础且高效的流式服务。
TransportConf和TransportContext提供底层的基于Netty的RPC机制,TransportClient和TransportServer则是RPC端点的最低级别抽象。
总结
本文讲解了RPC环境的基本组成部分RpcEndpoint、RpcEndpointRef的细节实现,并初步了解了NettyRpcEnv的创建过程,以及它内部包含的主要组件。虽然TransportConf和TransportContext更为基础,但为了避免嵌套太深出不来,下一篇文章暂时不准备讲它们,而主要来研究NettyRpcEnv内的调度器Dispatcher,它是整个RPC环境高效运转的基础。

    推荐阅读