Spark详解(十四)(Spark SQL的Join实现)

1. 简介 Join是SQL语句中的常用操作,良好的表结构能够将数据分散在不同的表中,使其符合某种范式,减少表冗余、更新容错等。而建立表和表之间关系的最佳方式就是Join操作。Join连接是大数据处理的重要手段,它基于表之间的共同字段将来自两个或多个表的行结合起来。如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者,我们有必要了解Join在Spark中是如何组织运行的。
2. Join的基本要素 如下图所示,Join大致包括三个要素:Join方式、Join条件以及过滤条件。其中过滤条件也可以通过AND语句放在Join条件中。
Spark详解(十四)(Spark SQL的Join实现)
文章图片

Spark支持所有类型的Join,包括:

  • inner join
  • left outer join
  • right outer join
  • full outer join
  • left semi join
  • left anti join
    下面分别阐述这几种Join的实现。
2. Join基本实现流程 总体上来说,Join的基本实现流程如下图所示,Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),通常streamIter为大表,buildIter为小表,我们不用担心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。
Spark详解(十四)(Spark SQL的Join实现)
文章图片

在实际计算时,spark会基于streamIter来遍历,每次取出streamIter中的一条记录rowA,根据Join条件计算keyA,然后根据该keyA去buildIter中查找所有满足Join条件(keyB==keyA)的记录rowBs,并将rowBs中每条记录分别与rowAjoin得到join后的记录,最后根据过滤条件得到最终join的记录。
从上述计算过程中不难发现,对于每条来自streamIter的记录,都要去buildIter中查找匹配的记录,所以buildIter一定要是查找性能较优的数据结构。spark提供了三种join实现:sort merge join、broadcast join以及hash join。
2.1 Hash join实现 spark提供了hash join实现方式,在shuffle read阶段不对记录排序,反正来自两格表的具有相同key的记录会在同一个分区,只是在分区内不排序,将来自buildIter的记录放到hash表中,以便查找,如下图所示。
由于Spark是一个分布式的计算引擎,可以通过分区的形式将大批量的数据划分成n份较小的数据集进行并行计算。这种思想应用到Join上便是Shuffle Hash Join了。利用key相同必然分区相同的这个原理,SparkSQL将较大表的join分而治之,先将表划分成n个分区,在对buildlter查找表和streamlter表进行Hash Join。
Spark详解(十四)(Spark SQL的Join实现)
文章图片

Shuffle Hash Join分为两步:
  1. 对两张表分别按照join keys进行重分区,即shuffle,目的是为了让有相同join keys值的记录分到对应的分区中
  2. 对对应分区中的数据进行join,此处先将小表分区构造为一张hash表,然后根据大表分区中记录的join keys值拿出来进行匹配
不难发现,要将来自buildIter的记录放到hash表中,那么每个分区来自buildIter的记录不能太大,否则就存不下,默认情况下hash join的实现是关闭状态,如果要使用hash join,必须满足以下四个条件:
  • buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件
  • 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false
  • 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区来自buildIter的记录要能放到内存中
  • streamIter的大小是buildIter三倍以上
2.2 Sort Merge Join 实现 上面介绍的实现对于一定大小的表比较适用,但当两个表都非常大时,显然无论适用哪种都会对计算内存造成很大压力。这是因为join时两者采取的都是hash join,是将一侧的数据完全加载到内存中,使用hash code取join keys值相等的记录进行连接。
要让两条记录能join到一起,首先需要将具有相同key的记录在同一个分区,所以通常来说,需要做一次shuffle,map阶段根据join条件确定每条记录的key,基于该key做shuffle write,将可能join到一起的记录分到同一个分区中,这样在shuffle read阶段就可以将两个表中具有相同key的记录拉到同一个分区处理。前面我们也提到,对于buildIter一定要是查找性能较优的数据结构,通常我们能想到hash表,但是对于一张较大的表来说,不可能将所有记录全部放到hash表中,SparkSQL采用了一种全新的方案来对表进行Join,即Sort Merge Join。这种实现方式不用将一侧数据全部加载后再进行hash join,但需要在join前将数据排序,如下图所示:
Spark详解(十四)(Spark SQL的Join实现)
文章图片

