Spark SQL: Broadcast Join
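When the size of a dataset is below the threshold set by spark.sql.autoBroadcastJoinThreshold (10 MB by default), Spark SQL uses a broadcast join (also called a broadcast hash join) instead of a shuffle-based join to optimize the join query. Broadcast joins are very effective for joining a relatively small table with a large table, which makes them a good fit for star-schema joins. The small table is copied to every executor, so the large table's data does not have to be sent over the network.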
- You can mark a dataset to be broadcast in a join query with either the broadcast function or the SQL BROADCAST hint.
- Note: a broadcast join is also known as a replicated join or a map-side join.
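To make the "map-side" idea concrete, the following is a minimal conceptual sketch on plain Scala collections. It is not Spark's implementation: the object `BroadcastJoinSketch`, the function `broadcastHashJoin`, and the sample data are all hypothetical names introduced for illustration. It only shows the two phases, building a hash table from the small side once and probing it from each partition of the large side without moving the large side's rows.

```scala
// Conceptual sketch only: NOT Spark's implementation.
// `BroadcastJoinSketch`, `broadcastHashJoin`, and the sample data are hypothetical.
object BroadcastJoinSketch {

  // Joins every partition of the large side against one in-memory hash table
  // built from the small side, so the large side is never repartitioned.
  def broadcastHashJoin[K, A, B](
      largePartitions: Seq[Seq[(K, A)]],
      small: Seq[(K, B)]): Seq[Seq[(K, A, B)]] = {
    // Build phase: index the small side by key. In a cluster, this table is
    // what would be copied ("broadcast") to every executor.
    val hashed: Map[K, Seq[B]] =
      small.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }

    // Probe phase: each partition looks up its own rows independently.
    largePartitions.map { partition =>
      for {
        (k, a) <- partition
        b      <- hashed.getOrElse(k, Seq.empty)
      } yield (k, a, b)
    }
  }

  def main(args: Array[String]): Unit = {
    // Two "partitions" of a larger table and one small lookup table.
    val orders = Seq(
      Seq((1, "order-a"), (2, "order-b")),
      Seq((1, "order-c"), (3, "order-d")))
    val customers = Seq((1, "alice"), (2, "bob"))

    // Key 3 has no match on the small side, so it is dropped (inner join).
    broadcastHashJoin(orders, customers).foreach(println)
  }
}
```

The trade-off this sketch makes visible is that the whole small side must fit in memory on every worker, which is why Spark guards automatic broadcasting with a size threshold.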
val threshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold").toInt
scala> threshold / 1024 / 1024
res0: Int = 10

val q = spark.range(100).as("a").join(spark.range(100).as("b")).where($"a.id" === $"b.id")
scala> println(q.queryExecution.logical.numberedTreeString)
00 'Filter ('a.id = 'b.id)
01 +- Join Inner
02    :- SubqueryAlias a
03    :  +- Range (0, 100, step=1, splits=Some(8))
04    +- SubqueryAlias b
05       +- Range (0, 100, step=1, splits=Some(8))

scala> println(q.queryExecution.sparkPlan.numberedTreeString)
00 BroadcastHashJoin [id#0L], [id#4L], Inner, BuildRight
01 :- Range (0, 100, step=1, splits=8)
02 +- Range (0, 100, step=1, splits=8)

scala> q.explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#4L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 100, step=1, splits=8)

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res1: String = -1

scala> q.explain
== Physical Plan ==
*SortMergeJoin [id#0L], [id#4L], Inner
:- *Sort [id#0L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#0L, 200)
:     +- *Range (0, 100, step=1, splits=8)
+- *Sort [id#4L ASC NULLS FIRST], false, 0
   +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200)

// Force BroadcastHashJoin with broadcast hint (as function)
val qBroadcast = spark.range(100).as("a").join(broadcast(spark.range(100)).as("b")).where($"a.id" === $"b.id")
scala> qBroadcast.explain
== Physical Plan ==
*BroadcastHashJoin [id#14L], [id#18L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 100, step=1, splits=8)

// Force BroadcastHashJoin using SQL's BROADCAST hint
// Supported hints: BROADCAST, BROADCASTJOIN or MAPJOIN
val qBroadcastLeft = """
SELECT /*+ BROADCAST (lf) */ *
FROM range(100) lf, range(1000) rt
WHERE lf.id = rt.id
"""
scala> sql(qBroadcastLeft).explain
== Physical Plan ==
*BroadcastHashJoin [id#34L], [id#35L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 1000, step=1, splits=8)

val qBroadcastRight = """
SELECT /*+ MAPJOIN (rt) */ *
FROM range(100) lf, range(1000) rt
WHERE lf.id = rt.id
"""
scala> sql(qBroadcastRight).explain
== Physical Plan ==
*BroadcastHashJoin [id#42L], [id#43L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 1000, step=1, splits=8)
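Reading explain output by eye works in the shell, but the same check can be done programmatically. The following is a rough sketch of a standalone program, assuming a local Spark installation on the classpath. The object name `BroadcastHintCheck` and the helper `usesBroadcastHashJoin` are hypothetical and not part of Spark. It disables automatic broadcasting, as in the session above, and then inspects the physical plan of a join with no hint, with the broadcast function, and with Dataset.hint("broadcast").

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.functions.broadcast

// Hypothetical helper object for illustration; not part of Spark.
object BroadcastHintCheck {

  // True if the physical plan chosen for `df` contains a BroadcastHashJoin operator.
  def usesBroadcastHashJoin(df: DataFrame): Boolean =
    df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }.nonEmpty

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")                 // assumption: local run, not a cluster
      .appName("broadcast-hint-check")
      .getOrCreate()

    // Disable size-based automatic broadcasting, as in the shell session above,
    // so that only an explicit hint can select a broadcast join.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)

    val large = spark.range(1000)
    val small = spark.range(100)

    val noHint       = large.join(small, "id")
    val functionHint = large.join(broadcast(small), "id")
    val datasetHint  = large.join(small.hint("broadcast"), "id")

    println(s"no hint:           ${usesBroadcastHashJoin(noHint)}")
    println(s"broadcast(...):    ${usesBroadcastHashJoin(functionHint)}")
    println(s"hint(\"broadcast\"): ${usesBroadcastHashJoin(datasetHint)}")

    spark.stop()
  }
}
```

Consistent with the plans shown earlier, one would expect the unhinted join to fall back to a non-broadcast strategy once the threshold is -1, while both hinted variants still select BroadcastHashJoin. Exact plans can differ between Spark versions.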
Source: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html