Spark生态系统目前已经非常成熟了,有很多类型的任务都可以使用spark完成,我们先看下spark生态系统的组成:
[img]http://dl2.iteye.com/upload/attachment/0127/9048/acc4fb45-3278-3491-bf21-2be4bd608d2a.png[/img]
spark的核心主要由3个模块组成:
(1)spark core 是spark的最底层的编程实现,定义了许多的函数及方法,是所有spark组件的基础依赖
(2)spark ecosystems 是spark里面的一些高级组件,基本就是我们最常用的框架
(3)resource management 负责spark任务的调度
平时我们开发过程中,基本上使用的都是第二层里面的一些框架,这里面使用最多的莫过于spark sql和spark streaming了。在对spark整个生态系统有一个基本了解后,下面我们就关注的是其运行机制了,只有解了运行机制,才会对我们使用程序,或者排查问题以及性能调优起到很大的帮助。
下面我们看下spark任务的运行机制如下图:
[img]http://dl2.iteye.com/upload/attachment/0127/9050/344b695b-4080-3c38-9c46-212a4d01f98c.png[/img]
Spark相关一些术语解释:
(一)Driver program
driver就是我们编写的spark应用程序,用来创建sparkcontext或者sparksession,driver会和cluster mananer通信,并分配task到executor上执行
(二)Cluster Manager
负责整个程序的资源调度,目前的主要调度器有:
YARN
Spark Standalone
Mesos
(三)Executors
Executors其实是一个独立的JVM进程,在每个工作节点上会起一个,主要用来执行task,一个executor内,可以同时并行的执行多个task。
(四)Job
Job是用户程序一个完整的处理流程,是逻辑的叫法。
(五)Stage
一个Job可以包含多个Stage,Stage之间是串行的,State的触发是由一些shuffle,reduceBy,save动作产生的
(六)Task
一个Stage可以包含多个task,比如sc.textFile("/xxxx").map().filter(),其中map和filter就分别是一个task。每个task的输出就是下一个task的输出。
(七)Partition
partition是spark里面数据源的一部分,一个完整的数据源会被spark切分成多个partition以方便spark可以发送到多个executor上去并行执行任务。
(八)RDD
RDD是分布式弹性数据集,在spark里面一个数据源就可以看成是一个大的RDD,RDD由多个partition组成,spark加载的数据就会被存在RDD里面,当然在RDD内部其实是切成多个partition了。
那么问题来了一个spark job是如何执行的?
(1)我们写好的spark程序,也称驱动程序,会向Cluster Manager提交一个job
(2)Cluster Manager会检查数据本地行并寻找一个最合适的节点来调度任务
(3)job会被拆分成不同stage,每个stage又会被拆分成多个task
(4)驱动程序发送task到executor上执行任务
(5)驱动程序会跟踪每个task的执行情况,并更新到master node节点上,这一点我们可以在spark master UI上进行查看
(6)job完成,所有节点的数据会被最终再次聚合到master节点上,包含了平均耗时,最大耗时,中位数等等指标。
最后关于spark的并行执行策略在总结下:
首先我们的数据源会被加载到RDD里面,在RDD里面整个数据源会被切分成多个partition,partition的个数实际就是我们执行任务的最大并行度,每个task会负责一个partition的数据,而每个spark任务最大可以执行task的个数=executor的个数 * 每个executor的cores的个数。对应到submit脚本中参数就是:
--num-executors
--executor-cores
根据spark官网的建议每个executor上建议分配置的core的个数应该在3到5之间,如果分配的太多会生成大量的小task执行,task的序列化和传输都会比较耗性能,如果分配的task数量太少,那么我们的executor大部分时候机器资源就会浪费,所以分配的个数一般在3到5个就行,这一点需要注意。
附录:
Spark中RDD,DataFrame,DataSet的区别:
1、RDD支持面向java、scala对象,编译时强类型检查。缺点,序列化非常消耗时间(集群分发和磁盘存储),不能充分利用系统的高级优化能力(如off-heap),垃圾回收对象开销大。
2、DataFrame支持数据集的模式表示(即数据列的概念),所以集群分发数据时并不需要序列化,能充分利用系统高级优化能力(off-heap),引入查询计划提高性能。缺点,DataFrame的列类型,在编译时无法判断类型,会产生运行时错误。
3、Dataset即支持数据集的模式表示,又支持java、scala对象的类型检查能力。两者通过引入一种编码、解码机制来实现。
参考链接:
[url]http://datastrophic.io/core-concepts-architecture-and-internals-of-apache-spark/[/url]
[url]https://www.packtpub.com/mapt/book/big_data_and_business_intelligence/9781785885136/1/ch01lvl1sec11/spark-architecture[/url]
[url]http://backtobazics.com/big-data/spark/understanding-apache-spark-architecture/[/url]
[url]https://www.dezyre.com/article/how-data-partitioning-in-spark-helps-achieve-more-parallelism/297[/url]
[url]http://www.agildata.com/apache-spark-rdd-vs-dataframe-vs-dataset/[/url]
[b][color=green][size=large] 有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。 [/size][/color][/b] [img]http://dl2.iteye.com/upload/attachment/0104/9948/3214000f-5633-3c17-a3d7-83ebda9aebff.jpg[/img]
推荐阅读
- spark|spark UDAF根据某列去重求合 distinct sum
- 大数据|spark UDAF 自定义聚合函数 UserDefinedAggregateFunction 带条件的去重操作
- Spark 写入 MySQL 乱码问题
- SCD|一种基于SparkSQL的Hive数据仓库拉链表缓慢变化维(SCD2+SCD1)的示例实现
- spark算子--action篇
- 大数据|Spark调优解决方案(一)之提交任务时合理分配资源
- spark|spark2.1 新特性
- Spark Aggregate算子