【开发方案】宽表ETL实现方案

1. 目标 【【开发方案】宽表ETL实现方案】通过底层系统数据的拉通,数据治理统一数据口径,实现控制塔KPI体系的线上数字化存储计算和灵活展现,支撑项目的顺利落地。
2. 方案设计 2.1 主流ETL工具调研

维度\产品 DataPipeline kettle Oracle Gaodengate Informatica talend Datax
功能 适用场景 主要用于各类数据融合、数据交换场景,专为超大数据量、高度复杂的数据链路设计的灵活、可扩展的数据交换平台 面向数据仓库建模传统ETL工具 主要用于数据备份、容灾 面向数据仓库建模传统ETL工具 面向数据仓库建模传统ETL工具 面向数据仓库建模传统ETL工具
使用方式 全流程图形化界面,应用端采用B/S架构,Cloud Native为云而生,所有操作在浏览器内就可以完成,不需要额外的开发和生产发布 C/S客户端模式,开发和生产环境需要独立部署,任务的编写、调试、修改都在本地,需要发布到生产环境,线上生产环境没有界面,需要通过日志来调试、debug,效率低,费时费力 没有图形化的界面,操作皆为命令行方式,可配置能力差 C/S客户端模式,开发和生产环境需要独立部署,任务的编写、调试、修改都在本地,需要发布到生产环境;学习成本较高,一般需要受过专业培训的工程师才能使用; C/S客户端模式,开发和生产环境需要独立部署,任务的编写、调试、修改都在本地,需要发布到生产环境; DataX是以脚本的方式执行任务的,需要完全吃透源码才可以调用,学习成本高,没有图形开发化界面和监控界面,运维成本相对高。
底层架构 分布式集群高可用架构,可以水平扩展到多节点支持超大数据量,架构容错性高,可以自动调节任务在节点之间分配,适用于大数据场景 主从结构非高可用,扩展性差,架构容错性低,不适用大数据场景 可做集群部署,规避单点故障,依赖于外部环境,如Oracle RAC等; schema mapping非自动;可复制性比较差;更新换代不是很强 支持分布式部署 支持单机部署和集群部署两种方式
CDC机制 基于日志、基于时间戳和自增序列等多种方式可选 基于时间戳、触发器等 主要是基于日志 基于日志、基于时间戳和自增序列等多种方式可选 基于触发器、基于时间戳和自增序列等多种方式可选 离线批处理
对数据库的影响 基于日志的采集方式对数据库无侵入性 对数据库表结构有要求,存在一定侵入性 源端数据库需要预留额外的缓存空间 基于日志的采集方式对数据库无侵入性 有侵入性 通过sql select 采集数据,对数据源没有侵入性
自动断点续传 支持 不支持 支持 不支持,依赖ETL设计的合理性(例如T-1),指定续读某个时间点的数据,非自动 不支持,依赖ETL设计的合理性(例如T-1),指定续读某个时间点的数据,非自动 不支持
监控预警 可视化的过程监控,提供多样化的图表,辅助运维,故障问题可实时预警 依赖日志定位故障问题,往往只能是后处理的方式,缺少过程预警 无图形化的界面预警 monitor可以看到报错信息,信息相对笼统,定位问题仍需依赖分析日志 有问题预警,定位问题仍需依赖日志 依赖工具日志定位故障问题,没有图形化运维界面和预警机制,需要自定义开发。
数据清洗 围绕数据质量做轻量清洗 围绕数据仓库的数据需求进行建模计算,清洗功能相对复杂,需要手动编程 轻量清洗 支持复杂逻辑的清洗和转化 支持复杂逻辑的清洗和转化 需要根据自身清晰规则编写清洗脚本,进行调用(DataX3.0 提供的功能)。
数据转换 自动化的schema mapping 手动配置schema mapping 需手动配置异构数据间的映射 手动配置schema mapping 手动配置schema mapping 通过编写json脚本进行schema mapping映射
特征 数据实时性 实时 非实时 实时 支持实时,但是主流应用都是基于时间戳等方式做批量处理,实时同步效率未知 实时 定时
应用难度
是否需要开发
易用性
稳定性
其他 实施及售后服务 原厂实施和售后服务 开源软件,需自客户自行实施、维护 原厂和第三方的实施和售后服务 主要为第三方的实施和售后服务 分为开源版和企业版,企业版可提供相应服务 阿里开源代码,需要客户自动实施、开发、维护
2.2 kettle使用体验
根据上述调研,选择了最流行且免费的kettle作为体验对象。
  1. 优点:
  2. 无需开发,通过界面操作,即可实现数据ETL流程(对非开发人员友好)。
  3. 缺点:
  4. 对开发人员来说,新增同步时,界面操作的效率,不一定比自己实现的效率高。
  5. 复杂的计算逻辑仍然需要写代码,并且是在工具的界面上,按工具要求的规范来写。
