批处理框架|Spark学习笔记(3) - 关于Spark常用的transform算子的一些总结??????

RDD 常用 transform 算子Read Me ??

  • Code : 本章节代码实现
  • CSDN : wangt的博客
  • ?? Github : https://github.com/lovewangtzq
  • ?? 微信公众号 : 大数据初学者
  • b站: 我学不会Spark
创建 RDD 的三种方法
  1. val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
  2. val rdd2 = sc.textfile
  3. rdd 之间的转化
  • 注意 ? : RDD 中所有的转换都是延迟加载的 ,也就是说他们并不会直接结果,而是只会计算这些应用到基础数据集上的转换动作, 只有当执行action 算子的时候才会真正执行
  • 测试数据
    hello,hello,hello java,java,java python,python

map
  1. 介绍 : 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 ,也就是说RDD中每个元素都会执行一次这个方法
代码案例
package transformation import org.apache.spark.{SparkConf, SparkContext}/** * 测试 map的算子 * * @author 王天赐 * @create 2019-07-24 15:46 */ object MapTF {def main(args: Array[String]): Unit = {// 读取的文件的路径 val path = "data/SparkCore/student.txt"// 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 读取数据 val data = https://www.it610.com/article/sc.parallelize(Array('A', 'A', 'B', 'C', 'C'))val dataRDD = data.map{ x => (x,1) }dataRDD.foreach(println(_))sc.stop() } } // 结果 (A,1) (A,1) (B,1) (C,1) (C,1)

mapParatitions
?? 尽量使用mapParatitions
介绍 :
功能和map类似,但是区别是 它是独立在RDD上的每一个分片上运行的, 因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次, 一个函数一次处理所有分区 (这也这正是要多用mapParatitions的原因)
1. 代码案例```java package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * 测试 map的算子 * * @author 王天赐 * @create 2019-07-24 15:46 */ object MapParatitionsTF {def main(args: Array[String]): Unit = {// 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 读取数据 val data = https://www.it610.com/article/sc.parallelize(Array("A", "A", "B", "C", "C"))val dataRDD = data.mapPartitions { x =>// 创建Tuple类型的集合 用于存储数据 // List 需要创建成 var 类型的因为拼接时需要指向新的List的对象 var list = List[Tuple2[String, Int]]()while (x.hasNext) { // x var elem = x.next() // 将数据存入List中 再使用拼接集合的方式将数据添加到 list = list.:::(List(new Tuple2[String, Int](elem, 1))) // 备注 : 这里 ::: 和 :: 的区别是 ::: 的参数是 List , 而 :: 的参数是元素 } // 返回 List的 iterator 类型 list.iterator }// 遍历结果 dataRDD.foreach { x => println(x) }sc.stop() } } // 结果 (C,1) (C,1) (B,1) (A,1) (A,1)

这里需要注意 : 这里 ::: 和 :: 的区别是 ::: 的参数是 List , 而 :: 的参数是元素
glom
介绍 : 将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
代码案例 :
```java package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-24 22:19 */ object GlomTF {def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD 数据是 1 到 100 步长为 2 , 分区数是 4 val data = https://www.it610.com/article/sc.range(1, 100, 2, 4) // 将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]] val glomRDD = data.glom() // 遍历 glomRDD.foreach{ x => println(x.toList) } sc.stop() } } // 结果 List(1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23) List(25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49) List(51, 53, 55, 57, 59, 61, 63, 65, 67, 69, 71, 73) List(75, 77, 79, 81, 83, 85, 87, 89, 91, 93, 95, 97, 99)

flatMap
1. 介绍 : 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)1. 代码案例 :```scala package transformationimport org.apache.spark.{SparkConf, SparkContext} import Array._/** * @author 王天赐 * @create 2019-07-24 22:19 */ object FlatMapTF {def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD 数据是 1 到 100 步长为 2 , 分区数是 4 返回结构是 Long 类型的 val data = https://www.it610.com/article/sc.range(1 ,3)// 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) val flatMapDataRDD = data.flatMap{x => var list2 = List( List(1 * x,2,3), List(4 * x,5,6) ) list2 }val mapDataRDD = data.map{x => var list2 = List( List(1 * x,2,3), List(4 * x,5,6) ) list2 }flatMapDataRDD.foreach(println(_)) println(flatMapDataRDD.collect().toList) println("==" * 20) println(mapDataRDD.collect().toList) mapDataRDD.foreach(println(_))sc.stop() } } // 输出结果 List(1, 2, 3) List(4, 5, 6) List(2, 2, 3) List(8, 5, 6) List(List(1, 2, 3), List(4, 5, 6), List(2, 2, 3), List(8, 5, 6)) ======================================== List(List(List(1, 2, 3), List(4, 5, 6)), List(List(2, 2, 3), List(8, 5, 6))) List(List(1, 2, 3), List(4, 5, 6)) List(List(2, 2, 3), List(8, 5, 6)) ```注意 :star:: 可以看到 输出的结果是 一个二维数组 , map 函数输出的是两个二维数组 ,但是 flatmap 输出的是 四个一维的数组 , 其实flatmap 就是 把多维数组拆成一维数组然后组合

filter
介绍 :返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-24 22:19 */ object FilterTF {def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD 数据是 1 到 100 步长为 2 , 分区数是 4 val data = https://www.it610.com/article/sc.range(1, 10,numSlices = 4) // 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 val filterDataRDD = data.filter{x => // 过滤掉不是偶数的数 x % 2 == 0 } // 打印结果 println(filterDataRDD.collect().toList)sc.stop() } } // 结果 List(2, 4, 6, 8)

mapPartitionsWithIndex
介绍 :
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
代码案例 :
package transformation import org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-24 22:19 */ object MapPartitionsWithIndexTF {def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD val data = https://www.it610.com/article/sc.parallelize(Array("A", "A", "B", "C", "C", "D") , numSlices = 3)// 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时, // func的函数类型必须是(Int, Interator[T]) => Iterator[U] val mapParatitionWIDataRDD = data.mapPartitionsWithIndex( (index, x) => { var list = List[Tuple2[String, Int]]()while(x.hasNext){ val elem = x.next() list = list.::(new Tuple2(elem, index)) } list.iterator }) // 打印结果 println(mapParatitionWIDataRDD.collect().toList)sc.stop() } } // 结果 List((A,0), (A,0), (C,1), (B,1), (D,2), (C,2)) ```

sample
  1. 以指定的随机种子随机抽样出数量为fraction(一般在 [0-1] 如果是 0.3 表示百分之30%)的数据
  2. fraction 这个数据并不是百分之百准确的 !
  3. withReplacement表示是抽出的数据是否放回 ,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。
  4. 随机种子值为3(即可能以1 2 3的其中一个起始值)
代码案例
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-24 22:19 */ object SampleTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD 数据是 1 到 100 步长为 2 , 分区数是 4 val data = https://www.it610.com/article/sc.range(1, 100,numSlices = 4)/** * sample(withReplacement, fraction, seed) * 以指定的随机种子随机抽样出数量为fraction(一般在 [0-1] 如果是 0.3 表示百分之30%)的数据, * 并且这个数据并不是百分之百准确的 ! * withReplacement表示是抽出的数据是否放回, * true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。 * 例子从RDD中随机且有放回的抽出50%的数据,随机种子值为3(即可能以1 2 3的其中一个起始值) */ val sampleDataRDD = data.sample(withReplacement = true, fraction = 0.04, seed = 9)println(sampleDataRDD.collect().toList) sc.stop() } }

distinct([numTasks]))
介绍
对源RDD进行去重后返回一个新的RDD. 默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-25 19:16 */ object DistinctTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD 数据是 1 到 100 步长为 2 , 分区数是 4 val data = https://www.it610.com/article/sc.parallelize(Array(1,1,1,2,2,3,3,4))// 对源RDD进行去重后返回一个新的RDD. 默认情况下,只有8个并行任务来操作, // 但是可以传入一个可选的numTasks参数改变它。 val distinctDataRDD = data.distinct(numPartitions = 8)println(distinctDataRDD.collect().toList)sc.stop() } } // 结果 List(1, 2, 3, 4)

partitionBy
【批处理框架|Spark学习笔记(3) - 关于Spark常用的transform算子的一些总结??????】介绍 :
对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD。
代码案例 :
package transformationimport org.apache.spark.{HashPartitioner, SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-25 19:16 */ object PartitionByTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD 数据是 1 到 100 步长为 2 , 分区数是 4 val data = https://www.it610.com/article/sc.parallelize(Array(("A",1),("B",2), ("C",3) , ("D", 4)), 4)// 对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, // 否则会生成ShuffleRDD。 必须是 kv格式的RDD 才能进行分区// 使用 Hash Partitioner 进行分区 ,分区数是 4 个分区 val partitionerByDataRDD = data.partitionBy(new HashPartitioner(2))println("原分区数 : " + data.partitions.size) println("分区数 : " + partitionerByDataRDD.partitions.size)sc.stop() } } // 结果 原分区数 : 4 分区数 : 2

Coalesce
介绍 :
与repartition的区别: repartition(numPartitions:Int):RDD[T]和coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T] repartition只是coalesce接口中shuffle为true的实现.
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
  1. 代码案例
    package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-25 19:16 */ object CoalesceTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD 数据是 1 到 100 步长为 2 , 分区数是 4 val data = https://www.it610.com/article/sc.range(1,20,1,4)// 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。 // parationBy 只能应用于 kv数据格式的RDD val coalesceDataRDD = data.coalesce(2)println("原分区数 : " + data.partitions.size) println("分区数 : " + coalesceDataRDD.partitions.size)sc.stop() } } // 结果 原分区数 : 4 分区数 : 2

repartition
介绍
根据分区数,从新通过网络随机洗牌(不同的 excuter 之间的数据shuffle)所有数据。
  1. 代码案例
    package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-25 19:16 */ object RepartitionTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD 数据是 1 到 100 步长为 2 , 分区数是 4 val data = https://www.it610.com/article/sc.range(1,20,1,4)// 根据分区数,从新通过网络随机洗牌(不同的 excuter 之间的数据shuffle)所有数据。 val repartitionDataRDD = data.repartition(4)data.glom().foreach(x => println(x.toList)) println("==" * 20) repartitionDataRDD.glom().foreach(x => println(x.toList))sc.stop() } } // 结果 List(1, 2, 3, 4) List(5, 6, 7, 8, 9) List(10, 11, 12, 13, 14) List(15, 16, 17, 18, 19) ======================================== List(2, 6, 11, 16) List(3, 7, 12, 17) List(4, 8, 13, 18) List(1, 5, 9, 10, 14, 15, 19)

RepartitionAndSortWithinPartitions
介绍
repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。 只有kv 格式的数据才能使用
代码案例 :
package transformationimport org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-25 19:16 */ object RepartitionAndSortWithinPartitionsTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD 数据是 1 到 100 步长为 2 , 分区数是 4 val data = https://www.it610.com/article/sc.parallelize(Array(("A",1),("B",2), ("C",3) , ("D", 4)), 4)// repartitionAndSortWithinPartitions函数是repartition函数的变种, // 与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序, // 性能比repartition要高。 val repartitionASWPDataRDD = data.repartitionAndSortWithinPartitions(new HashPartitioner(3))println(repartitionASWPDataRDD.collect().toList) sc.stop() } } // 结果 List((B,2), (C,3), (A,1), (D,4))

sortBy
sortBy(func,[ascending], [numTasks])
用func先对数据进行处理,按照处理后的数据比较结果排序。
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-25 19:16 */ object SortByTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD 数据是 1 到 100 步长为 2 , 分区数是 4 val data = https://www.it610.com/article/sc.parallelize(Array(1,3,2,4,5,6,3,8,1,2))// 用func先对数据进行处理,按照处理后的数据比较结果排序。 val sortByDataRDD = data.sortBy()sc.stop() } } // 结果 List(1, 1, 2, 2, 3, 3, 4, 5, 6, 8)

union
union(otherDataset)
对源RDD和参数RDD求并集后返回一个新的RDD 不去重
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-27 16:51 */ object UnionTF { def main(args: Array[String]): Unit = {// 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") // 注意 range(1,10) 取不到 10 只能是 1 -> 9 val data01 = sc.range(1, 10,1,2) val data02 = sc.range(5, 18,1,2)// 对源RDD和参数RDD求并集后返回一个新的RDD不去重 // 要求两个 RDD 的数据类型一致 val unionDataRDD = data01.union(data02) println(unionDataRDD.collect().toList)sc.stop() } } // List(1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)

subtract
计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来
A.subtract(B) 返回 A中有但是 B中没有的元素组成的新的RDD
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-27 16:51 */ object SubtractTF { def main(args: Array[String]): Unit = {// 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") // 注意 range(1,10) 取不到 10 只能是 1 -> 9 val data01 = sc.range(1, 10,1,2) val data02 = sc.range(5, 18,1,2)//计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来 // 要求两个 RDD 的数据类型一致 val subtractDataRDD = data01.subtract(data02) println(subtractDataRDD.collect().toList)sc.stop() } } // 结果 List(2, 4, 1, 3)

intersection
对源RDD和参数RDD求交集后返回一个新的RDD
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-27 16:51 */ object IntersectionTF { def main(args: Array[String]): Unit = {// 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") // 注意 range(1,intersectionDataRDD) 取不到 10 只能是 1 -> 9 val data01 = sc.range(1, 10,1,2) val data02 = sc.range(5, 18,1,2)// 对源RDD和参数RDD求交集后返回一个新的RDD // 要求两个 RDD 的数据类型一致 val intersectionDataRDD = data01.intersection(data02) println(intersectionDataRDD.collect().toList)sc.stop() } } // 结果 List(6, 8, 7, 9, 5)

cartesian
计算两个RDD 的迪卡尔积
笛卡尔积 : 笛卡尔乘积是指在数学中,两个集合XY的笛卡尓积(Cartesian product),又称直积,表示为X × Y,第一个对象是X的成员而第二个对象是Y的所有可能有序对的其中一个成员 [1] 。
例如,A={a,b}, B={0,1,2},则
A×B={(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}
B×A={(0, a), (0, b), (1, a), (1, b), (2, a), (2, b)}
  • 代码案例 :
    package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-27 16:51 */ object CartesianTF { def main(args: Array[String]): Unit = {// 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") // 注意 range(1,10) 取不到 10 只能是 1 -> 9 val data01 = sc.parallelize(Array("a","b","c")) val data02 = sc.parallelize(Array(1,2,3,4))// 计算两个RDD 的迪卡尔积 val cartesianDataRDD = data01.cartesian(data02) println(cartesianDataRDD.collect().toList)sc.stop() } } List((a,1), (a,2), (a,3), (a,4), (b,1), (b,2), (b,3), (b,4), (c,1), (c,2), (c,3), (c,4))

pipe
管道,对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD
代码案例 :
Shell脚本 #!/bin/sh echo "AA" while read LINE; do echo ">>>"${LINE} done scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at :24scala> rdd.pipe("/home/bigdata/pipe.sh").collect() res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at :24scala> rdd.pipe("/home/bigdata/pipe.sh").collect() res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)

join
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
把相同的 key 的value 放到一块
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-27 21:08 */ object JoinTF { def main(args: Array[String]): Unit = {// 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") // 注意 range(1,10) 取不到 10 只能是 1 -> 9val data01 = sc.parallelize(Array((1,"a"), (2,"b"), (3,"c"), (4,"d"))) val data02 = sc.parallelize(Array((1,"A"), (2,"B"), (3,"C"), (4,"D")))// 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD val joinDataRDD = data01.join(data02)println(joinDataRDD.collect().toList)sc.stop() } } // 结果 List((4,(d,D)), (1,(a,A)), (3,(c,C)), (2,(b,B)))

cogroup
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
它会把 同一个 RDD 中相同key的 value 放到同一个迭代器中
val data01 = sc.parallelize(Array((1,“a”), (2,“b”), (3,“c”), (3,“c”),(4,“d”)))
val data02 = sc.parallelize(Array((1,“A”), (12,“B”), (12, “B1”),(3,“C”), (4,“D”)))
List(
(4,(CompactBuffer(d),CompactBuffer(D))), (1,(CompactBuffer(a),CompactBuffer(A))), (3,(CompactBuffer(c, c),CompactBuffer?)), (12,(CompactBuffer(),CompactBuffer(B, B1))),
(2,(CompactBuffer(b),CompactBuffer())))
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-27 21:08 */ object CoGroupTF { def main(args: Array[String]): Unit = {// 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") // 注意 range(1,10) 取不到 10 只能是 1 -> 9val data01 = sc.parallelize(Array((1,"a"), (2,"b"), (3,"c"), (3,"c"),(4,"d"))) val data02 = sc.parallelize(Array((1,"A"), (12,"B"), (12, "B1"),(3,"C"), (4,"D")))// 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD // 它会把 同一个 RDD 中相同key的 value 放大共同一个迭代器中 val coGroupDataRDD = data01.cogroup(data02)println(coGroupDataRDD.collect().toList)sc.stop() } } // 结果 List( (4,(CompactBuffer(d),CompactBuffer(D))), (1,(CompactBuffer(a),CompactBuffer(A))), (3,(CompactBuffer(c, c),CompactBuffer(C))), (12,(CompactBuffer(),CompactBuffer(B, B1))), (2,(CompactBuffer(b),CompactBuffer())))

reduceByKey
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
List((python,2), (hello,3), (java,3))
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-27 21:08 */ object ReduceByKeyTF { def main(args: Array[String]): Unit = {// 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") // 注意 range(1,10) 取不到 10 只能是 1 -> 9val data = https://www.it610.com/article/sc.textFile("data/SparkCore/student.txt") .flatMap(x => {x.split(",")}) .map((_,1))//在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数 //将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。 // 注意 传入的两个参数都是 value val reduceByKeyDataRDD = data.reduceByKey((v1,v2) =>{ v1 + v2 })println(reduceByKeyDataRDD.collect().toList)sc.stop() } } // 结果 List((python,2), (hello,3), (java,3))

groupByKey
groupByKey也是对每个key进行操作,但只生成一个sequence。
分组的结果 一般都会到迭代器中 比如 cogroup
List((python,CompactBuffer(1, 1)),
(hello,CompactBuffer(1, 1, 1)), (java,CompactBuffer(1, 1, 1)))
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-27 21:08 */ object GroupByKeyTF { def main(args: Array[String]): Unit = {// 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") // 注意 range(1,10) 取不到 10 只能是 1 -> 9val data = https://www.it610.com/article/sc.textFile("data/SparkCore/student.txt") .flatMap(x => {x.split(",")}) .map((_,1))val groupByKeyDataRDD = data.groupByKey(2)println(groupByKeyDataRDD.collect().toList)sc.stop() } } // 结果 List((python,CompactBuffer(1, 1)), (hello,CompactBuffer(1, 1, 1)), (java,CompactBuffer(1, 1, 1)))

combineByKey ??????
( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
对相同K,把V合并成一个集合。
createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就 和之前的某个元素的键相同。如果这是一个新的元素,combineByKey() 会使用一个叫作 createCombiner() 的函数来创建
那个键对应的累加器的初始值
mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并
mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-28 9:37 */ object CombinerByKeyTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 读取数据 val data = https://www.it610.com/article/sc.textFile("data/SparkCore/student.txt", 3) .flatMap(_.split(",")) .map((_,2))val combineByKeyDataRDD = data.combineByKey( // data的数据类型是 ("java",1) 这种的 而传入的v是 1 (v:Int) => (v,1), // 这种写法类似 def (v:Int){} // mergeValues // v1._1 + v 是把所有 key相同的 value 都加一块 注意 : 不要把 value的值理解成 1 // v1._2 是统计 累加的所有 kv 的数量 (v1:(Int,Int),v:(Int)) => (v1._1 + v, v1._2 + 1), // mergerCombiners // v1._1 是 value的总数 , v1._2 是 可以的总数 (v1:(Int,Int), v2:(Int,Int)) => (v1._1 + v2._1, v1._2 + v2._2) ) // List((python,(1,2)), (hello,(1,3)), (java,(1,3))) println(combineByKeyDataRDD.collect().toList)// 计算平均值 val avg = combineByKeyDataRDD.map{ // 自动推断类型 case (key,value) => (key, value._1.toDouble / value._2) } // List((python,2.0), (hello,2.0), (java,2.0)) println(avg.collect().toList)sc.stop() } }

aggregateByKey
在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
seqOp函数用于在每一个分区中用初始值逐步迭代value,combOp函数用于合并每个分区中的结果。
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-28 9:37 */ object AggregateByKeyTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 读取数据 val data = https://www.it610.com/article/sc.textFile("data/SparkCore/student.txt", 3) .flatMap(_.split(",")) .map((_,1))// 和 combinerByKey 区别是 这个 已经有初始kv 比如 (key,0) // 而 combinerByKey 是当遇到第一个时创建 // 只要带 ByKey的 传入的值只能是 value !! // 函数 使用 {} 要求里面必须是代码块 , 如果是多个参数就不能 使用 {} 而使用 () val aggregateByKeyDataRDD = data.aggregateByKey((0,0)) ( (u:(Int,Int), v:Int) =>(u._1 + v , u._2 + 1), (u1:(Int,Int),u2:(Int,Int)) => (u1._1+u2._1, u1._2+ u2._2) ) // List((python,(2,2)), (hello,(3,3)), (java,(3,3))) println(aggregateByKeyDataRDD.collect().toList)// 计算平均值 // 括号里面是不能加代码块的 case 就是代码块 val avg = aggregateByKeyDataRDD.map { case (key: String, value: (Int, Int)) => { (key, value._1.toDouble / value._2) } } // List((python,1.0), (hello,1.0), (java,1.0)) println(avg.collect().toList) sc.stop() } } // 结果 List((python,(2,2)), (hello,(3,3)), (java,(3,3))) List((python,1.0), (hello,1.0), (java,1.0))

foldByKey
aggregateByKey的简化操作,seqop和combop相同
代码案例:
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-28 9:37 */ object FoldByKeyTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 读取数据 val data = https://www.it610.com/article/sc.textFile("data/SparkCore/student.txt", 3) .flatMap(_.split(",")) .map((_,1))// aggregateByKey的简化操作,seqop和combop相同 // 只要带 ByKey的 传入的值只能是 value !! // 函数 使用 {} 要求里面必须是代码块 , 如果是多个参数就不能 使用 {} 而使用 () // 注意 : zeroValue的类型只能是 value的类型 val foldByKeyDataRDD = data.foldByKey(0)( (u:Int,v:Int) => (u+v) )println(foldByKeyDataRDD.collect().toList)sc.stop() } }// 结果 List((python,2), (hello,3), (java,3))

sortBykey
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-25 19:16 */ object SortByKeyTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")// 创建RDD 数据是 1 到 100 步长为 2 , 分区数是 4 val data = https://www.it610.com/article/sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))// 根据key排序 // 默认识 True 升序 val sortByKeyDataRDD = data.sortByKey( ascending = true) // List((1,dd), (2,bb), (3,aa), (6,cc)) println(sortByKeyDataRDD.collect().toList)sc.stop() } } // 结果 List((1,dd), (2,bb), (3,aa), (6,cc))

MapValues
针对于(K,V)形式的类型只对V进行操作
代码案例 :
package transformationimport org.apache.spark.{SparkConf, SparkContext}/** * @author 王天赐 * @create 2019-07-25 19:16 */ object MapValuesTF { def main(args: Array[String]): Unit = { // 创建 SparkConf 和 SparkContext 并设置本地运行模式 val conf = new SparkConf() .setAppName("MapTF") .setMaster("local[1]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR")val data = https://www.it610.com/article/sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))//针对于(K,V)形式的类型只对V进行操作 val mapValuesDataRDD = data.mapValues(v => (v+"1")) // List((1,a1), (1,d1), (2,b1), (3,c1)) println(mapValuesDataRDD.collect().toList) sc.stop() } } // 结果 List((1,a1), (1,d1), (2,b1), (3,c1))

    推荐阅读