中间件|Spark 详解

主要参考:添加链接描述
与Hadoop关系 为了跟 Hadoop 配合而开发出来的,不是为了取代 Hadoop, Spark 运算比 Hadoop 的 MapReduce 框架快的原因是因为 Hadoop 在一次 MapReduce 运算之后,会将数据的运算结果从内存写入到磁盘中,第二次 Mapredue 运算时在从磁盘中读取数据,所以其瓶颈在2次运算间的多余 IO 消耗. Spark 则是将数据一直缓存在内存中,直到计算得到最后的结果,再将结果写入到磁盘,所以多次运算的情况下, Spark 是比较快的.
中间件|Spark 详解
文章图片

主要特点 在核心框架 Spark 的基础上,主要提供四个范畴的计算框架:
  • Spark SQL: 提供了类 SQL 的查询,返回 Spark-DataFrame 的数据结构(类似 Hive)
  • Spark Streaming: 流式计算,主要用于处理线上实时时序数据(类似 storm)
  • MLlib: 提供机器学习的各种模型和调优
  • GraphX: 提供基于图的算法,如 PageRank
Spark 的主要特点还包括:
  • (1)提供 Cache 机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的 IO 开销;
  • (2)提供了一套支持 DAG 图的分布式并行计算的编程框架,减少多次计算之间中间结果写到 Hdfs 的开销;
  • (3)使用多线程池模型减少 Task 启动开稍, shuffle 过程中避免不必要的 sort 操作并减少磁盘 IO 操作。(Hadoop 的 Map 和 reduce 之间的 shuffle 需要 sort)
系统架构
首先明确相关术语
RDD,学名可伸缩的分布式数据集(Resilient Distributed Dataset)。是一种对数据集形态的抽象,基于此抽象,使用者可以在集群中执行一系列计算,而不用将中间结果落盘。而这正是之前 MR 抽象的一个重要痛点,每一个步骤都需要落盘,使得不必要的开销很高。
应用程序(Application): 基于Spark的用户程序,包含了一个Driver Program 和集群中多个的Executor;
驱动(Driver): 运行Application的main()函数并且创建SparkContext;
执行单元(Executor): 是为某Application运行在Worker Node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Application都有各自独立的Executors;
集群管理程序(Cluster Manager): 在集群上获取资源的外部服务(例如:Local、Standalone、Mesos或Yarn等集群管理系统);
操作(Operation): 作用于RDD的各种操作分为Transformation和Action.
整个 Spark 集群中,分为 Master 节点与 worker 节点:
  1. 其中 Master 节点上常驻 Master 守护进程和 Driver 进程, Master 负责将串行任务变成可并行执行的任务集Tasks, 同时还负责出错问题处理等
  2. 而 Worker 节点上常驻 Worker 守护进程,
  3. Master 节点与 Worker 节点分工不同, Master 负载管理全部的 Worker 节点,而 Worker 节点负责执行任务.
运行原理
  1. 我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。
  2. 根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。而Driver进程要做的第一件事情,就是向集群管理器(字节用YRN)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。
  3. 在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批Task,然后将这些Task分配到各个Executor进程中执行。Task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个Task处理的数据不同而已。
  4. 一个stage的所有Task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的Task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。
  5. 当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个Task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。
批注:
3.1:怎么拆分的stage:
shuffle
Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个Task可能都会从上一个stage的Task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。
而调用一个Spark内部的 Action 会产生一个 Spark job 来完成它。 为了确定这些job实际的内容,Spark 检查 RDD 的DAG再计算出执行 plan 。这个 plan 以最远端的 RDD 为起点(最远端指的是对外没有依赖的 RDD 或者 数据已经缓存下来的 RDD),产生结果 RDD 的 Action 为结束 。并根据是否发生 shuffle 划分 DAG 的 stage.
5.1:Executor的内存主要分为三块:
第一块是让Task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;
第二块是让Task通过shuffle过程拉取了上一个stage的Task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;
第三块是让RDD持久化时使用,默认占Executor总内存的60%。
关键内容详解 RDD—弹性分布式数据集 RDD(Resilent Distributed Datasets)俗称弹性分布式数据集,是 Spark 底层的分布式存储的数据结构,可以说是 Spark 的核心, Spark API 的所有操作都是基于 RDD 的. 数据不只存储在一台机器上,而是分布在多台机器上,实现数据计算的并行化.弹性表明数据丢失时,可以进行重建.在Spark 1.5版以后,新增了数据结构 Spark-DataFrame,仿造的 R 和 python 的类 SQL 结构-DataFrame, 底层为 RDD, 能够让数据从业人员更好的操作 RDD.
为了减少网络及磁盘 IO 开销,需要设计出一种新的容错方式,RDD 是一种只读的数据块,可以从外部数据转换而来,你可以对RDD 进行函数操作(Operation),包括 Transformation 和 Action. 在这里只读表示当你对一个 RDD 进行了操作,那么结果将会是一个新的 RDD, 这种情况放在代码里,假设变换前后都是使用同一个变量表示这一 RDD,RDD 里面的数据并不是真实的数据,而是一些元数据信息,记录了该 RDD 是通过哪些 Transformation 得到的,在计算机中使用 lineage 来表示这种血缘结构,lineage 形成一个有向无环图 DAG, 整个计算过程中,将不需要将中间结果落地到 HDFS 进行容错,加入某个节点出错,则只需要通过 lineage 关系重新计算即可.
1).RDD 主要具有如下特点:
  • 1.它是在集群节点上的不可变的、已分区的集合对象;
  • 2.通过并行转换的方式来创建(如 Map、 filter、join 等);
  • 3.失败自动重建;
  • 4.可以控制存储级别(内存、磁盘等)来进行重用;
  • 5.必须是可序列化的;
  • 6.是静态类型的(只读)。
