Spark调度系统--Job Scheduling
1.spark调度系统
spark的调度框架分为双层。第一层是对application进行资源调度,也就是多个application在集群中运行时怎么给每个application分配资源。这块源码对应之前的spark源码(四)–master资源调度。主要对应的是application包括driver、executor的调度。第二层调度是指每个spark application都会对应若干个job,这些job是怎么在对应的executor上去执行的,这也是一个调度系统, 即application内部的调度,也可以理解为一个sparkcontext实例的内部调度。这块源码对应spark源码(五六)。针对一个application中的不同job,如果他们是由不同的线程提交的,那么他们可以运行,spark提供了针对一个application的公平调度策略
2.跨应用调度
当我们在集群上运行程序的时候,每个spark application都会获取一组独立的executor JVMs,这些JVM仅仅用来运行app的tasks和缓存app的数据。当有多个app在集群中运行的时候,就牵扯到该如何给这些app分配资源。不同的集群管理器又有不同的做法
最简单的方式是提供静态资源分配,也就是给每个app固定资源去运行,并且资源数量在整个运行期间不会发生变化。这种方式使用于Standalone、YARN
Standalone mode: 默认情况下,Standalone 模式提交到集群的app按照FIFO顺序执行,并且每个app会占用所有可用的节点。我们可以通过设置 spark.cores.max 来限制app使用的节点数。也可以通过设置 spark.deploy.defaultCores 来更改此app没有设定的默认值。最后,除了控制cores之外,每个应用程序还可以使用spark.executor.memory配置控制其内存使用。
YARN : yarn客户端的–num-executors 可以控制集群上分配多少个executor
–executor-memory 控制每个executor占用多少内存, --executor-cores 确定每个executor分配多少个cpu。这几个参数就确定了这个app在集群上占用的所有资源。
3.spark application内部的调度
在一个spark application中(一个sparkcontext实例),如果从单独的线程提交多个并行的job,那么多个job可以同时运行。这里的job指的是一个Spark action(例如,save,collect)以及任何需要运行的任务以评估该动作。Spark的调度程序是完全线程安全的,并支持这种用例来启用提供多个请求的应用程序(例如,多个用户的查询)。
默认情况下,spark以FIFO的形式调度job。每个job都被划分成很多stage(例如,mapStage和resultStage)。在第一个job运行会占用所有的可用资源,等第一个job结束以后才会运行第二个job。如果队列头部的job不需要整个集群的资源,后面的job也可以同时运行
在spark0.8以后,也可以通过公平调度的方式去调度job。在公平调度模式下,spark通过轮询的方式为job分配task,所以每个job可以公平的分享集群的资源。这意味着,在长jo运行期间提交的短job,也可以立即获取到资源运行而不用等待长job运行结束。这种模式适合多用户。
使用公平模式,只需要简单的设置 spark.scheduler.mode 就可以了
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
【Spark调度系统--Job Scheduling】4.公平调度池
公平调度还支持将task分组到调度池中,并且为每个调度池设置不同的权重。对于优先级很高的job设置高的权重是很有用的,或者将每个用户的job分组在一起,并给予用户相等的份额,而不管他们有多少并发作业,而不是给予作业相等的份额。这种方法是在Hadoop Fair Scheduler之后建模的。
没有任何干预,新提交的作业进入默认池,但是可以通过向提交的线程中的SparkContext添加spark.scheduler.pool“local property”来设置作业的池。如下:
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")
设置此本地属性后,在此线程中提交的所有作业(通过此线程中的调用到RDD.save,count,collect等)将使用此pool 名称。该设置是每个线程,使得线程可以代表同一用户运行多个作业变得容易。如果要清除线程与之关联的池,只需调用:
sc.setLocalProperty("spark.scheduler.pool", null)
5.池的默认行为
默认情况下,每个池平等的获取集群份额(默认池中每个job也是公平获取资源),在池的内部,job是按照FIFO方式运行。如果你给每个用户都创建一个池,那么每个用户也都会获取集群相等的份额,并且每个用户的查询将按顺序运行,而不是以后的查询从该用户的早期查询中获取资源。
6.配置池属性
还可以通过配置文件修改特定池的属性。每个池支持三个属性:
schedulingMode: 调度池调度模式,可以是FIFO或者FAIR,控制池中的task是按照顺序执行或者公平分享资源。默认FIFO
weight: 权重,控制调度池针对其他池的共享占比,默认是1,即所有的池获取相等的资源。如果你将某个池权重设置为2,那么他将获取其他池2倍的资源。设置诸如1000这类的搞权重也可以提高池的优先级,实际上,如果你将权重设置为1000,那么这个池中的task永远是第一个执行。
minShare: 出来整体的权重之外,每个池还可以设置最小资源(比如最少的cpu core)。在公平调度模式下,总是会优先满足调度池的最小资源,然后再按照权重去分配资源。因此,minShare属性可以是另一种确保池总是能够快速获得一定数量的资源(例如10个内核)的方法。默认情况下,每个池的minShare为0。
可以通过创建类似于conf / fairscheduler.xml.template的XML文件,并在SparkConf中设置spark.scheduler.allocation.file属性来设置池属性。
conf.set("spark.scheduler.allocation.file", "/path/to/file")
XML文件的格式只是每个池的一个元素,其中包含不同的元素,用于各种设置。 例如:
FAIR
1
2
FIFO
2
3
conf / fairscheduler.xml.template中还提供了一个完整示例。 请注意,没有在XML文件中配置的任何池将简单地获取所有设置(调度模式FIFO,权重1和minShare 0)的默认值。
参考spark官网:http://spark.apache.org/docs/latest/job-scheduling.html
推荐阅读
- 如何在Mac中的文件选择框中打开系统隐藏文件夹
- 单点登陆
- 操作系统|[译]从内部了解现代浏览器(1)
- 游乐园系统,助力游乐园管理
- 中国MES系统软件随工业化成长
- 推荐系统论文进阶|CTR预估 论文精读(十一)--Deep Interest Evolution Network(DIEN)
- Spark|Spark 数据倾斜及其解决方案
- Cesium|Cesium 粒子系统学习
- Android系统启动之init.rc文件解析过程
- 程序员|【高级Java架构师系统学习】毕业一年萌新的Java大厂面经,最新整理