在shuffle read阶段,分别对streamIter和buildIter进行merge sort,在遍历streamIter时,对于每条记录,都采用顺序查找的方式从buildIter查找对应的记录,由于两个表都是排序的,每次处理完streamIter的一条记录后,对于streamIter的下一条记录,只需从buildIter中上一次查找结束的位置开始查找,所以说每次在buildIter中查找不必重头开始,整体上来说,查找性能还是较优的。
2.3 Broadcast Join实现 为了能具有相同key的记录分到同一个分区,我们通常是做shuffle,而shuffle在Spark中是比较耗时的操作,我们应该尽可能的设计Spark应用使其避免大量的shuffle。。那么如果buildIter是一个非常小的表,那么其实就没有必要大动干戈做shuffle了,直接将buildIter广播到每个计算节点,然后将buildIter放到hash表中,如下图所示。
Spark详解(十四)(Spark SQL的Join实现)
文章图片

在执行上,主要可以分为以下两步:
  1. broadcast阶段:将小表广播分发到大表所在的所有主机。分发方式可以有driver分发,或者采用p2p方式。
  2. hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探;
Broadcast Join的条件有以下几个:
  1. 被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M (或者加了broadcast join的hint)
  2. 基表不能被广播,比如left outer join时,只能广播右表
3. Spark 支持的Join类型 3.1 inner join inner join是一定要找到左右表中满足join条件的记录,我们在写sql语句或者使用DataFrmae时,可以不用关心哪个是左表,哪个是右表,在spark sql查询优化阶段,spark会自动将大表设为左表,即streamIter,将小表设为右表,即buildIter。这样对小表的查找相对更优。其基本实现流程如下图所示,在查找阶段,如果右表不存在满足join条件的记录,则跳过。
Spark详解(十四)(Spark SQL的Join实现)
文章图片

3.2 left outer join left outer join是以左表为准,在右表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。我们在写sql语句或者使用DataFrmae时,一般让大表在左边,小表在右边。其基本实现流程如下图所示。
Spark详解(十四)(Spark SQL的Join实现)
文章图片

3.3 right outer join right outer join是以右表为准,在左表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。所以说,右表是streamIter,左表是buildIter,我们在写sql语句或者使用DataFrmae时,一般让大表在右边,小表在左边。其基本实现流程如下图所示。
Spark详解(十四)(Spark SQL的Join实现)
文章图片

3.4 full outer join full outer join相对来说要复杂一点,总体上来看既要做left outer join,又要做right outer join,但是又不能简单地先left outer join,再right outer join,最后union得到最终结果,因为这样最终结果中就存在两份inner join的结果了。因为既然完成left outer join又要完成right outer join,所以full outer join仅采用sort merge join实现,左边和右表既要作为streamIter,又要作为buildIter,其基本实现流程如下图所示。
Spark详解(十四)(Spark SQL的Join实现)
文章图片

由于左表和右表已经排好序,首先分别顺序取出左表和右表中的一条记录,比较key,如果key相等,则joinrowA和rowB,并将rowA和rowB分别更新到左表和右表的下一条记录;如果keyA<keyB,则说明右表中没有与左表rowA对应的记录,那么joinrowA与nullRow,紧接着,rowA更新到左表的下一条记录;如果keyA>keyB,则说明左表中没有与右表rowB对应的记录,那么joinnullRow与rowB,紧接着,rowB更新到右表的下一条记录。如此循环遍历直到左表和右表的记录全部处理完。
3.5 left semi join left semi join是以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左边的记录,否则返回null,其基本实现流程如下图所示。
Spark详解(十四)(Spark SQL的Join实现)
文章图片

3.6 left anti join left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,如果查找成功,则返回null,否则仅返回左边的记录,其基本实现流程如下图所示。
Spark详解(十四)(Spark SQL的Join实现)
文章图片

4. 总结 【Spark详解(十四)(Spark SQL的Join实现)】Join是数据库查询中一个非常重要的语法特性,在数据库领域可以说是“得join者的天下”,SparkSQL作为一种分布式数据仓库系统,给我们提供了全面的join支持,并在内部实现上无声无息地做了很多优化,了解join的实现将有助于我们更深刻的了解我们的应用程序的运行轨迹。

    推荐阅读