Spark - 宽依赖和窄依赖

前面Standalone模式下的Master对资源的调度,是第一层调度。为了后面第二层调度讲解,这里先用一个例子作为铺垫,也顺便讲一下宽依赖和窄依赖。所涉及的RDD使用之前已经讲过了,这里不做赘述。
join 这里有两个RDD,分区都是2,通过join转换为rdd3,为了更直观的看出每个数据在哪个分区,下面都通过p+数字的形式打印出来。

val sc = new SparkContext(new SparkConf().setAppName("MixRDD").setMaster("local")) sc.setLogLevel("ERROR") val rdd1: RDD[(String, Int)] = sc.parallelize(List(("a", 1), ("b", 1), ("c", 1), ("d", 1)), 2) val rdd2: RDD[(String, Int)] = sc.parallelize(List(("a", 2), ("d", 2), ("e", 2), ("f", 2)), 2) val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2) // rdd1:ArrayBuffer((a,1), (b,1), (c,1), (d,1)) println("rdd1:" + rdd1.collect().toBuffer) // rdd2:ArrayBuffer((a,2), (d,2), (e,2), (f,2)) println("rdd2:" + rdd2.collect().toBuffer) // rdd3:ArrayBuffer((d,(1,2)), (a,(1,2))) println("rdd3:" + rdd3.collect().toBuffer) val p1: RDD[String] = rdd1.mapPartitionsWithIndex(f1) val p2: RDD[String] = rdd2.mapPartitionsWithIndex(f1) val p3: RDD[String] = rdd3.mapPartitionsWithIndex(f2) // p1:ArrayBuffer(a:0, b:0, c:1, d:1) println("p1:" + p1.collect().toBuffer) // p2:ArrayBuffer(a:0, d:0, e:1, f:1) println("p2:" + p2.collect().toBuffer) // p3:ArrayBuffer(d:0, a:1) println("p3:" + p3.collect().toBuffer)

上面的代码的流程图如下,可以看到RDD1的分区0连着RDD3的分区1,RDD1的分区1连着RDD2的分区0(图例没有分区数字,从上到下递增,0开始),RDD2同理。
RDD1的每个分区都只有一个“儿子”,也就是说RDD1的每个分区仅被一个RDD3的一个分区依赖,这样我们可以称为窄依赖。
Spark - 宽依赖和窄依赖
文章图片

map 这里有一个RDD4,通过map得到RDD5,比如RDD4的一个元素"a",变成("a",4)。
val rdd4: RDD[String] = sc.parallelize(List("a", "b", "c", "d"), 2) val rdd5: RDD[(String, Int)] = rdd4.map((_, 4)) // rdd4:ArrayBuffer(a, b, c, d) println("rdd4:" + rdd4.collect().toBuffer) // rdd5:ArrayBuffer((a,3), (b,3), (c,3), (d,3)) println("rdd5:" + rdd5.collect().toBuffer) val p4: RDD[String] = rdd4.mapPartitionsWithIndex(f3) val p5: RDD[String] = rdd5.mapPartitionsWithIndex(f1) // p4:ArrayBuffer(a:0, b:0, c:1, d:1) println("p4:" + p4.collect().toBuffer) // p5:ArrayBuffer(a:0, b:0, c:1, d:1) println("p5:" + p5.collect().toBuffer)

上面的代码的流程图如下,可以看到RDD4的分区0连着RDD5的分区0,RDD4的分区1连着RDD5的分区1,由于RDD4的每个分区都只有一个“儿子”,也就是说RDD4的每个分区仅被一个RDD5的一个分区依赖,这样我们也可以称为窄依赖。
Spark - 宽依赖和窄依赖
文章图片

union 这里有一个RDD6,通过union上面得到的RDD5,得到一个新的RDD7。
val rdd6: RDD[(String, Int)] = sc.parallelize(List(("a", 6), ("d", 6), ("e", 6), ("f", 6)), 2) val rdd7: RDD[(String, Int)] = rdd5.union(rdd6) // rdd6:ArrayBuffer((a,6), (d,6), (e,6), (f,6)) println("rdd6:" + rdd6.collect().toBuffer) // rdd7:ArrayBuffer((a,4), (b,4), (c,4), (d,4), (a,6), (d,6), (e,6), (f,6)) println("rdd7:" + rdd7.collect().toBuffer) val p6: RDD[String] = rdd6.mapPartitionsWithIndex(f1) val p7: RDD[String] = rdd7.mapPartitionsWithIndex(f1) // p6:ArrayBuffer(a:0, d:0, e:1, f:1) println("p6:" + p6.collect().toBuffer) // p7:ArrayBuffer(a:0, b:0, c:1, d:1, a:2, d:2, e:3, f:3) println("p7:" + p7.collect().toBuffer)

【Spark - 宽依赖和窄依赖】上面的代码的流程图如下,可以看到RDD5的分区0连着RDD7的分区0,RDD5的分区1连着RDD7的分区1,由于RDD5的每个分区都只有一个“儿子”,也就是说RDD5的每个分区仅被一个RDD7的一个分区依赖,RDD6同理,这样我们也可以称为窄依赖。
Spark - 宽依赖和窄依赖
文章图片

所以,只要有一个“儿子”,或者仅被一个分区依赖我们称为窄依赖。
fullOuterJoin 通过fullOuterJoin把join的结果和union的结果连接起来,并且分区数设置为4。
val rdd8: RDD[(String, (Option[Int], Option[(Int, Int)]))] = rdd7.fullOuterJoin(rdd3, 4) // rdd8:ArrayBuffer((d,(Some(4),Some((1,2)))), (d,(Some(6),Some((1,2)))), (e,(Some(6),None)), (a,(Some(4),Some((1,2)))), (a,(Some(6),Some((1,2)))), (b,(Some(4),None)), (f,(Some(6),None)), (c,(Some(4),None))) println("rdd8:" + rdd8.collect().toBuffer) val p8: RDD[String] = rdd8.mapPartitionsWithIndex(f4) // p8:ArrayBuffer(d:0, d:0, e:1, a:1, a:1, b:2, f:2, c:3) println("p8:" + p8.collect().toBuffer)

上面的代码的流程图如下,可以看到RDD7的分区0连着RDD8的分区1,RDD7的分区0还连着RDD8的分区2。RDD7的分区1连着RDD8的分区0,RDD7的分区1还连着RDD8的分区3。RDD7的分区2连着RDD8的分区0,RDD7的分区1还连着RDD8的分区1。
所以RDD7的分区0、分区1、分区2都不止一个“儿子”,也就是说RDD7的分区0、分区1、分区2被RDD8的多个分区依赖,所以这种叫做宽依赖。
Spark - 宽依赖和窄依赖
文章图片

其他代码
val f1: (Int, Iterator[(String, Int)]) => Iterator[String] = (x, y) => { (y.toList.map(z => z._1 + ":" + x)).iterator }val f2: (Int, Iterator[(String, (Int, Int))]) => Iterator[String] = (x, y) => { (y.toList.map(z => z._1 + ":" + x)).iterator }val f3: (Int, Iterator[String]) => Iterator[String] = (x, y) => { (y.toList.map(z => z + ":" + x)).iterator }val f4: (Int, Iterator[(String, (Option[Int], Option[(Int, Int)]))]) => Iterator[String] = (x, y) => { (y.toList.map(z => z._1 + ":" + x)).iterator }

    推荐阅读