根据上述体验结果,结合当前业务“有大量的计算字段”的特点,最终选择了自主开发方案。
2.3 自主开发方案
  • 数据同步方案
    【开发方案】宽表ETL实现方案
    文章图片
  • ETL简易流程
    【开发方案】宽表ETL实现方案
    文章图片
3. 方案实现 3.1 设计原则
宽表同步&计算有以下特点:
  • 需要从多个业务表中同步数据,每个表都要执行“查询数据“和”映射字段值到宽表中",同时后续可能会新增映射字段(包括新加原始业务表)。
  • 除了已有的业务字段外,宽表还有根据当前字段计算出来的“计算结果”字段,并且后续也可能会持续新增此类字段。
  • 流程环节可能会新增,也可能会删除。
以上都是可预见的会扩展、修改比较频繁的点,根据开闭原则(对扩展开放,对修改关闭),将业务表的数据查询和映射剥离出来,提供抽象对象来支持扩展;“计算结果”字段也是一样;将每个流程环节独立成一个类,多个类按顺序串成一个链,新增/删除的时候就在链中插入/删除节点。
3.2 详细流程
【开发方案】宽表ETL实现方案
文章图片

3.3 JAVA类图
【开发方案】宽表ETL实现方案
文章图片

3.4 重点类介绍
3.4.1 通用对象:
  • BaseMerger:封装了流程通用的“启动流程前”、“执行流程”、“流程正常执行完成(正常执行完成时进入)”、“流程异常处理(异常时进入)”、“流程执行完成(不管是否异常都会进入本方法)”方法,同时提供扩展方法“流程名称”,由子类实现,用于获取子类需要的流程节点列表(从所有的FlowNode列表中筛选出属于该流程的列表)。
  • MergeContext:上下文对象,用于在各个节点中传递数据。
  • BaseFlowNode:流程节点父类,封装了每个节点执行时通用的方法,如日志记录,子类只需要实现核心的“doExecute”方法即可。
  • BaseLoopFlowNode:可循环的流程节点父类,当流程判断(当前节点为BaseLoopFlowNode子类,且上下文中“循环次数”> 1时),会开始进入循环状态,直到(下一个节点不是BaseLoopFlowNode,且循环次数达到最大次数)时,才会跳出循环。
    protected void executeFlow(MergeContext context) { BaseFlowNode currentNode = flowNodes.get(0); BaseFlowNode beginLoopNode = null; do { if (!context.isRunning()) { break; } currentNode.execute(context); int totalLoopTimes = context.getTotalLoopTimes(); // 循环节点,且需要循环 if (currentNode instanceof BaseLoopFlowNode && totalLoopTimes > 0) { // 启动循环 if (beginLoopNode == null) { beginLoopNode = currentNode; } BaseFlowNode nextFlowNode = currentNode.getNext(); // 最后一个节点 if (nextFlowNode == null) { // 但是还未到达最大循环次数 if (context.getCurrentLoopTimes() < totalLoopTimes - 1) { // 重头开始循环 currentNode = beginLoopNode; // 循环次数加1 context.setCurrentLoopTimes(context.getCurrentLoopTimes() + 1); } else { // 结束循环 ((BaseLoopFlowNode) currentNode).resetLoop(context); beginLoopNode = null; currentNode = nextFlowNode; } } else { if (nextFlowNode instanceof BaseLoopFlowNode) { // 下一个节点也是循环中的一部分 currentNode = nextFlowNode; } else if (context.getCurrentLoopTimes() < totalLoopTimes - 1) { // 还未到达最大循环次数 // 重头开始循环 currentNode = beginLoopNode; // 循环次数加1 context.setCurrentLoopTimes(context.getCurrentLoopTimes() + 1); } else { // 结束循环 ((BaseLoopFlowNode) currentNode).resetLoop(context); beginLoopNode = null; currentNode = nextFlowNode; } } } else { // 循环结束、或者还没开始、或者不需要循环 currentNode = currentNode.getNext(); } } while (currentNode != null); }

3.4.2 详细流程对象
  • LqPrDetailMergeConvertor:根据“开闭原则”拆出来的对象,当读取到的业务表数据需要映射到宽表对象时,在此类做修改。
  • BaseDataExtractor:根据“开闭原则”拆出来的抽象对象,需要从业务表中“读取数据”、“过滤数据”时,添加此类的实现类即可。
  • BaseCalculator:根据“开闭原则”拆出来的抽象对象,需要对宽表的某个字段做计算或者调整时,添加此类的实现类即可。
参考
六种 主流ETL 工具的比较

    推荐阅读