spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量

目录
RDD序列化
(1)闭包检查
(2)序列化方法和属性
(3)Kryo序列化
RDD依赖关系
(1)RDD 血缘关系
(2)RDD 依赖关系
(3)RDD窄依赖
(4)RDD宽依赖
(5)RDD阶段划分
(6)RDD任务划分
RDD持久化
(1)RDD Cache缓存
(2)RDD CheckPoint检查点
(3)缓存和检查点的区别
RDD分区器
累加器:分布式共享只写变量
(1)系统累加器
(2)自定义累加器
广播变量:分布式共享只读变量

RDD序列化 (1)闭包检查
计算的角度:算子外的代码都在Driver端执行, 算子内的代码都在Executor端执行。
在scala函数式编程中,算子内经常会用到算子外的数据,这样形成了闭包的效果;算子外的数据没有序列化,意味着无法传值给Executor端执行,就会发生错误;所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。
(2)序列化方法和属性

import org.apache.spark.{SparkConf, SparkContext}object RDD_Serialiazble { def main(args: Array[String]): Unit = {//TODO 创建环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Serializable") val sc = new SparkContext(sparkConf)//TODO //创建RDD val rdd = sc.makeRDD(Array("Data","hello world", "hello scala", "hello spark","world","Big data")) //创建search对象 val search = new Search("hello") //函数传递 search.getMatch1(rdd).collect().foreach(println) println(">>>>>>>>>>>") //属性传递 search.getMatch2(rdd).collect().foreach(println)//TODO 关闭环境 sc.stop() } } //查询对象 class Search(query:String) extends Serializable { def isMatch(s:String): Boolean = { s.contains(this.query) } //函数序列化 def getMatch1 (rdd: RDD[String]): RDD[String] = { rdd.filter(isMatch) } //属性序列化 def getMatch2(rdd: RDD[String]) : RDD[String] = { rdd.filter(x => x.contains(query)) } }

spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片


(3)Kryo序列化
1)Java的序列化能够序列化任何的类。但是Java的序列化字节多,序列化后对象的提交也比较大。Spark出于性能的考虑,2.0 开始支持另外一种序列化机制——Kryo。
2)Kryo的速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
3)使用Kryo序列化也要继承Serializable接口。
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object RDD_Serializable_Kryo { def main(args: Array[String]): Unit = { //TODO 创建环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Serializable") //替换序列化机制 .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") //使用kryo序列化的自定义类 .registerKryoClasses(Array(classOf[Search2]))val sc = new SparkContext(sparkConf)//TODO //创建RDD val rdd = sc.makeRDD(Array("Data","hello world", "hello scala","world","Big data")) //创建search对象 val search2 = new Search2("hello") //函数传递 val result = search2.getMatch3(rdd) result.collect().foreach(println) println("<<<<<<<<") //属性传递 search2.getMatch4(rdd).collect().foreach(println)//TODO 关闭环境 sc.stop() } } //查询对象 class Search2(query: String) extends Serializable { def isMatch(s: String) = { s.contains(query) } //函数序列化 def getMatch3(rdd: RDD[String]) = { rdd.filter(isMatch) } //属性序列化 def getMatch4(rdd: RDD[String]) = { rdd.filter(_.contains(query)) } }

spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片


RDD依赖关系 (1)RDD 血缘关系
1)RDD只支持粗粒度转换。
2)将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录 RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
注:
粗粒度:针对整个数据集进行转换操作,而不是针对数据集中的某个元素进行转换操作;
细粒度:针对数据集中的某个元素进行转换操作。
spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片

