人生处万类,知识最为贤。这篇文章主要讲述大数据分布式计算系统 Spark 入门核心之 RDD相关的知识,希望能为你提供帮助。
概述Apache Spark 是一个快速且通用的集群计算系统。提供 java、Scala、python 和 R 中的高级 API,以及支持通用执行图的优化引擎。它还支持一组丰富的高级工具,包括用于 SQL 和结构化数据处理的 Spark SQL、用于机器学习的 MLlib、用于图形处理的 GraphX 和 Spark Streaming。
作者:幻好
本文主要介绍Spark入门知识,以及核心 RDD 相关概念。
Spark RDDRDD 基础概念RDD 是什么
?RDD?(Resilient Distributed Dataset)名为?弹性分布式数据集?,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。 具体释义:
- Resilient:弹性伸缩,RDD 里面的中的数据可以保存在内存中或者磁盘里面。
- Distributed:元素是分布式存储的,可直接用于分布式计算。
- Dataset:数据集合,可以存放很多元素。
RDD 设计目的
RDD 是一个?只读?的数据集,其不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用的方法,传入的函数)。
在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘中,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,之前的 MapReduce 框架采用非循环式的数据流模型,把中间结果写入到 HDFS 中,带来了大量的数据复制、磁盘 IO 和序列化开销。且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象。 AMP 实验室发表的一篇关于 RDD 的论文:??《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》??就是为了解决这些问题的。 RDD 提供了一个抽象的数据模型,使用户不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换操作(函数),不同 RDD 之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 IO 和序列化开销,并且还提供了更多的API(map/reduec/filter/groupBy 等)。
RDD 主要属性
在RDD内部,每个 RDD 都具有?五个主要属性?:
- A list of partitions(分区列表)
- A function for computing each split(计算每个拆分的函数)
- A list of dependencies on other RDDs(对其他 RDD 的依赖列表)
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)(可选地,一个用于键值 RDD 的分区器(例如,说 RDD 是哈希分区的))
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)((可选)计算每个拆分的首选位置列表(例如 HDFS 文件的块位置))
属性 | 说明 |
A list of partitions | 一组分片(Partition)/一个分区(Partition)列表,即数据集的基本组成单位。对于 RDD 来说,每个分片都会被一个计算任务处理,分片数决定并行度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。 |
A function for computing each split | 一个函数会被作用在每一个分区。Spark 中 RDD 的计算是以分片为单位的,compute 函数会被作用到每个分区上。 |
A list of dependencies on other RDDs | 一个 RDD 会依赖于其他多个 RDD。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。(Spark 的容错机制) |
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) | -可选项,对于 KV 类型的 RDD 会有一个 Partitioner,即 RDD 的分区函数,默认为 HashPartitioner。 |
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) | 可选项,一个列表,存储存取每个 Partition 的优先位置(preferred location)。 对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照"移动数据不如移动计算"的理念,Spark 在进行任务调度的时候,会尽可能选择那些存有数据的 worker 节点来进行任务计算。 |
RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来,如何计算,主要属性包括:
- 分区列表
- 计算函数
- 依赖关系
- 分区函数(默认是 hash)
- 最佳位置
RDD 基本操作RDD 对象创建
通过 ?
API文档参考:??https://spark.apache.org/docs/2.4.5/api/scala/index.html#org.apache.spark.SparkContext??
?SparkContext.scala?
? API 源码,能够了解到 RDD 可以通过以下方法创建:- 由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、HBase 等。
// 创建一个 SparkContext 对象
val spConf = new SparkConf().setAppName("WordCount").setMaster("local")
val spContext = new SparkContext(spConf)
// 本地系统文件创建 RDD
val textFile = spContext.textFile("src/wordCount/temp/test.txt")
// HDFS 文件创建 RDD
val textFile1 = spContext.textFile("hdfs://node1:8088/wordCount/temp/test.txt")
- 通过已存在的 RDD 经过算子转换生成新的 RDD:
val rdd = textFile.flatMap(_.split(" "))
- 通过已存在的集合创建新的 RDD:
val rdd1 = spContext.parallelize(Array(1,2,3,4,5,6,7,8))
val rdd2 = spContext.makeRDD(List(1,2,3,4,5,6,7,8))
??parallelize?
?? 方法以惰性并行化执行。如果 ??seq?
?? 是一个可变集合,并且在调用并行化之后和对 RDD 的第一个操作之前被更改,则生成的 RDD 将反映修改后的集合,传递参数的副本以避免这种情况。 避免使用 ??parallelize(Seq())?
?? 创建一个空的RDD 。考虑 emptyRDD 用于没有分区的 RDD,或 ??parallelize(Seq[T]())?
? 用于T的 RDD 有空分区。
RDD 算子概述
通过源码发现 ??makeRDD?
?? 方法底层还是调用的 ??parallelize?
? 方法
RDD 中的算子即官方中称为 Operator (操作),对应 RDD 对象中的方法。 为什么不直接叫方法呢?
- 之所以叫算子是为了与原 Scala 中对象的方法进行区分,因为 Scala 中对象的方法是在同一节点的内存中完成的,而 RDD 的算子是将计算逻辑发送到分布式节点(Executor 端)中执行的
- RDD 的方法外部操作都是在 Driver 端执行,而方法内部的逻辑代码是在 Executor 端执行
RDD 的算子分为两类:
参考博文理解:??https://blog.csdn.net/weixin_43101402/article/details/116065748??
- Transformations 转换操作,返回一个新的 RDD
- Actions 动作操作,返回值不是 RDD(无返回值或返回其他计算结果值)
Spark 还为 RDD 提供了统计相关算子:
RDD 中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给 Driver 的 Action 动作时,这些转换才会真正运行。 之所以使用惰性求值/延迟执行,是因为这样可以在 Action 时对 RDD 操作形成 DAG (有向无环图)进行 Stage 的划分和并行优化,这种设计让 Spark 更加有效率地运行。
Operator | 说明 |
count | 个数 |
mean | 均值 |
sum | 求和 |
max | 最大值 |
min | 最小值 |
variance | 方差 |
sampleVariance | 从采样中计算方差 |
stdev | 标准差:衡量数据的离散程度 |
sampleStdev | 采样的标准差 |
stats | 查看统计结果 |
Spark RDD 支持通用的 transformations 转换算子如下列表所示:
转换算子 | 功能说明 |
map(func) | 返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成 |
filter(func) | 返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成 |
flatMap(func) | 类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] =>
Iterator[U] |
mapPartitionsWithIndex(func) | 类似于 mapPartitions,只是func 多了一个整型的分区索引值,因此如果RDD包含元素类型为T,则 func 必须是 Iterator ??<
T>
? ?? =>
Iterator ??<
U>
? ? 的映射函数。 |
sample(withReplacement, fraction, seed) | 采样部分(比例取决于 fraction )数据,同时可以指定是否使用回置采样(withReplacement),以及随机数种子(seed) |
union(otherDataset) | 返回源数据集和参数数据集(otherDataset)的并集 |
intersection(otherDataset) | 返回源数据集和参数数据集(otherDataset)的交集 |
distinct([numPartitions])) | 返回对源数据集做元素去重后的新数据集 |
groupByKey([numPartitions]) | 只对包含键值对的RDD有效,如源RDD包含 (K, V) 对,则该算子返回一个新的数据集包含 (K, Iterable ??<
V>
? ?) 对。注意:如果你需要按key分组聚合的话(如sum或average),推荐使用 reduceByKey或者 aggregateByKey 以获得更好的性能。注意:默认情况下,输出计算的并行度取决于源RDD的分区个数。当然,你也可以通过设置可选参数 numTasks 来指定并行任务的个数。 |
reduceByKey(func, [numPartitions]) | 如果源RDD包含元素类型 (K, V) 对,则该算子也返回包含(K, V) 对的RDD,只不过每个key对应的value是经过func聚合后的结果,而func本身是一个 (V, V) =>
V 的映射函数。另外,和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) | 如果源RDD包含 (K, V) 对,则返回新RDD包含 (K, U) 对,其中每个key对应的value都是由 combOp 函数 和 一个“0”值zeroValue 聚合得到。允许聚合后value类型和输入value类型不同,避免了不必要的开销。和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。 |
sortByKey([ascending], [numPartitions]) | 如果源RDD包含元素类型 (K, V) 对,其中K可排序,则返回新的RDD包含 (K, V) 对,并按照 K 排序(升序还是降序取决于 ascending 参数) |
join(otherDataset, [numPartitions]) | 如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中将包含内关联后key对应的 (K, (V, W)) 对。外关联(Outer joins)操作请参考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。 |
cogroup(otherDataset, [numPartitions]) | 如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中包含 (K, (Iterable ??<
V>
? ??, Iterable ??<
W>
? ?))。该算子还有个别名:groupWith |
cartesian(otherDataset) | 如果源RDD包含元素类型 T 且参数RDD(otherDataset)包含元素类型 U,则返回的新RDD包含前二者的笛卡尔积,其元素类型为 (T, U) 对。 |
pipe(command, [envVars]) | 以shell命令行管道处理RDD的每个分区,如:Perl 或者 bash 脚本。RDD中每个元素都将依次写入进程的标准输入(stdin),然后按行输出到标准输出(stdout),每一行输出字符串即成为一个新的RDD元素。 |
coalesce(numPartitions) | 将RDD的分区数减少到numPartitions。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。 |
repartition(numPartitions) | 将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于numPartitions。该算子总是需要通过网络混洗所有数据。 |
repartitionAndSortWithinPartitions(partitioner) | 根据partitioner(spark自带有HashPartitioner和RangePartitioner等)重新分区RDD,并且在每个结果分区中按key做排序。这是一个组合算子,功能上等价于先 repartition 再在每个分区内排序,但这个算子内部做了优化(将排序过程下推到混洗同时进行),因此性能更好。 |
Actions
参考文档:??https://spark.apache.org/docs/2.4.5/rdd-programming-guide.html#transformations??
Spark RDD 支持通用的 Actions 动作算子如下列表所示:
动作算子 | 功能说明 |
reduce(func) | 将RDD中元素按func进行聚合(func是一个 (T,T) =>
T 的映射函数,其中T为源RDD元素类型,并且func需要满足 交换律 和 结合律 以便支持并行计算) |
collect() | 将数据集中所有元素以数组形式返回驱动器(driver)程序。通常用于,在RDD进行了filter或其他过滤操作后,将一个足够小的数据子集返回到驱动器内存中。 |
count() | 返回数据集中元素个数 |
first() | 返回数据集中首个元素(类似于 take(1) ) |
take(n) | 返回数据集中前 n 个元素 |
takeSample(withReplacement, num, [seed]) | 返回数据集的随机采样子集,最多包含 num 个元素,withReplacement 表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。 |
takeOrdered(n, [ordering]) | 按元素排序(可以通过 ordering 自定义排序规则)后,返回前 n 个元素 |
saveAsTextFile(path) | 将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。保存过程中,Spark会调用每个元素的toString方法,并将结果保存成文件中的一行。 |
saveAsSequenceFile(path)(Java and Scala) | 将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,同样也适用于能够被隐式转换为Writable的类型(Spark实现了所有基本类型的隐式转换,如:Int,Double,String 等) |
saveAsObjectFile(path)(Java and Scala) | 将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。 |
countByKey() | 只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每个key的个数。 |
foreach(func) | 在RDD的每个元素上运行 func 函数。通常被用于累加操作,如:更新一个累加器(??Accumulator??? ) 或者 和外部存储系统互操作。注意:用 foreach 操作出累加器之外的变量可能导致未定义的行为。更详细请参考前面的“理解闭包”(??Understanding closures??)这一小节。 |
RDD 算子实践
参考文档:??https://spark.apache.org/docs/2.4.5/rdd-programming-guide.html#actions??
【大数据分布式计算系统 Spark 入门核心之 RDD】
/**
* 给定一个键值对 RDD: val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
* key 表示图书名称,value 表示某天图书销量 请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
* 最终结果:("spark",4),("hadoop",5)。
*/
def calRddDemo(sc: SparkContext): Unit =
val推荐阅读
- HashMap底层原理及jdk1.8源码解读吐血整理1.3w字长文
- k8s集群Job Pod 容器可能因为多种原因失效,想要更加稳定的使用Job负载,有哪些需要注意的地方()
- 除了关注业务连续性,运维还能创造什么价值()
- LVS负载均衡群集概念NAT模式LVS负载均衡实战部署
- Android C++系列(Linux线程线程原语)
- Apache Doris 编译部署
- 实践丨SpringBoot整合Mybatis-Plus项目存在Mapper时报错
- IN和EXISTS谁效率更高
- hive数据导入(文件导入)