【Spark mapPartitions 及mapPartitionsWithIndex算子】知识养成了思想,思想同时又在融化知识。这篇文章主要讲述Spark mapPartitions 及mapPartitionsWithIndex算子相关的知识,希望能为你提供帮助。
mapPartitions
与map类似,map函数是应用到每个元素,而mapPartitions的输入函数是每个分区的数据,把每个分区中的内容作为整体来处理的。
当map里面有比较耗时的初始化操作时,比如连接db,可以采用mapPartitions,它对每个partition操作一次,其函数的输入与输出都是iterator类型。
实例如下:
scala>
val rdd1=sc.parallelize(1 to 9,3)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <
console>
:24
scala>
def myfunc[T](iter:Iterator[T]):Iterator[(T,T)]={
| var res=List[(T,T)]()
| var pre=iter.next
| while (iter.hasNext) {
| val cur=iter.next
| res.::=(pre,cur)
| pre=cur
| }
| res.iterator
| }myfunc: [T](iter: Iterator[T])Iterator[(T, T)]scala>
rdd1.mapPartitions(myfunc)res2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at mapPartitions at <
console>
:28scala>
res2.collect()res3: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
mapPartitionsWithIndex与
mapPartitions 类似,参数需多传一个分区的index.
实例如下:
scala>
val mapReslut=rdd1.mapPartitionsWithIndex{
| (index,iterator)=>
{
| val list=iterator.toList
| list.map(x=>
x +"->
"+index).iterator
| }
| }mapReslut: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at mapPartitionsWithIndex at <
console>
:25scala>
mapReslut.collectres6: Array[String] = Array(1->
0, 2->
0, 3->
0, 4->
1, 5->
1, 6->
1, 7->
2, 8->
2, 9->
2)
推荐阅读
- mybatis mapper xml????????????????????????????????????
- jQuery文档操作--append()prepend()after()和before()
- 1027代码审计平台 4-安卓项目
- Android 滑块验证
- ASP.NET MVC项目中App_Code目录在程序应用
- Ubuntu编译Android源码步骤
- H5页面关于android软键盘弹出顶起底部元素的解决方案
- 安卓 dex 通用脱壳技术研究
- Android Studio NDK开发环境搭建