Spark mapPartitions 及mapPartitionsWithIndex算子

【Spark mapPartitions 及mapPartitionsWithIndex算子】知识养成了思想,思想同时又在融化知识。这篇文章主要讲述Spark mapPartitions 及mapPartitionsWithIndex算子相关的知识,希望能为你提供帮助。
mapPartitions   与map类似,map函数是应用到每个元素,而mapPartitions的输入函数是每个分区的数据,把每个分区中的内容作为整体来处理的。  当map里面有比较耗时的初始化操作时,比如连接db,可以采用mapPartitions,它对每个partition操作一次,其函数的输入与输出都是iterator类型。
  实例如下:
scala> val rdd1=sc.parallelize(1 to 9,3)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at < console> :24
scala> def myfunc[T](iter:Iterator[T]):Iterator[(T,T)]={      | var res=List[(T,T)]()      | var pre=iter.next      | while (iter.hasNext) {      | val cur=iter.next      | res.::=(pre,cur)      | pre=cur      | }      | res.iterator      | }myfunc: [T](iter: Iterator[T])Iterator[(T, T)]scala> rdd1.mapPartitions(myfunc)res2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at mapPartitions at < console> :28scala> res2.collect()res3: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))   
mapPartitionsWithIndex与  mapPartitions 类似,参数需多传一个分区的index.
实例如下: 
scala> val mapReslut=rdd1.mapPartitionsWithIndex{      | (index,iterator)=> {      | val list=iterator.toList      | list.map(x=> x +"-> "+index).iterator      | }      | }mapReslut: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at mapPartitionsWithIndex at < console> :25scala> mapReslut.collectres6: Array[String] = Array(1-> 0, 2-> 0, 3-> 0, 4-> 1, 5-> 1, 6-> 1, 7-> 2, 8-> 2, 9-> 2) 

    推荐阅读