- 背景
- Hive 实现缓慢变化维,没有使用事务表的更新和删除操作(最新版本Hive已经支持,但需要Server/Client做相应配置,Hive实现的事务还有一定的局限性)
- Hive 自身的SQL使用MapReduce引擎,速度慢,这里使用SparkSQL实现
- 自动化的SCD引擎待后续实现
- 参考:
https://cwiki.apache.org/confluence/display/Hive/
源码: https://github.com/HanseyLee/AutoSparkSQLSCD
https://juejin.im/post/5bdf0b84e51d450eca7c6c8b
- 准备基础维度表 base_dim
- 业务字段:id, name, city, st.
- 维度表默认字段:sk(surrogate key 代理键,全局唯一的整数值), scd_update_date, scd_version, scd_valid_flag, scd_start_date, scd_end_date
- scd1字段 city, st; scd2字段 name.
- scd1 字段变动不记录scd_version, 但会记录变更时间scd_update_date
import spark.sql
import spark.implicits._
// scd1: city, st;
scd2: name
sql("drop table if exists base_dim")
sql(
s"""
| create table base_dim
|(sk int, id int, name string, city string, st string, scd_update_date string,scd_version int,scd_valid_flag string,scd_start_date string, scd_end_date string)
| stored as orc
""".stripMargin)
sql(
s"""
| insert into table base_dim
| values
|(1,1,"zhangsan","us","ca","2019-01-01",1,"Y","2019-01-01","9999-12-31"),
|(2,2,"lisi","us","cb","2019-01-01",1,"Y","2019-01-01","9999-12-31"),
|(3,3,"wangwu","ca","bb","2019-01-01",1,"Y","2019-01-01","9999-12-31"),
|(4,4,"zhaoliu","ca","bc","2019-01-01",1,"Y","2019-01-01","9999-12-31"),
|(5,5,"mazi","aa","aa","2019-01-01",1,"Y","2019-01-01","9999-12-31")
""".stripMargin)
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id|name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
|3|3|wangwu|ca| bb|2019-01-01|1|Y|2019-01-01|9999-12-31|
|4|4| zhaoliu|ca| bc|2019-01-01|1|Y|2019-01-01|9999-12-31|
|5|5|mazi|aa| aa|2019-01-01|1|Y|2019-01-01|9999-12-31|
|1|1|zhangsan|us| ca|2019-01-01|1|Y|2019-01-01|9999-12-31|
|2|2|lisi|us| cb|2019-01-01|1|Y|2019-01-01|9999-12-31|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
- 阶段表 stage
- 数据每行对应的变化:1: scd2 + scd1; 2: expired(snapshot) | unchanged (incrementing mode); 3: unchanged; 4: scd1; 5: scd2; 6: new record
- 快照模式(snapshot):不包含在该表中的记录视为已过期的。
- 增量变动模式(incrementing mode):不包含在该表中的记录仍然视为有效。
- 除了1中所述业务字段外,还有表征每行记录的时间字段
sql("drop table if exists stage")
sql("create table stage (id int, name string, city string, st string, update_date string) stored as orc")
// 1: scd2 + scd1;
2: expired(snapshot) | unchanged (incrementing mode);
3: unchanged;
4: scd1;
5: scd2;
6: new record
val dt = getDate()
sql(
s"""
| insert into table stage
| values
| (1,"zhang","u","c",'$dt'),
|(3,"wangwu","ca","bb",'$dt'),
| (4,"zhaoliu","ac","cb",'$dt'),
|(5,"ma","aa","aa",'$dt'),
|(6,"laoyang","dd","dd",'$dt')
""".stripMargin)
+---+-------+----+---+-----------+
| id|name|city| st|update_date|
+---+-------+----+---+-----------+
|1|zhang|u|c| 2019-08-06|
|3| wangwu|ca| bb| 2019-08-06|
|4|zhaoliu|ac| cb| 2019-08-06|
|5|ma|aa| aa| 2019-08-06|
|6|laoyang|dd| dd| 2019-08-06|
+---+-------+----+---+-----------+
- 获取不变的记录 (主表:基础维度表)
- 快照模式: 1.在快照中且规定的特定字段均不变(业务主键+SCD2+SCD1)的记录
val unchangedDF = sql(
s"""
|SELECT
// 如果使用阶段表中的数据,譬如想使用最新的update_date
//|bd.sk,
//|s.*, // 如果阶段表的字段顺序和基础维度表不一致,需要逐个字段表示
//|bd.scd_version,
//|bd.scd_valid_flag,
//|bd.scd_start_date,
//|bd.scd_end_date
|bd.*
|FROM
|base_dim bd
|JOIN stage s
|ON bd.id=s.id AND bd.name=s.name AND bd.city=s.city AND bd.st=s.st
""".stripMargin)
+---+---+------+----+---+-----------+-----------+--------------+--------------+------------+
| sk| id|name|city| st|update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+------+----+---+-----------+-----------+--------------+--------------+------------+
|3|3|wangwu|ca| bb| 2019-01-01|1|Y|2019-01-01|9999-12-31|
+---+---+------+----+---+-----------+-----------+--------------+--------------+------------+ 2) 增量变动模式: 1. 在阶段表中且规定的特定字段均不变(业务主键+SCD2+SCD1)的记录;2. 不在阶段表中的记录
val unchangedDF = sql(
s"""
|SELECT
|bd.*
|FROM
|base_dim bd
|LEFT JOIN stage s
|ON bd.id=s.id
|WHEREs.id IS NULL OR ( bd.name=s.name AND bd.city=s.city AND bd.st=s.st)
""".stripMargin)
+---+---+------+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id|name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+------+----+---+---------------+-----------+--------------+--------------+------------+
|3|3|wangwu|ca| bb|2019-01-01|1|Y|2019-01-01|9999-12-31|
|2|2|lisi|us| cb|2019-01-01|1|Y|2019-01-01|9999-12-31|
+---+---+------+----+---+---------------+-----------+--------------+--------------+------------+
- 标记新过期的记录 (scd_valid_flag标记改为‘N’, scd_end_date更新为pre_date)
- 快照模式: 1. 基础维度表中仍然有效但未出现在阶段表中的记录 2. 发生SCD2的旧的记录
val pre_date = getPreDate()
val max_date = "9999-12-31"
val expireDF = sql(
s"""
|SELECT
|bd.sk sk,
|bd.id id,
|bd.name name,
|bd.city city,
|bd.st st,
|bd.scd_update_date scd_update_date,
|bd.scd_version scd_version,
|'N' scd_valid_flag,
|bd.scd_start_date scd_start_date,
|'$pre_date' scd_end_date
|FROM
|(
|SELECT *
|FROM base_dim
|WHERE scd_end_date='$max_date'
|) bd
|LEFT JOIN stage s
|ON bd.id=s.id
|WHERE s.id IS NULLOR bd.name <> s.name
""".stripMargin)
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id|name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
|5|5|mazi|aa| aa|2019-01-01|1|N|2019-01-01|2019-08-05|
|1|1|zhangsan|us| ca|2019-01-01|1|N|2019-01-01|2019-08-05|
|2|2|lisi|us| cb|2019-01-01|1|N|2019-01-01|2019-08-05|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
2) 增量变动模式: 1. 发生SCD2的旧的记录
val expireDF = sql(
s"""
|SELECT
|bd.sk sk,
|bd.id id,
|bd.name name,
|bd.city city,
|bd.st st,
|bd.scd_update_date scd_update_date,
|bd.scd_version scd_version,
|'N' scd_valid_flag,
|bd.scd_start_date scd_start_date,
|'$pre_date' scd_end_date
|FROM
|(
|SELECT *
|FROM base_dim
|WHERE scd_end_date='$max_date'
|) bd
|LEFT JOIN stage s
|ON bd.id=s.id
|WHERE bd.name <> s.name
""".stripMargin)
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id|name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
|5|5|mazi|aa| aa|2019-01-01|1|N|2019-01-01|2019-08-05|
|1|1|zhangsan|us| ca|2019-01-01|1|N|2019-01-01|2019-08-05|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
- 获取SCD2的新纪录
- 也可能包含SCD2新行中的SCD1变化
val scd2NewLineDF = sql(
s"""
| SELECT
|ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max sk,
|t1.id,
|t1.name,
|t1.city,
|t1.st,
|t1.scd_update_date,
|t1.scd_version,
|t1.scd_valid_flag,
|t1.scd_start_date,
|t1.scd_end_date
| FROM
|(
|SELECT
|t2.id id,
|t2.name name,
|t2.city city,
|t2.st st,
|t2.update_date scd_update_date,
|t1.scd_version + 1 scd_version,
|"Y" scd_valid_flag,
|t1.scd_start_date scd_start_date,
|'$max_date' scd_end_date
|FROM
|base_dim t1
|JOIN
|stage t2
|ON
|t1.id=t2.id AND t1.scd_valid_flag = 'Y' AND t1.name<>t2.name
|) t1
|CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM base_dim) t2
|
""".stripMargin)
+---+---+-----+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id| name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+-----+----+---+---------------+-----------+--------------+--------------+------------+
|6|1|zhang|u|c|2019-08-06|2|Y|2019-01-01|9999-12-31|
|7|5|ma|aa| aa|2019-08-06|2|Y|2019-01-01|9999-12-31|
+---+---+-----+----+---+---------------+-----------+--------------+--------------+------------+
- 获取只发生SCD1变化的记录
val scd1UpdateDF = sql(
s"""
| SELECT
|bd.sk,
|bd.id,
|bd.name,
|s.city,
|s.st,
|s.update_date,
|bd.scd_version,
|bd.scd_valid_flag,
|bd.scd_start_date,
|bd.scd_end_date
| FROM
|base_dim bd
|JOIN
|stage s
|ON
|bd.id=s.id AND bd.name=s.name ANDb.scd_valid_flag = 'Y'
|WHERE
|bd.city<>s.city OR bd.st<>s.st
""".stripMargin)
+---+---+-------+----+---+-----------+-----------+--------------+--------------+------------+
| sk| id|name|city| st|update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+-------+----+---+-----------+-----------+--------------+--------------+------------+
|4|4|zhaoliu|ac| cb| 2019-08-06|1|Y|2019-01-01|9999-12-31|
+---+---+-------+----+---+-----------+-----------+--------------+--------------+------------+
- 获取全新的记录,并汇聚成最终结果
val scdDF = unchangedDF.union(expireDF).union(scd2NewLineDF).union(scd1UpdateDF)
scdDF.createOrReplaceTempView("new_base_dim")
// grand-new records
val brandNewDF = sql(
s"""
| SELECT
|ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max sk,
|t1.*,
|1,
|"Y",
|'$pre_date',
|'$max_date'
| FROM
|(
|stage s
|LEFT ANTI JOIN
|base_dim bd
|ON
|s.id=bd.id
|) t1
|CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM new_base_dim) t2
""".stripMargin)
brandNewDF.show()
val finalDF = scdDF.union(brandNewDF)
finalDF.show()
+---+---+-------+----+---+-----------+---+---+-----------+----------+
| sk| id|name|city| st|update_date|1|Y|update_date|9999-12-31|
+---+---+-------+----+---+-----------+---+---+-----------+----------+
|8|6|laoyang|dd| dd| 2019-08-06|1|Y| 2019-08-06|9999-12-31|
+---+---+-------+----+---+-----------+---+---+-----------+----------+
8.最终结果
1) 快照模式:
+---+---+--------+----+---+-----------+-----------+--------------+--------------+------------+
| sk| id|name|city| st|update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+--------+----+---+-----------+-----------+--------------+--------------+------------+
|3|3|wangwu|ca| bb| 2019-08-06|1|Y|2019-01-01|9999-12-31|
|5|5|mazi|aa| aa| 2019-01-01|1|N|2019-01-01|2019-08-05|
|1|1|zhangsan|us| ca| 2019-01-01|1|N|2019-01-01|2019-08-05|
|2|2|lisi|us| cb| 2019-01-01|1|N|2019-01-01|2019-08-05|
|6|1|zhang|u|c| 2019-08-06|2|Y|2019-01-01|9999-12-31|
|7|5|ma|aa| aa| 2019-08-06|2|Y|2019-01-01|9999-12-31|
|4|4| zhaoliu|ac| cb| 2019-08-06|1|Y|2019-01-01|9999-12-31|
|8|6| laoyang|dd| dd| 2019-08-06|1|Y|2019-08-06|9999-12-31|
+---+---+--------+----+---+-----------+-----------+--------------+--------------+------------+
2)增量变动模式:+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
| sk| id|name|city| st|scd_update_date|scd_version|scd_valid_flag|scd_start_date|scd_end_date|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
|3|3|wangwu|ca| bb|2019-01-01|1|Y|2019-01-01|9999-12-31|
|2|2|lisi|us| cb|2019-01-01|1|Y|2019-01-01|9999-12-31|
|5|5|mazi|aa| aa|2019-01-01|1|N|2019-01-01|2019-08-05|
|1|1|zhangsan|us| ca|2019-01-01|1|N|2019-01-01|2019-08-05|
|6|1|zhang|u|c|2019-08-06|2|Y|2019-01-01|9999-12-31|
|7|5|ma|aa| aa|2019-08-06|2|Y|2019-01-01|9999-12-31|
|4|4| zhaoliu|ac| cb|2019-08-06|1|Y|2019-01-01|9999-12-31|
|8|6| laoyang|dd| dd|2019-08-06|1|Y|2019-08-06|9999-12-31|
+---+---+--------+----+---+---------------+-----------+--------------+--------------+------------+
本文通过手动地实现SCD2+SCD1的中间变化操作,方便大家理解缓慢变化维度的概念和实现过程。另外,本文中的转换都是Spark在内存中的操作,在数据量大的情况下可能会影响性能,建议考虑用临时表做中间转换的可能性。
本文中对SCD1和SCD2的变动和比较都是基于字段关联分析的,在SCD字段比较多的时候会导致性能问题。一宗替代的方法是,在基础维度表和阶段表中增加scd1和scd2字段值得hash字段, 在后续的比较中使用这连个hash字段做连接条件。基于这个思路,后续将对快照式或增量变动式的阶段表进行数据插入操作的中间细节进行包装,实现自动化SCD引擎,敬请关注,自动化的SCD引擎已实现,地址 https://github.com/HanseyLee/AutoSparkSQLSCD 欢迎关注 。
【SCD|一种基于SparkSQL的Hive数据仓库拉链表缓慢变化维(SCD2+SCD1)的示例实现】<全文完>
推荐阅读
- 大数据|大数据开发技术hive篇
- spark|spark UDAF根据某列去重求合 distinct sum
- 大数据|spark UDAF 自定义聚合函数 UserDefinedAggregateFunction 带条件的去重操作
- Hive中order by,sort by,distribute by,cluster by的区别
- Spark 写入 MySQL 乱码问题
- Hive 函数使用(一)(datediff,row_number,partition)
- scala常见笔试题(囊括了scala的精髓)