写在前面
数据仓库的特性之一是集成,即首先把未经过加工处理的、不同来源的、不同形式的数据同步到ODS层,一般情况下,这些ODS层数据包括日志数据和业务DB数据。对于业务DB数据而言(比如存储在MySQL中),将数据采集并导入到数仓中(通常是Hive或者MaxCompute)是非常重要的一个环节。
【阿里的一键TT】那么,该如何将业务DB数据高效准确地同步到数仓中呢?一般企业会使用两种方案:直连同步与实时增量同步(数据库日志解析)。其中直连同步的基本思路是直连数据库进行SELECT,然后将查询的数据存储到本地文件作为中间存储,最后把文件Load到数仓中。这种方式非常的简单方便,但是随着业务的发展,会遇到一些瓶颈,具体见下文分析。
为了解决这些问题,一般会使用实时增量的方式进行数据同步,比如DataWorks提供的一键TT接入功能,其基本原理是CDC (Change Data Capture) + Merge,即实时Binlog采集 + 离线处理Binlog还原业务数据这样一套解决方案。
本文主要包括以下内容,希望对你有所帮助
- 数据同步的方式
- 一键TT接入的流程与步骤
- 基于Canal+Flink模拟实现TT数据接入
数据同步的方式
直连同步直连同步是指通过定义好的规范接口API和基于动态链接库的方式直接连接业务库,比如ODBC/JDBC等规定了统一的标准接口,不同的数据库基于这套标准提供规范的驱动,从而支持完全相同的函数调用和SQL实现。比如经常使用的Sqoop就是采取这种方式进行批量数据同步的。
- 数据同步时间:随着业务规模的增长,数据同步花费的时间会越来越长,无法满足下游数仓生产的时间要求。
- 性能瓶颈:直连数据库查询数据,对数据库影响非常大,容易造成慢查询,如果业务库没有采取主备策略,则会影响业务线上的正常服务,如果采取了主备策略,虽然可以避免对业务系统的性能影响,但当数据量较大时,性能依然会很差。
日志解析所谓日志解析,即解析数据库的变更日志,比如MySQL的Binlog日志,Oracle的归档日志文件。通过读取这些日志信息,收集变化的数据并将其解析到目标存储中即可完成数据的实时同步。这种读操作是在操作系统层面完成的,不需要通过数据库,因此不会给源数据库带来性能上的瓶颈。
一键TT接入的基本步骤 基本流程
基本流程如下,首先是将数据的变更日志推送到DataHub/TimeTunnel(TT,一种基于生产者、消费者和Topic消息标识的消息中间件,将消息数据持久化到HBase,其底层基于DataHub,类比Kakfa),然后将表全量同步,且仅全量同步一次,接着将TT的数据订阅消费到ODPS的TTSource表,并将TTSource表合并到增量表中,最后将增量表与全量表进行Merge得到最新的全量表。以后的同步流程仅执行增量表的数据装载以及增量表与全量表的Merge合并,合并的方式是使用全外连接(full outer join)+数据覆盖(insert overwrite)的方式,比如日调度,即是将当天的增量数据和前一天的全量数据做全外连接,重新装载获取最新的一份全量数据。
文章图片
步骤解释
以WDK_MEMBER_PRO_APP.member_round_detail为例
Step1. BPMS审批 Step2.创建TT表 这一步会在TT(类比Kafka)中生成名为wdk_member_pro_app_member_round_detail的Topic,用于存储Binlog变更日志(比如INSERT、UPDATE数据),该Topic的一般命名格式是:${TDDL APP NAME}_${源表名}。值得注意的是,由于一行数据记录的变更日志是有严格顺序的,而DataHub/TimeTunnel是分区有序的,所以要保证同一个key的数据进入到Topic的同一个分区,这样才能保证有序,这也是为什么要指定唯一主键的原因。
然后创建TT Source表,该表名为:s_tt_wdk_member_pro_app_member_round_detail_tt4,表名的一般格式是:s_tt_${TDDL APP NAME}_${源表名}_tt4,该表主要用于订阅DataHub/TimeTunnel中的Topic,将DataHub的数据抽取到ODPS中存储(即将已经采集到TT的数据存储到ODPS上)。该表的content字段存储了变更日志数据,包括一部分的元数据信息和实际的日志数据,关于元数据信息,具体如下:
- dbsync_ts: 同步机器的时间戳(ms)+自增序列
- dbsync_db_name: 物理分库
- dbsync_table_name: 物理分表
- dbsync_modify_time: Binlog变更日志时间,即数据持久化到DB的时间
- dbsync_operation: 数据的操作类型,比如insert、update
- dbsync_region_id: 标识本条数据的地域信息,适用于集团单元化场景。
Step3.创建增量表在ODPS中创建一张名为s_tt_member_round_detail_delta的表,该表的一般命名格式是:${目标表名}_delta。该表用于存储增量的数据,比如按天增量抽取,那么表中的数据就是当天的增量数据。主要处理逻辑是读取TT Source表的当天分区数据,将其写入到该增量表中。详见TTMerge节点任务。
INSERT OVERWRITE TABLE s_tt_member_round_detail_delta PARTITION(ds='20210304')
SELECTid
,gmt_create
,gmt_modified
,user_id
,start_date
,end_date
,STATUS
,TYPE
,attributes
,round
,merchant_code
,consume_discount
,card_id
,plan_id
,biz_id
FROM(
-- 由于TT写入的数据是有时序的,例如一条记录在一天被更新N次,则在当日的表中,就有N条记录。
-- 对于按天增量的数据来说,只需要最新的数据即可
-- 按照主键分区排序,分组取最新的数据
SELECTrow_number() OVER(PARTITION BY id,user_id,biz_id ORDER BY id,user_id,biz_id,dbsync_ts DESC) AS row_number
,dbsync_operation
,id
,gmt_create
,gmt_modified
,user_id
,start_date
,end_date
,STATUS
,TYPE
,attributes
,round
,merchant_code
,consume_discount
,card_id
,plan_id
,biz_id
FROM(
SELECTdbsync_ts
,dbsync_operation
,id
,gmt_create
,gmt_modified
,user_id
,start_date
,end_date
,status
,TYPE
,attributes
,round
,merchant_code
,consume_discount
,card_id
,plan_id
,biz_id
FROM(-- 获取并解析当天的变更日志数据
SELECTtt_split(content, 21) AS (dbsync_ts,dbsync_db_name,dbsync_table_name,dbsync_modify_time,dbsync_operation,dbsync_change_fields,id,gmt_create,gmt_modified,user_id,start_date,end_date,status,TYPE,attributes,round,merchant_code,consume_discount,card_id,plan_id,biz_id)
FROMhm_ods.s_tt_wdk_member_pro_app_member_round_detail_tt4
WHERE(
ds > '20210304'
OR(ds = '20210304' AND (hh > '00' OR (hh = '00' AND mm >= '00')))
)
AND(
ds < '20210305'
OR(ds = '20210305' AND (hh < '00' OR (hh = '00' AND mm < '00')))
)
) t
WHEREt.dbsync_ts != ''
) b
) u
WHERErow_number = 1
;
Step5.TTMerge节点发布 发布TTMerge的任务
Step6.TTMerge节点冒烟 执行TTMerge任务的冒烟测试,在运维中心的测试实例中可以查看。
Step7.创建全量表 创建全量表s_tt_member_round_detail,该表名就是在一键TT接入时填写的目标表名。该表用于存储截止到当天的全量数据,处理逻辑是使用当天的增量数据与昨天的全量数据进行全外连接,从而merge一份最新的全量数据。
Step8.创建全量表同步节点 创建全量表的同步节点任务,该任务只执行一次,用于首次将源表全量同步至目标表,后续的操作使用增量表与全量表进行Merge的方式来获取最新的全量数据。
Step9.全量表节点发布 发布全量表同步任务。
Step10.全量表节点冒烟 全量表同步任务冒烟测试,可以在运维中心的测试实例中查看,具体的任务名为:imp_s_tt_member_round_detail,其一般格式为:imp_${目标表名}。
Step11.Merge节点生成 生成增量表与全量表的Merge任务,当天的增量数据与昨天的全量数据进行全外连接,该Merge任务的基本逻辑是:
INSERT OVERWRITE TABLE s_tt_member_round_detail PARTITION(ds='20210305')
SELECTCASEWHEN n.id IS NULL THEN o.id
ELSE n.id
END
,CASEWHEN n.id IS NULL THEN o.gmt_create
ELSE n.gmt_create
END
,CASEWHEN n.id IS NULL THEN o.gmt_modified
ELSE n.gmt_modified
END
,CASEWHEN n.id IS NULL THEN o.user_id
ELSE n.user_id
END
,CASEWHEN n.id IS NULL THEN o.start_date
ELSE n.start_date
END
,CASEWHEN n.id IS NULL THEN o.end_date
ELSE n.end_date
END
,CASEWHEN n.id IS NULL THEN o.STATUS
ELSE n.STATUS
END
,CASEWHEN n.id IS NULL THEN o.TYPE
ELSE n.TYPE
END
,CASEWHEN n.id IS NULL THEN o.attributes
ELSE n.attributes
END
,CASEWHEN n.id IS NULL THEN o.round
ELSE n.round
END
,CASEWHEN n.id IS NULL THEN o.merchant_code
ELSE n.merchant_code
END
,CASEWHEN n.id IS NULL THEN o.consume_discount
ELSE n.consume_discount
END
,CASEWHEN n.id IS NULL THEN o.card_id
ELSE n.card_id
END
,CASEWHEN n.id IS NULL THEN o.plan_id
ELSE n.plan_id
END
,CASEWHEN n.id IS NULL THEN o.biz_id
ELSE n.biz_id
END
FROM(
SELECT*
FROMs_tt_member_round_detail_delta
WHEREds = '20210305'
ANDid IS NOT NULL
ANDuser_id IS NOT NULL
ANDbiz_id IS NOT NULL
) n
FULL OUTER JOIN (-- 全外连接进行数据merge
SELECT*
FROMs_tt_member_round_detail
WHEREds = '20210304'
ANDid IS NOT NULL
ANDuser_id IS NOT NULL
ANDbiz_id IS NOT NULL
) o
ONo.id = n.id
ANDo.user_id = n.user_id
ANDo.biz_id = n.biz_id
;
Step12.Merge节点发布 发布Merge任务
从内网做点笔记 哈哈
推荐阅读
- 数据仓库教程目录
- 什么是OLAP(在线分析处理)()
- 什么是元数据()
- 三层数据仓库架构
- ROLAP,MOLAP和HOLAP之间的区别
- 运营数据库和数据仓库之间的区别
- OLTP和OLAP之间的区别
- ETL(提取,转换和加载)过程
- 什么是雪花模式()