spark|Spark RDD 初学

MapReduce:一次性数据计算 框架在处理数据时,从存储设备中读取数据,进行逻辑处理,最后将结果存储到介质中。
在迭代过程中,需要频繁的I/O操作。(早期mapreduce)
Spark:基于内存 迭代过程中,中间结果放置在内存当中大大加快了计算速度。若部署在共享存储的设备中,会遇到资源不足问题。
Spark https://www.cnblogs.com/cxxjohnson/p/8909578.html
Spark核心模块 Apache Spark Core
提供spark最基础最核心的功能,Spark其他的功能都是在Spark Core的基础上进行扩展的
Spark SQL
Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。
Spark Streaming
Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。
Spark MLlib
MLlib是Spark提供的一个机器学习的算法库。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语
Spark GraphX
GraphX是Spark面向图计算提供的框架与算法库
Spark开始前的准备 任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数。
SparkConf()和SparkContext()

val conf = new SparkConf().setMaster("master").setAppName("appName") //setMaster(URL):参数为集群URL,告知Spark如何连接到集群上,参数有如下几种 //local表示让Spark运行在单机单线程上而无需连接到集群 //“local”在本地运行 //“local[4]”以4核在本地运行 //“spark://master:7077”在spark独立集群上运行 //setAppName(appname):参数为应用名,当连接到一个集群时,帮助我们在集群管理器的用户界面中找到应用 val sc = new SparkContext(conf) //val sc = new SparkContext("master","appName")

sc对象代表对计算集群的一个连接。一旦有了SparkContext, 就可以用它来创建RDD。
SparkSession()
在Spark2.0之前, SparkContext 是所有 Spark 功能的结构, 驱动器(driver) 通过SparkContext 连接到集群 (通过resource manager), 因为在2.0之前, RDD就是Spark的基础。
如果需要建立SparkContext,则需要SparkConf,通过Conf来配置SparkContext的内容。
在Spark2.0之后,Spark Session也是Spark 的一个入口, 为了引入dataframe和dataset的API, 同时保留了原来SparkContext的functionality
SparkSession spark = SparkSession.builder().appName("demo_spark").enableHiveSupport().getOrCreate(); //SparkSession.builder() 创建此方法用于构造SparkSession。 //SparkSession.builder() 创建此方法用于构造SparkSession。 //master(“local”) 设置要连接的master URL,例如: /*“local”在本地运行 “local[4]”以4核在本地运行 “spark://master:7077”在spark独立集群上运行*/ //appName( ) 设置将在spark Web UI中显示的应用程序的名称。如果未设置应用程序名称,则将使用随机生成的名称。 //Config 设置使用此方法设置的配置选项会自动传递到'SparkConf'和'SparkSession'自己的配置,它的参数由键值对组成。 //enableHiveSupport启用Hive支持,类似于HiveContext创建了sparkSession,我们可以用它来读取数据。 //getOrCreate()方法表示有就拿过来,没有就创建,类似于单例模式。//使用SparkSession读取数据SparkSession是读取数据的入口点,类似于旧的SQLContext.read。以下代码使用SparkSession从CSV读取数据: val df = spark.read.format("com.databricks.spark.csv")

SparkSession 时SparkContext、StreamingContext、SQLContext的集成
spark|Spark RDD 初学
文章图片

RDD:弹性分布式数据集 Spark中最基本的数据抽象
代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户再执行多个查询时显式的将工作集缓存在内存中,后续查询能够重用工作集,极大提升了查询速度。
RDD的属性
1)一组分片 数据集的基本组成单位。对于RDD来说,每一个分片都会被一个计算任务处理,并决定了并行计算的粒度。用户可以创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值(CPU核数)
2)一个计算每个分区的函数 RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
3)RDD之间的依赖关系 RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
4)一个Partitioner:RDD的分片函数,分区器 当前Spark中实现了两种类型的分片函数,一种是基于哈希的HashPartioner,另一种时基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner。非key-value的RDD的Parititioner的值时None。Partitioner函数不但决定了RDD本身的分片数量,也决定了Parent RDD Shuffle输出时的分片数量。
5)一个列表:首选位置,移动计算 存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照”移动数据不如移动计算“的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
用来判断计算发送给哪个节点效率最优!
RDD的创建
1) 从已有的scala数组中创建RDD
//创建RDD,里面包含1-9,分别在3个分区中 val a = sc.parallelize(1 to 9, 3)