def main(args: Array[String]): Unit = { //TODO 创建环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Depend") val sc = new SparkContext(sparkConf) //TODO 依赖关系 val rdd = sc.textFile("datas/word2.txt") println(rdd.toDebugString) println(">>>>>>>>>>>") val flatRDD = rdd.flatMap(_.split(" ")) println(flatRDD.toDebugString) println(">>>>>>>>>>>") val mapRDD = flatRDD.map((_, 1)) println(mapRDD.toDebugString) println(">>>>>>>>>>>>>>") val resultRDD = mapRDD.reduceByKey(_ + _) println(resultRDD.toDebugString) println(">>>>>>>>>>>>>>") resultRDD.collect().foreach(println) //TODO 关闭环境 sc.stop() }

spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片


(2)RDD 依赖关系
依赖关系其实就是两个相邻RDD之间的关系。
def main(args: Array[String]): Unit = { //TODO 创建环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Depend") val sc = new SparkContext(sparkConf) //TODO 依赖关系 val rdd = sc.textFile("datas/word2.txt") println(rdd.dependencies) println(">>>>>>>>>>>") val flatRDD = rdd.flatMap(_.split(" ")) println(flatRDD.dependencies) println(">>>>>>>>>>>") val mapRDD = flatRDD.map((_, 1)) println(mapRDD.dependencies) println(">>>>>>>>>>>>>>") val resultRDD = mapRDD.reduceByKey(_ + _) println(resultRDD.dependencies) println(">>>>>>>>>>>>>>") resultRDD.collect().foreach(println) //TODO 关闭环境 sc.stop() }

spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片


(3)RDD窄依赖
窄依赖表示每一个父RDD的Partition只能被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。
(4)RDD宽依赖
宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖,会引起Shuffle,宽依赖我们形象的比喻为多生。
(5)RDD阶段划分
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。
DAG记录了部分RDD的转换过程和任务的阶段。
ShuffleMap阶段,Result阶段
spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片

(6)RDD任务划分
RDD任务切分,中间分为:Application、Job、Stage 和 Task
Application:初始化一个SparkContext即生成一个Application;
Job:一个Action算子就会生成一个 Job;
Stage:Stage等于宽依赖(ShuffleDependency)的个数加 1;
Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
注:Application->Job->Stage->Task每一层都是1对n的关系。

RDD持久化 (1)RDD Cache缓存
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据缓存在JVM的堆内存中。但并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片

注:缓存有可能丢失
1)存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。
2)通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可。
3)Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。当一个节点 Shuffle失败了可以避免重新计算整个输入。
4)在实际使用中,如果想重用数据,建议调用persist或cache。
def main(args: Array[String]): Unit = { //TODO 创建环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Persist") val sc = new SparkContext(sparkConf) //TODO val rdd = sc.makeRDD(List("hello world", "hello scala")) val flatRDD = rdd.flatMap(_.split(" ")) val mapRDD = flatRDD.map((_, 1)) //cache数据放入内存 mapRDD.cache() //persist数据放入磁盘 //mapRDD.persist(StorageLevel.DISK_ONLY) val reduceRDD = mapRDD.reduceByKey(_ + _) reduceRDD.collect().foreach(println)println("<<<<<<<<<<<<<") val groupRDD = mapRDD.groupByKey() groupRDD.collect().foreach(println)//TODO 关闭环境 sc.stop() }

spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片


(2)RDD CheckPoint检查点
所谓的检查点就是将RDD中间结果写入磁盘,由于血缘依赖过长会造成容错成本过高,在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做,减少开销。
对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
def main(args: Array[String]): Unit = { //TODO 创建环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("CheckPoint") val sc = new SparkContext(sparkConf) //TODO 检查点 //checkpoint要指定检查点的保存路径 sc.setCheckpointDir("checkpoint") val rdd = sc.makeRDD(List("hello world", "hello scala")) val flatRDD = rdd.flatMap(_.split(" ")) val mapRDD = flatRDD.map((_, 1)) mapRDD.checkpoint() val reduceRDD = mapRDD.reduceByKey(_ + _) reduceRDD.collect().foreach(println)println("<<<<<<<<<<<<<") val groupRDD = mapRDD.groupByKey() groupRDD.collect().foreach(println) //TODO 关闭环境 sc.stop() }

spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片


(3)缓存和检查点的区别
1)Cache缓存只是将数据保存起来,不切断血缘依赖;Checkpoint检查点切断血缘依赖。
2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则要从头计算RDD。

RDD分区器 Spark目前支持Hash分区、Range分区和用户自定义分区。
Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了reduce的个数。
只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None。
每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
(1)Hash分区: 对于给定的key,计算其hashCode,并除以分区个数取余。
(2)Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序。
注:前面博文写过分区器,这里只演示自定义分区器,其他的不做演示。
import org.apache.spark.{Partitioner, SparkConf, SparkContext}object RDD_OnesPart { def main(args: Array[String]): Unit = { //TODO 创建环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Part") val sc = new SparkContext(sparkConf) //TODO 自定义分区器 val rdd = sc.makeRDD(List( ("CBA", "....."), ("NBA", "....."), ("WWW", "....."), ("COM", ".....") ), 3) val partRDD = rdd.partitionBy(new MyPartitioner) partRDD.saveAsTextFile("output")//TODO 关闭环境 sc.stop() }//TODO 自定义分区器 /** * 1.继承partitioner * 2.重写方法 * * * */ class MyPartitioner extends Partitioner { //分区数量 override def numPartitions: Int = 3 //根据数据的key值返回分区索引 override def getPartition(key: Any): Int = { key match { case "CBA" => 0 case "NBA" => 1 case _ => 2 } } }}

spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片


累加器:分布式共享只写变量累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
(1)系统累加器
def main(args: Array[String]): Unit = { //TODO 创建环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Acc") val sc = new SparkContext(sparkConf) //TODO 累加器——系统累加器 val rdd = sc.makeRDD(List(1, 2, 3, 4)) //实现累加的方法一:reduce(—_+_) //val reduceRDD = rdd.reduce(_ + _) //s\实现累加器的方法二 val sum = sc.longAccumulator("sum") rdd.foreach( num => { sum.add(num) } ) //1+2+3+4=10 println(sum.value)//TODO 关闭环境 sc.stop() }

spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片


(2)自定义累加器
import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject RDD_Acc_Ones { def main(args: Array[String]): Unit = { //TODO 创建环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Acc") val sc = new SparkContext(sparkConf) //TODO 累加器——自定义累加器 val rdd = sc.makeRDD(List( "hello","spark","hello","world" )) //rdd.map((_,1)).reduceByKey(_+_)//累加器 //创建累加器对象 val accWc = new MyAccumulator() //向spark进行注册 sc.register(accWc,"wc")rdd.foreach( word => { accWc.add(word)} ) println(accWc.value)//TODO 关闭环境 sc.stop() } /**MyAccumulator * 自定义累加器 * 1.继承 AccumulatorV2,设置泛型 * 2.重写累加器的抽象方法 */ class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]]{ var map : mutable.Map[String, Long] = mutable.Map() // 累加器是否为初始状态override def isZero: Boolean = { map.isEmpty }// 复制累加器 override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = { new MyAccumulator }// 重置累加器 override def reset(): Unit = { map.clear() }// 向累加器中增加数据 (In) override def add(word: String): Unit = { // 查询 map 中是否存在相同的单词 // 如果有相同的单词,那么单词的数量加 1 // 如果没有相同的单词,那么在 map 中增加这个单词 map(word) = map.getOrElse(word, 0L) + 1L }// 合并累加器 override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = { val map1 = map val map2 = other.value // 两个 Map 的合并 map = map1.foldLeft(map2)( ( innerMap, kv ) => { innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2 innerMap } ) }// 返回累加器的结果 (Out) override def value: mutable.Map[String, Long] = map }}

spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片


广播变量:分布式共享只读变量 原理:广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量很合适。在多个并行操作中使用同一个变量,但是Spark会为每个任务分别发送。
import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object RDD_Broadcast { def main(args: Array[String]): Unit = { //TODO 创建环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Acc") val sc = new SparkContext(sparkConf) //TODO 累加器——广播变量 val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4) val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) ) // 声明广播变量 val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list) val resultRDD: RDD[(String, (Int, Int))] = rdd1.map { case (key, num) => { var num2 = 0 // 使用广播变量 for ((k, v) <- broadcast.value) { if (k == key) { num2 = v } } (key, (num, num2)) } } resultRDD.collect().foreach(println) //TODO 关闭环境 sc.stop() }}

spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量
文章图片



【spark|spark学习笔记(七)——sparkcore核心编程-RDD序列化/依赖关系/持久化/分区器/累加器/广播变量】本文仅仅是学习笔记的记录!

    推荐阅读