06 DataBricks+DataFactory+Blob项目实战

背景:客户需要为业务做一些数据展示. 客户会通过s3 每天给到我们增量数据.我们每天通过DataFactory的job抽取s3的数据,抽取后的原始数据存储到Blob容器中,再通过job抽取数据到DataBricks表中,同时通过 spark sql 处理数据,形成结果表,最后提供给BI同事,制作前端报表.其中基础或关键的步骤已经在前面介绍。
抽取原始数据
第一步抽取原始数据,项目的基础是如何稳定地从s3抽取数据,并插入到DataBrikcs表中.大致流程如下。若不使用Blob容器,可能会简单点。
06 DataBricks+DataFactory+Blob项目实战
文章图片

遍历s3容器,获取数据到Blob
  1. 首先我们要明确不是所有的s3文件都会取进来,要根据当前实际需要取数据,所有我们需要一个配置表以来限制获取的s3文件. 如 FILE_LIST.csv 文件放入到BLOB中,文件有两列,一列为表名也对于数据文件名,另一列为状态值判断是否有效。将FILE_LIST先使用【简单实例】的方法,放入DataBricks表中作为配置表。如
    FILE_NAME IS_ACTIVE
    DW_TEST 1
  2. 根据【04 DataBricks遍历S3容器】我们可以在s3中取到存在FILE_LIST表中的数据文件路径,并将路径存储在一表中,如flag_file_info表
  3. 接下来,通过DataFactory中lookup活动,获取到flag_file_info的记录。配置如下。
    -- 拼接数据文件的路径 ,注意放入lookup活动中时不要有回车 select 'EDW_SHARE/Request/DataFile'||replace(substr(flag_file,instr(flag_file,'FLAGFILE_')+8),'.csv','')||'/'||content||'.csv' as file_name,flag_file,content||'.csv' as content from cfg.flag_file_info where status=1 and date_id = date_format(from_utc_timestamp(current_timestamp(),'UTC+8'),'yyyyMMdd')

    06 DataBricks+DataFactory+Blob项目实战
    文章图片

  4. 迭代。ForEach逐条获取查询的结果,并设置变量
06 DataBricks+DataFactory+Blob项目实战
文章图片

  1. ForEach中放入两个活动,一获取s3数据到Blob,二更新记录的状态 。同样是复制数据,但这里复制数据的源是s3,目标是Blob
06 DataBricks+DataFactory+Blob项目实战
文章图片

源S3数据集配置如下。接收器Blob设置类似
06 DataBricks+DataFactory+Blob项目实战
文章图片

批量转换CSV文件为Parquet文件 将所有同步到的s3文件放在Blob一个文件中,同样利用迭代把每个CSV文件转换成Parquet文件。管道活动
【06 DataBricks+DataFactory+Blob项目实战】06 DataBricks+DataFactory+Blob项目实战
文章图片

获取元数据的数据集配置如下:
06 DataBricks+DataFactory+Blob项目实战
文章图片

注意迭代设置的变量
@activity('get_file_name').output.childItems

迭代中的活动放入【复制数据】源和接收器的配置如下:
06 DataBricks+DataFactory+Blob项目实战
文章图片

将同步数据到DataBricks表中 将Parquet文件生产临时视图插入数据到DataBricks表中。因为这一步易出错,且每个表出错的情况不同,所以这一步没有使用变量批量处理,每个表单独处理。参考【05 简单实例】
处理数据到dw
每天的增量文件时同步到stg库中,但是为了防止出现错误,stg只保留每天增量文件。stg再根据每个表的不同更新策略同步数据到DW层,DW层保留全量数据。JOB时直接调用sp所在的笔记本就可以了。
处理数据到dm
底层有表了,剩下就可以处理数据,处理完的数据放在另一层。对外提供服务。JOB调用和dw一样。
整体JOB如下:
06 DataBricks+DataFactory+Blob项目实战
文章图片

触发器、监控和警报
触发器 每个管道都可以设置触发器,可定时运行JOB。相对简单就不再介绍了
监控和警报 在监视器->监控和警报中创建新的预警规则。主要是配置条件选择指标和添加操作组、通知
配置条件 选择指标时可以选择管道级别,也可以选择活动级别。其他的看自己的需求设置就可以了。
06 DataBricks+DataFactory+Blob项目实战
文章图片

添加操作组和通知人 一个操作组里可以放多个通知,创建操作组后,其他的监控预警也可以引用。
通知可以邮件、短信、电话语音。填入对于信息即可(产生对应费用)

    推荐阅读