大数据分布式计算系统 Spark 入门核心之 RDD

人生处万类,知识最为贤。这篇文章主要讲述大数据分布式计算系统 Spark 入门核心之 RDD相关的知识,希望能为你提供帮助。




作者:幻好
概述Apache Spark 是一个快速且通用的集群计算系统。提供 java、Scala、python 和 R 中的高级 API,以及支持通用执行图的优化引擎。它还支持一组丰富的高级工具,包括用于 SQL 和结构化数据处理的 Spark SQL、用于机器学习的 MLlib、用于图形处理的 GraphX 和 Spark Streaming。
本文主要介绍Spark入门知识,以及核心 RDD 相关概念。

Spark RDDRDD 基础概念RDD 是什么
?RDD?(Resilient Distributed Dataset)名为?弹性分布式数据集?,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。 具体释义:
  • Resilient:弹性伸缩,RDD 里面的中的数据可以保存在内存中或者磁盘里面。
  • Distributed:元素是分布式存储的,可直接用于分布式计算。
  • Dataset:数据集合,可以存放很多元素。


RDD 是一个?只读?的数据集,其不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用的方法,传入的函数)。


RDD 设计目的
在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘中,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,之前的 MapReduce 框架采用非循环式的数据流模型,把中间结果写入到 HDFS 中,带来了大量的数据复制、磁盘 IO 和序列化开销。且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象。 AMP 实验室发表的一篇关于 RDD 的论文:??《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》??就是为了解决这些问题的。 RDD 提供了一个抽象的数据模型,使用户不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换操作(函数),不同 RDD 之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 IO 和序列化开销,并且还提供了更多的API(map/reduec/filter/groupBy 等)。
RDD 主要属性
在RDD内部,每个 RDD 都具有?五个主要属性?:
  • A list of partitions(分区列表)
  • A function for computing each split(计算每个拆分的函数)
  • A list of dependencies on other RDDs(对其他 RDD 的依赖列表)
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)(可选地,一个用于键值 RDD 的分区器(例如,说 RDD 是哈希分区的))
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)((可选)计算每个拆分的首选位置列表(例如 HDFS 文件的块位置))
属性说明:
属性
说明
A list of partitions
一组分片(Partition)/一个分区(Partition)列表,即数据集的基本组成单位。对于 RDD 来说,每个分片都会被一个计算任务处理,分片数决定并行度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。
A function for computing each split
一个函数会被作用在每一个分区。Spark 中 RDD 的计算是以分片为单位的,compute 函数会被作用到每个分区上。
A list of dependencies on other RDDs
一个 RDD 会依赖于其他多个 RDD。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。(Spark 的容错机制)
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
-可选项,对于 KV 类型的 RDD 会有一个 Partitioner,即 RDD 的分区函数,默认为 HashPartitioner。
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
可选项,一个列表,存储存取每个 Partition 的优先位置(preferred location)。 对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照"移动数据不如移动计算"的理念,Spark 在进行任务调度的时候,会尽可能选择那些存有数据的 worker 节点来进行任务计算。
RDD 数据结构
RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来,如何计算,主要属性包括:
  • 分区列表
  • 计算函数
  • 依赖关系
  • 分区函数(默认是 hash)
  • 最佳位置
分区列表、分区函数、最佳位置,这三个属性其实说的就是数据集在哪,在哪计算更合适,如何分区; 计算函数、依赖关系,这两个属性其实说的是数据集怎么来的。
RDD 基本操作RDD 对象创建


API文档参考:??https://spark.apache.org/docs/2.4.5/api/scala/index.html#org.apache.spark.SparkContext??


通过 ??SparkContext.scala?? API 源码,能够了解到 RDD 可以通过以下方法创建:
  • 由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、HBase 等。

// 创建一个 SparkContext 对象
val spConf = new SparkConf().setAppName("WordCount").setMaster("local")
val spContext = new SparkContext(spConf)
// 本地系统文件创建 RDD
val textFile = spContext.textFile("src/wordCount/temp/test.txt")
// HDFS 文件创建 RDD
val textFile1 = spContext.textFile("hdfs://node1:8088/wordCount/temp/test.txt")

  1. 通过已存在的 RDD 经过算子转换生成新的 RDD:
