Spark SQL: Broadcast Join

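When the size of one side of a join is at or below the threshold set by spark.sql.autoBroadcastJoinThreshold (in bytes, 10 MB by default), Spark SQL uses a broadcast join instead of a shuffle-based join (a shuffled hash join or a sort-merge join) to optimize the join query. A broadcast join can be very efficient for joins between a large table and relatively small tables, which makes it a good fit for star-schema joins. Because the small table is copied to every executor, each partition of the large table can be joined where it already sits, so the large table's data never has to be sent over the network.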
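The mechanism can be modeled in plain Scala. This is a toy, single-process sketch, not Spark's implementation: partitions are ordinary Seqs, and "broadcasting" simply means that every partition reads the same in-memory Map built once from the small side. The names `Sale`, `Product` and `broadcastHashJoin` are hypothetical and chosen only for illustration.

```scala
// Toy model of a broadcast hash join (an illustration, not Spark's code).
object BroadcastJoinSketch {
  final case class Sale(productId: Long, amount: Double) // large "fact" side (hypothetical)
  final case class Product(id: Long, name: String)       // small "dimension" side (hypothetical)

  // Inner equi-join on Sale.productId == Product.id.
  def broadcastHashJoin(
      largePartitions: Seq[Seq[Sale]],
      small: Seq[Product]
  ): Seq[Seq[(Sale, Product)]] = {
    // Build side: hash the small table once; this Map is what gets "broadcast".
    val hashed: Map[Long, Seq[Product]] = small.groupBy(_.id)
    // Probe side: each partition is joined locally against the shared Map,
    // so no row of the large side moves to another partition (no shuffle).
    largePartitions.map { part =>
      for {
        sale    <- part
        product <- hashed.getOrElse(sale.productId, Seq.empty[Product])
      } yield (sale, product)
    }
  }

  def main(args: Array[String]): Unit = {
    val sales = Seq(
      Seq(Sale(1L, 10.0), Sale(2L, 5.0)),
      Seq(Sale(1L, 7.5), Sale(3L, 1.0)) // productId 3 has no match and is dropped
    )
    val products = Seq(Product(1L, "apple"), Product(2L, "pear"))
    broadcastHashJoin(sales, products).zipWithIndex.foreach { case (rows, i) =>
      val rendered = rows.map { case (s, p) => s"${p.name}:${s.amount}" }.mkString(", ")
      println(s"partition $i: $rendered")
    }
  }
}
```

The output keeps the large side's partitioning: partition 0 yields `apple:10.0, pear:5.0` and partition 1 yields `apple:7.5`. The model also suggests why the technique is limited to small tables: the whole build side must fit in memory on every executor.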
  • You can mark a dataset to be broadcast in a join query with the broadcast function or with an SQL broadcast hint.
  • Note: a broadcast join is also called a replicated join or a map-side join.
Here are some typical usages in spark-shell:
    // The threshold is in bytes; the default is 10 MB
    val threshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold").toInt
    scala> threshold / 1024 / 1024
    res0: Int = 10

    val q = spark.range(100).as("a").join(spark.range(100).as("b")).where($"a.id" === $"b.id")
    scala> println(q.queryExecution.logical.numberedTreeString)
    00 'Filter ('a.id = 'b.id)
    01 +- Join Inner
    02    :- SubqueryAlias a
    03    :  +- Range (0, 100, step=1, splits=Some(8))
    04    +- SubqueryAlias b
    05       +- Range (0, 100, step=1, splits=Some(8))

    scala> println(q.queryExecution.sparkPlan.numberedTreeString)
    00 BroadcastHashJoin [id#0L], [id#4L], Inner, BuildRight
    01 :- Range (0, 100, step=1, splits=8)
    02 +- Range (0, 100, step=1, splits=8)

    scala> q.explain
    == Physical Plan ==
    *BroadcastHashJoin [id#0L], [id#4L], Inner, BuildRight
    :- *Range (0, 100, step=1, splits=8)
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
       +- *Range (0, 100, step=1, splits=8)

    // Disable automatic broadcast joins
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
    res1: String = -1

    scala> q.explain
    == Physical Plan ==
    *SortMergeJoin [id#0L], [id#4L], Inner
    :- *Sort [id#0L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(id#0L, 200)
    :     +- *Range (0, 100, step=1, splits=8)
    +- *Sort [id#4L ASC NULLS FIRST], false, 0
       +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200)

    // Force BroadcastHashJoin with broadcast hint (as function)
    import org.apache.spark.sql.functions.broadcast
    val qBroadcast = spark.range(100).as("a").join(broadcast(spark.range(100)).as("b")).where($"a.id" === $"b.id")
    scala> qBroadcast.explain
    == Physical Plan ==
    *BroadcastHashJoin [id#14L], [id#18L], Inner, BuildRight
    :- *Range (0, 100, step=1, splits=8)
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
       +- *Range (0, 100, step=1, splits=8)

    // Force BroadcastHashJoin using SQL's BROADCAST hint
    // Supported hints: BROADCAST, BROADCASTJOIN or MAPJOIN
    val qBroadcastLeft = """
      SELECT /*+ BROADCAST (lf) */ *
      FROM range(100) lf, range(1000) rt
      WHERE lf.id = rt.id
    """
    scala> sql(qBroadcastLeft).explain
    == Physical Plan ==
    *BroadcastHashJoin [id#34L], [id#35L], Inner, BuildRight
    :- *Range (0, 100, step=1, splits=8)
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
       +- *Range (0, 1000, step=1, splits=8)

    val qBroadcastRight = """
      SELECT /*+ MAPJOIN (rt) */ *
      FROM range(100) lf, range(1000) rt
      WHERE lf.id = rt.id
    """
    scala> sql(qBroadcastRight).explain
    == Physical Plan ==
    *BroadcastHashJoin [id#42L], [id#43L], Inner, BuildRight
    :- *Range (0, 100, step=1, splits=8)
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
       +- *Range (0, 1000, step=1, splits=8)
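The size rule that produced these plans can be written as a small predicate. This is a simplified sketch that appears to mirror the comparison Spark's planner makes (the estimated size is known, i.e. non-negative, and does not exceed the threshold); it ignores hints, join type, build-side selection and adaptive execution. `canBroadcastBySize` is used here as an illustrative name, and the 800-byte size estimate in `main` is hypothetical. It shows why the default of 10 * 1024 * 1024 = 10485760 bytes allowed a BroadcastHashJoin for the small ranges, and why a threshold of -1 made Spark fall back to SortMergeJoin.

```scala
// Simplified model of the automatic broadcast size check (not Spark's actual code).
object BroadcastThresholdSketch {
  // 10 MB expressed in bytes, matching `threshold / 1024 / 1024 == 10` in the transcript.
  val DefaultThresholdBytes: Long = 10L * 1024 * 1024

  // A join side qualifies when its estimated size is known (>= 0) and at most the threshold.
  // Any negative threshold, such as -1, therefore disables automatic broadcasting.
  def canBroadcastBySize(estimatedSizeBytes: Long, thresholdBytes: Long): Boolean =
    estimatedSizeBytes >= 0 && estimatedSizeBytes <= thresholdBytes

  def main(args: Array[String]): Unit = {
    val estimate = 100L * 8 // hypothetical estimate: 100 rows of one 8-byte bigint column
    println(canBroadcastBySize(estimate, DefaultThresholdBytes)) // true
    println(canBroadcastBySize(estimate, -1L))                   // false
  }
}
```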

[Spark SQL: Broadcast Join] Source: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html
