Data Warehouse | How Do Hive and Spark SQL Decide How Many Files to Write?

Both Hive itself and Spark provide SQL support for Hive: SQL statements are translated into operations on the HDFS files that back Hive tables. The two engines differ in how they write those files.
1. Hive
1.1 without shuffle
Hive executes a SQL write as a MapReduce job, as in the following example:

hive> insert into table temp.czc_hive_test_write values ('col1_value', 1),('col1_value', 2);

After the insert completes, the stored file can be found under the table's HDFS path:
$ hadoop fs -ls /user/hive/warehouse/temp.db/czc_hive_test_write/part_date=2018-12-12
Found 2 items
-rwxrwxrwx   3 hadoop supergroup   26 2019-12-20 15:56 hdfs://sdg/user/hive/warehouse/temp.db/czc_hive_test_write/part_date=2018-12-12/000000_0

The insert produced one file. This is because each INSERT statement launches its own MapReduce job, and one MapReduce job writes one result file.
1.2 with shuffle
When the insert involves a shuffle:
hive> insert into table temp.czc_hive_game select count(*), game_id from temp.source_table group by game_id;
...
Hadoop job information for Stage-1: number of mappers: 62; number of reducers: 1
...

From the way Hive implements GROUP BY, rows are shuffled with the GROUP BY column as the key (game_id in this example). The execution log shows the job ran 62 mappers and 1 reducer; since the final write happens in the reducers, the output should likewise be a single file.
$ hadoop fs -ls /user/hive/warehouse/temp.db/czc_hive_game
Found 1 items
-rwxrwxrwx   3 hadoop supergroup   268 2019-12-20 16:31 /user/hive/warehouse/temp.db/czc_hive_game/000000_0

Note: Hive determines the reducer count as follows.
The reducer count strongly affects job performance. When it is not set explicitly, Hive estimates one based on two settings:
hive.exec.reducers.bytes.per.reducer (the amount of data each reducer handles; 1 GB by default)
hive.exec.reducers.max (the upper bound on the reducer count)
So if the total reducer input (i.e. the map output) is at most 1 GB, only one reducer is launched.
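The estimation rule above can be sketched in Python. This is only an illustration of the described behavior, not Hive's actual source; the parameter defaults mirror the two settings named above:

```python
import math

def estimate_reducers(total_input_bytes,
                      bytes_per_reducer=1 << 30,  # hive.exec.reducers.bytes.per.reducer (1 GB)
                      max_reducers=999):          # hive.exec.reducers.max
    """Sketch of Hive's reducer estimate: divide the total map output
    by bytes-per-reducer, then cap at the configured maximum."""
    reducers = math.ceil(total_input_bytes / bytes_per_reducer)
    return max(1, min(reducers, max_reducers))

# 500 MB of map output fits in one reducer's share, so one reducer.
print(estimate_reducers(500 * 1024 * 1024))  # -> 1
```

With this rule, a 10 GB map output would get 10 reducers, and a very large input is still capped at max_reducers.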
2. Spark SQL
2.1 without shuffle
Spark SQL can also write files into Hive tables. Run:
spark.sql("insert into table temp.czc_spark_test_write values ('col1_value', 1),('col1_value', 2)")

The files stored in HDFS are:
$ hadoop fs -ls /user/hive/warehouse/temp.db/czc_spark_test_write
Found 2 items
-rwxrwxrwx   3 hadoop supergroup   13 2019-12-20 17:01 /user/hive/warehouse/temp.db/czc_spark_test_write/part-00000-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
-rwxrwxrwx   3 hadoop supergroup   13 2019-12-20 17:01 /user/hive/warehouse/temp.db/czc_spark_test_write/part-00001-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
$ hadoop fs -cat /user/hive/warehouse/temp.db/czc_spark_test_write/part-00000-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
col1_value1
$ hadoop fs -cat /user/hive/warehouse/temp.db/czc_spark_test_write/part-00001-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
col1_value2

Even for this single statement, Spark launched two tasks that wrote files in parallel, producing two result files.
2.2 with shuffle
Take a similar SQL statement in Spark:
scala> spark.sql("insert into table temp.czc_spark_game select count(*), game_id from temp.source_table group by game_id")
res1: org.apache.spark.sql.DataFrame = []

Unlike Hive, Spark hash-partitions the shuffled rows by the shuffle key (game_id here) across its shuffle partitions, and each reduce task that receives data writes its own output file. In the source data source_table, the key game_id takes 26 distinct values:
hive> select count(distinct game_id) from temp.source_table;
OK
26

So 26 tasks end up writing data, and the output directory should accordingly contain 26 files:
$ hadoop fs -ls hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game
Found 26 items
-rwxrwxrwx   3 hadoop supergroup    0 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00000-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00007-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    7 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00010-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00011-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00012-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00032-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup   14 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00036-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00043-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00048-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup   24 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00065-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00066-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup   16 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00083-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00086-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup   16 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00101-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00102-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00105-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup   14 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00111-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup   12 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00123-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00124-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00136-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00162-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00163-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup   10 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00165-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00174-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup   17 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00176-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx   3 hadoop supergroup    9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00199-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
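The relationship between distinct keys and output files can be sketched as follows. This is only an illustration: Python's hash() stands in for the JVM hashCode that Spark's HashPartitioner uses, the game IDs are made up, and 200 is the default value of spark.sql.shuffle.partitions:

```python
NUM_SHUFFLE_PARTITIONS = 200  # spark.sql.shuffle.partitions default

def partition_for(key, num_partitions=NUM_SHUFFLE_PARTITIONS):
    # Stand-in for Spark's HashPartitioner: the partition id is the
    # key's hash modulo the partition count.
    return hash(key) % num_partitions

# 26 hypothetical distinct game_id values.
game_ids = [f"game_{i}" for i in range(26)]

# Only the partitions that receive at least one row produce a data file.
non_empty = {partition_for(g) for g in game_ids}
print(len(non_empty))  # at most 26 of the 200 partitions carry data
```

With 26 distinct keys, at most 26 of the 200 reduce partitions receive rows (fewer if two keys hash to the same partition), which matches the 26 data files seen above.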

2.3 Fixing the small-file problem
Because of the way Spark writes files, it can produce many small files, which puts pressure on the NameNode and degrades read/write performance. To address this, newer Spark releases (the author used 2.4.0.cloudera2) support adaptive execution of the shuffle stage, enabled through the spark.sql.adaptive.enabled property.
scala> spark.sql("set spark.sql.adaptive.enabled=true")
scala> spark.sql("insert into table temp.czc_spark_game select count(*), game_id from temp.source_table group by game_id")

With spark.sql.adaptive.enabled set to true, Spark's output becomes:
$ hadoop fs -ls hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game
Found 1 items
-rwxr-xr-x   3 hadoop supergroup   268 2019-12-20 20:55 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00000-a293e3b3-3136-4f57-bf66-f0ee2d4f8dbb-c000

The result now contains only one file: with adaptive execution, only a single task was launched for the write. For the details of adaptive execution, see the article Adaptive Execution 让 Spark SQL 更高效更智能.
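Conceptually, adaptive execution coalesces small post-shuffle partitions into larger groups before the write, so consecutive tiny partitions end up in one task. A greedy Python sketch of that idea (the real logic lives in Spark 2.x's ExchangeCoordinator; the target size here stands in for spark.sql.adaptive.shuffle.targetPostShuffleInputSize):

```python
def coalesce_partitions(partition_sizes, target_bytes=64 * 1024 * 1024):
    """Greedy sketch of post-shuffle coalescing: walk the map-output
    size statistics and pack consecutive partitions together until
    each merged group reaches roughly target_bytes. Each group is then
    handled by one reduce task, i.e. one output file."""
    groups, current, current_bytes = [], [], 0
    for i, size in enumerate(partition_sizes):
        current.append(i)
        current_bytes += size
        if current_bytes >= target_bytes:
            groups.append(current)
            current, current_bytes = [], 0
    if current:
        groups.append(current)
    return groups

# 26 tiny partitions (a few bytes each, as in the listing above) are
# far below the target size, so they collapse into a single group:
print(len(coalesce_partitions([10] * 26)))  # -> 1
```

This is why the insert above, which previously produced 26 small files, now produces just one.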