val rdd = textFile.flatMap(_.split(" "))

  1. 通过已存在的集合创建新的 RDD:
val rdd1 = spContext.parallelize(Array(1,2,3,4,5,6,7,8))
val rdd2 = spContext.makeRDD(List(1,2,3,4,5,6,7,8))




??parallelize??? 方法以惰性并行化执行。如果 ??seq??? 是一个可变集合,并且在调用并行化之后和对 RDD 的第一个操作之前被更改,则生成的 RDD 将反映修改后的集合,传递参数的副本以避免这种情况。 避免使用 ??parallelize(Seq())??? 创建一个空的RDD 。考虑 emptyRDD 用于没有分区的 RDD,或 ??parallelize(Seq[T]())?? 用于T的 RDD 有空分区。





通过源码发现 ??makeRDD??? 方法底层还是调用的 ??parallelize?? 方法


RDD 算子概述
RDD 中的算子即官方中称为 Operator (操作),对应 RDD 对象中的方法。 为什么不直接叫方法呢?
  • 之所以叫算子是为了与原 Scala 中对象的方法进行区分,因为 Scala 中对象的方法是在同一节点的内存中完成的,而 RDD 的算子是将计算逻辑发送到分布式节点(Executor 端)中执行的
  • RDD 的方法外部操作都是在 Driver 端执行,而方法内部的逻辑代码是在 Executor 端执行


参考博文理解:??https://blog.csdn.net/weixin_43101402/article/details/116065748??


RDD 的算子分为两类:
  • Transformations 转换操作,返回一个新的 RDD
  • Actions 动作操作,返回值不是 RDD(无返回值或返回其他计算结果值)


RDD 中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给 Driver 的 Action 动作时,这些转换才会真正运行。 之所以使用惰性求值/延迟执行,是因为这样可以在 Action 时对 RDD 操作形成 DAG (有向无环图)进行 Stage 的划分和并行优化,这种设计让 Spark 更加有效率地运行。


Spark 还为 RDD 提供了统计相关算子:
Operator
说明
count
个数
mean
均值
sum
求和
max
最大值
min
最小值
variance
方差
sampleVariance
从采样中计算方差
stdev
标准差:衡量数据的离散程度
sampleStdev
采样的标准差
stats
查看统计结果
Transformations
Spark RDD 支持通用的 transformations 转换算子如下列表所示:
转换算子
功能说明
map(func)
返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成
filter(func)
返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成
flatMap(func)
类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素)
mapPartitions(func)
类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func)
类似于 mapPartitions,只是func 多了一个整型的分区索引值,因此如果RDD包含元素类型为T,则 func 必须是 Iterator ??< T> ??? => Iterator ??< U> ?? 的映射函数。
sample(withReplacement, fraction, seed)
采样部分(比例取决于 fraction )数据,同时可以指定是否使用回置采样(withReplacement),以及随机数种子(seed)
union(otherDataset)
返回源数据集和参数数据集(otherDataset)的并集
intersection(otherDataset)
返回源数据集和参数数据集(otherDataset)的交集
distinct([numPartitions]))
返回对源数据集做元素去重后的新数据集
groupByKey([numPartitions])
只对包含键值对的RDD有效,如源RDD包含 (K, V) 对,则该算子返回一个新的数据集包含 (K, Iterable ??< V> ??) 对。注意:如果你需要按key分组聚合的话(如sum或average),推荐使用 reduceByKey或者 aggregateByKey 以获得更好的性能。注意:默认情况下,输出计算的并行度取决于源RDD的分区个数。当然,你也可以通过设置可选参数 numTasks 来指定并行任务的个数。
reduceByKey(func, [numPartitions])
如果源RDD包含元素类型 (K, V) 对,则该算子也返回包含(K, V) 对的RDD,只不过每个key对应的value是经过func聚合后的结果,而func本身是一个 (V, V) => V 的映射函数。另外,和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
如果源RDD包含 (K, V) 对,则返回新RDD包含 (K, U) 对,其中每个key对应的value都是由 combOp 函数 和 一个“0”值zeroValue 聚合得到。允许聚合后value类型和输入value类型不同,避免了不必要的开销。和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。
sortByKey([ascending], [numPartitions])
如果源RDD包含元素类型 (K, V) 对,其中K可排序,则返回新的RDD包含 (K, V) 对,并按照 K 排序(升序还是降序取决于 ascending 参数)
join(otherDataset, [numPartitions])
如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中将包含内关联后key对应的 (K, (V, W)) 对。外关联(Outer joins)操作请参考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。
cogroup(otherDataset, [numPartitions])
如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中包含 (K, (Iterable ??< V> ???, Iterable ??< W> ??))。该算子还有个别名:groupWith
cartesian(otherDataset)
如果源RDD包含元素类型 T 且参数RDD(otherDataset)包含元素类型 U,则返回的新RDD包含前二者的笛卡尔积,其元素类型为 (T, U) 对。
pipe(command, [envVars])
以shell命令行管道处理RDD的每个分区,如:Perl 或者 bash 脚本。RDD中每个元素都将依次写入进程的标准输入(stdin),然后按行输出到标准输出(stdout),每一行输出字符串即成为一个新的RDD元素。
coalesce(numPartitions)
将RDD的分区数减少到numPartitions。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。
repartition(numPartitions)
将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于numPartitions。该算子总是需要通过网络混洗所有数据。
repartitionAndSortWithinPartitions(partitioner)
根据partitioner(spark自带有HashPartitioner和RangePartitioner等)重新分区RDD,并且在每个结果分区中按key做排序。这是一个组合算子,功能上等价于先 repartition 再在每个分区内排序,但这个算子内部做了优化(将排序过程下推到混洗同时进行),因此性能更好。


