使用Coalesce和Repartition管理Spark分区

Spark将数据拆分为分区并并行执行分区上的计算。您应该了解数据的分区方式以及何时需要手动调整分区以使Spark计算有效运行。
分区介绍 创建一个数值型的DataFrame来说明数据是如何分区的

val x = (1 to 10).toList val numbersDf = x.toDF(“number”)

实验机器上,这个numbersDf被分为2个分区
scala> numbersDf.rdd.partitions.size res0: Int = 2

将DataFrame写入磁盘时,每个分区都是一个单独的CSV文件。
numbersDf.write.csv("/usr/local/spark_output/numbers")

结果如下:

使用Coalesce和Repartition管理Spark分区
文章图片
图片.png
coalesce coalesce方法减少了DataFrame中的分区数量。以下是如何合并两个分区中的数据:
val numbersDf2 = numbersDf.coalesce(1)

我们可以验证coalesce是否只创建了一个只有一个分区的新DataFrame:
scala> numbersDf2.rdd.partitions.size res2: Int = 1

numbersDf2将作为一个文本文件写入磁盘:
numbersDf2.write.csv("/usr/local/spark_output/numbers")

结果如下:

使用Coalesce和Repartition管理Spark分区
文章图片
图片.png
合并算法将数据从分区B移动到分区A,并将数据从分区D移动到分区C.分区A和分区C中的数据不随合并算法一起移动。该算法在某些情况下很快,因为它最小化了数据移动。
增加分区 我们尝试使用coalesce来增加分区,但是并不生效:
val numbersDf3 = numbersDf.coalesce(4)scala> numbersDf3.rdd.partitions.size res6: Int = 2

合并算法通过将数据从某些分区移动到现有分区来更改节点数。该算法显然不能增加分区数量。
repartition repartition方法可用于增加或减少DataFrame中的分区数。
让我们用numbersDf创建一个带有两个分区的homerDf。
scala> val homerDf = numbersDf.repartition(1) scala> homerDf.rdd.partitions.size res7: Int = 1

让我们检查homerDf中每个分区的数据:
scala> homerDf.write.csv("/usr/local/spark_output/numbers")

使用Coalesce和Repartition管理Spark分区
文章图片
图片.png 重分区算法执行完整数据混洗,并在分区之间平均分配数据。它不会尝试像合并算法那样最小化数据移动。
增加分区 重分区方法也可用于增加分区数
scala> val bartDf = numbersDf.repartition(6)scala> bartDf.rdd.partitions.size res11: Int = 6

以下是如何在bartDf中的分区之间拆分数据

使用Coalesce和Repartition管理Spark分区
文章图片
图片.png 重新分区方法可以完全重排数据,因此可以增加分区数。
coalesce和repartition之间的区别 重新分区算法对数据进行完全重排,并创建相同大小的数据分区。coalesce结合现有分区以避免完全洗牌。
按列repartition 让我们使用以下数据来检查特定列如何重新对DataFrame进行分区。先创建DataFrame
val people = List( (10,"blue"), (13,"red"), (15,"blue"), (99,"red"), (67,"blue") )val peopleDf = people.toDF("age","color")

让我们通过color列重新分区DataFrame:
val colorDf = peopleDf.repartition($"color")

按列分区时,Spark默认会创建至少200个分区。查看分区数据,只有两个分区有数据,且同一个分区中的数据的color字段是一致的。
scala> colorDf.rdd.partitions.size res15: Int = 200

colorDf包含每种color的不同分区,并针对color提取进行了优化。按列分区类似于索引关系数据库中的列。
考虑分区 【使用Coalesce和Repartition管理Spark分区】DataFrames的分区似乎是一个底层实现细节,应该由框架来管理,但事实并非如此。在将大数据解析为小数据时,几乎总是应该重新分区数据。
你可能会经常把大数据通过过滤成小数据,所以习惯重新分区。

    推荐阅读