1) 读取文件创建RDD
//每一行就是RDD的一个元素 val b = sc.textFile("text.txt")

RDD编程API
主要分两类,转换(transformation)和动作(action)。两类函数的主要区别是,transformation接受RDD并返回RDD,而action接受RDD返回非RDD.
transformation操作是延迟计算的,也就是说从一个RDD生成另一个RDD的转换操作不是马上执行,需要等到有action操作的时候才真正触发运算。
action算子会触发spark提交作业job,并将数据输出spark系统。
Transformation 1)map(func) 对RDD中的每个元素都执行一个指定函数来产生一个新的RDD。任何原来RDD中的元素在新RDD中都有且只有一个元素与之对应。
val a = sc.parallelize(1 to 9, 3) val b = a.map(x => x*2) a.collect //Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) b.collect //Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

2)flatmap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
val a = sc.parallelize(1 to 4, 2) val b = a.flatMap(x => 1 to x) b.collect //Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4,1,2,3,4

3)mapPartitions(func) mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
它的函数定义为:
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。
spark|Spark RDD 初学
文章图片

4)mapWith(func) 是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:
def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;
第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。
举例:把partition index 乘以10,然后加上2作为新的RDD的元素。
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)x.mapWith(a => a * 10)((a, b) => (b + 2)).collect//Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

5)flatMapWith flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partition Index作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:
def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

//举例 val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect//Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,8, 2, 9)

6)flatMapValues flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。
val a = sc.parallelize(List((1,2),(3,4),(3,6))) val b = a.flatMapValues(x=>x.to(5)) b.collect //Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5)) //上述例子中原RDD中每个元素的值被转换为一个序列(从其当前值到5),比如第一个KV对(1,2), 其值2被转换为2,3,4,5。然后其再与原KV对中Key组成一系列新的KV对(1,2),(1,3),(1,4),(1,5)。

7)union 对源RDD和参数RDD求并集后返回一个新的RDD
val rdd1 = sc.parallelize(List(5, 6, 4, 3)) val rdd2 = sc.parallelize(List(1, 2, 3, 4)) //求并集 val rdd3 = rdd1.union(rdd2) rdd3.collect //Array[Int] = Array(5, 6, 4, 3, 1, 2, 3, 4) //distinct 去重 val rdd1 = sc.parallelize(List(5, 6, 4, 3)) val rdd2 = sc.parallelize(List(1, 2, 3, 4)) //求并集 val rdd3 = rdd1.union(rdd2) //Array[Int] = Array(5, 6, 4, 3, 1, 2)

8)filter filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) val b = a.filter(_ > 5) b.collect //Array[Int] = Array(6, 7, 8, 9)

9)coalesce(numPartitions: Int, shuffle: Boolean = false) 该函数用于将RDD进行重分区,使用HashPartitioner。第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;
var data = https://www.it610.com/article/sc.parallelize(1 to 12, 3) data.collect //Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12) data.partitions.size //Int = 3 var rdd1 = data.coalesce(1) rdd1.partitions.size //Int = 1 var rdd1 = data.coalesce(4) rdd1.partitions.size //Int = 3 如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,//否则,分区数不便 var rdd1 = data.coalesce(4,true) rdd1.partitions.size //Int = 4

10)groupBy spark|Spark RDD 初学
文章图片

11)groupbyKey 【spark|Spark RDD 初学】在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2))) val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2))) //求并集 val rdd4 = rdd1 union rdd2 //按key进行分组 val rdd5 = rdd4.groupByKey rdd5.collect //Array[(String, Array[Int])] = Array(("tom",CompactBuffer(1,1)), ("jerry",CompactBuffer(3,2), ("kitty",CompactBuffer(2),("shuke",CompactBuffer(2))

累加器:分布式共享写变量 rry”, 2), (“tom”, 1), (“shuke”, 2)))
//求并集
val rdd4 = rdd1 union rdd2
//按key进行分组
val rdd5 = rdd4.groupByKey
rdd5.collect
//Array[(String, Array[Int])] = Array((“tom”,CompactBuffer(1,1)), (“jerry”,CompactBuffer(3,2), (“kitty”,CompactBuffer(2),(“shuke”,CompactBuffer(2))
## 累加器:分布式共享写变量## 广播变量:分布式共享读变量

    推荐阅读