参考文档:??https://spark.apache.org/docs/2.4.5/rdd-programming-guide.html#transformations??


Actions
Spark RDD 支持通用的 Actions 动作算子如下列表所示:
动作算子
功能说明
reduce(func)
将RDD中元素按func进行聚合(func是一个 (T,T) => T 的映射函数,其中T为源RDD元素类型,并且func需要满足 交换律 和 结合律 以便支持并行计算)
collect()
将数据集中所有元素以数组形式返回驱动器(driver)程序。通常用于,在RDD进行了filter或其他过滤操作后,将一个足够小的数据子集返回到驱动器内存中。
count()
返回数据集中元素个数
first()
返回数据集中首个元素(类似于 take(1) )
take(n)
返回数据集中前 n 个元素
takeSample(withReplacement, num, [seed])
返回数据集的随机采样子集,最多包含 num 个元素,withReplacement 表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。
takeOrdered(n, [ordering])
按元素排序(可以通过 ordering 自定义排序规则)后,返回前 n 个元素
saveAsTextFile(path)
将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。保存过程中,Spark会调用每个元素的toString方法,并将结果保存成文件中的一行。
saveAsSequenceFile(path)(Java and Scala)
将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,同样也适用于能够被隐式转换为Writable的类型(Spark实现了所有基本类型的隐式转换,如:Int,Double,String 等)
saveAsObjectFile(path)(Java and Scala)
将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。
countByKey()
只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每个key的个数。
foreach(func)
在RDD的每个元素上运行 func 函数。通常被用于累加操作,如:更新一个累加器(??Accumulator??? ) 或者 和外部存储系统互操作。注意:用 foreach 操作出累加器之外的变量可能导致未定义的行为。更详细请参考前面的“理解闭包”(??Understanding closures??)这一小节。


参考文档:??https://spark.apache.org/docs/2.4.5/rdd-programming-guide.html#actions??
【大数据分布式计算系统 Spark 入门核心之 RDD】

RDD 算子实践
/**
* 给定一个键值对 RDD: val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
* key 表示图书名称,value 表示某天图书销量 请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
* 最终结果:("spark",4),("hadoop",5)。
*/
def calRddDemo(sc: SparkContext): Unit =
val

    推荐阅读