2)RDD 的创建方式主要有2种:
  • 并行化(Parallelizing)一个已经存在与驱动程序(Driver Program)中的集合如set、list;
  • 读取外部存储系统上的一个数据集,比如HDFS、Hive、HBase,或者任何提供了Hadoop InputFormat的数据源.也可以从本地读取 txt、csv 等数据集
3)RDD 的操作函数(operation)主要分为2种类型 Transformation 和 Action.
中间件|Spark 详解
文章图片

Spark 在遇到 Transformation 操作时只会记录需要这样的操作,并不会去执行,需要等到有 Action 操作的时候才会真正启动计算过程进行计算.
举例:
val arr = Array("cat", "dog", "lion", "monkey", "mouse") // create RDD by collection val rdd = sc.parallize(arr) // Map: "cat" -> c, cat val rdd1 = rdd.Map(x => (x.charAt(0), x)) // groupby same key and count val rdd2 = rdd1.groupBy(x => x._1). Map(x => (x._1, x._2.toList.length)) val result = rdd2.collect() print(result) // output:Array((d,1), (l,1), (m,2))

Spark 并不会立即执行函数,而是当你输入了val result = rdd2**.collect()**的时候, Spark 才会开始计算,从 sc.parallize(arr) 到最后的 collect,形成一个 Job.
shuffle与stage shuffle 是划分 DAG 中 stage 的标识,同时影响 Spark 执行速度的关键步骤.
shuffle 操作是 spark 中最耗时的操作,应尽量避免不必要的 shuffle.
RDD 的 Transformation 函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操作.窄依赖跟宽依赖的区别是是否发生 shuffle(洗牌) 操作.宽依赖会发生 shuffle 操作.造成了数据在内存中的重新分布。
宽依赖主要有两个过程: shuffle write 和 shuffle fetch. 类似 Hadoop 的 Map 和 Reduce 阶段.
shuffle write 将 ShuffleMapTask 任务产生的中间结果缓存到内存中, shuffle fetch 获得 ShuffleMapTask 缓存的中间结果进行 ShuffleReduceTask 计算,这个过程容易造成OutOfMemory.
性能优化——shuffle
  1. 当进行联合的规约操作时,避免使用 groupByKey。举个例子,rdd.groupByKey().mapValues(_ .sum) 与 rdd.reduceByKey(_ + _) 执行的结果是一样的,但是前者需要把全部的数据通过网络传递一遍,而后者只需要根据每个 key 局部的 partition 累积结果,在 shuffle 的之后把局部的累积值相加后得到结果.
  2. 当输入和输出的类型不一致时,避免使用 reduceByKey。举个例子,我们需要实现为每一个key查找所有不相同的 string。一个方法是利用 map 把每个元素的转换成一个 Set,再使用 reduceByKey 将这些 Set 合并起来。
  3. 生成新列的时候,避免使用单独生成一列再 join 回来的方式,而是直接在数据上生成.类似c++上的emplace_back与push_back。
  4. 当需要对两个 RDD 使用 join 的时候,如果其中一个数据集特别小,小到能塞到每个 Executor 单独的内存中的时候,可以不使用 join, 使用 broadcast 操作将小 RDD 复制广播到每个 Executor 的内存里 join.
性能优化——cache 【中间件|Spark 详解】对多次使用的RDD进行持久化。
首先要认识到的是, .Spark 本身就是一个基于内存的迭代式计算,所以如果程序从头到尾只有一个 Action 操作且子 RDD 只依赖于一个父RDD 的话,就不需要使用 cache 这个机制,
需要 cache 的是当存在多个 Action 操作或者依赖于多个 RDD 的时候, 可以在那之前缓存RDD.
中间件|Spark 详解
文章图片

是否进行序列化和磁盘写入,需要充分考虑所分配到的内存资源和可接受的计算时间长短,序列化会减少内存占用,但是反序列化会延长时间,磁盘写入会延长时间,但是会减少内存占用,也许能提高计算速度.
此外要认识到:cache 的 RDD 会一直占用内存,当后期不需要再依赖于他的反复计算的时候,可以使用 unpersist 释放掉.

    推荐阅读