04 DataBricks+DataFactory+Blob简单实例
背景:为接下来的实战项目,本文先介绍一个简单实例。目标:将本地csv数据文件同步到Databricks表中创建表 在DataBricks中先创建一个表
create table stg.stg_text (
indes stirng,
edw_created_on_dt timestamp,
edw_changed_on_dt timestamp,
edw_etl_insert_dt timestamp,
edw_etl_update_dt timestamp,
etl_insert_dt timestamp,
etl_update_dt timestamp
)using delta
location '/mnt/data_warehouse/az_kpi/stg.db/stg_text';
-- 结构存储位置
上传文件到Blob 打开Azure首页进入存储账户中,点入进入Blob容器,点击上传csv文件
【04 DataBricks+DataFactory+Blob简单实例】

文章图片
转换CSV文件为Parquet文件 Parquet是列式存储格式的文件,parquet文件压缩比高更节省空间,且读写更高效。
打开DataFactory 【创作】新建一个管道,拖【复制数据】组件到面板.点击源。

文章图片
新建一个数据源Blob数据源。选择数据类型CSV

文章图片
选择链接服务,即创建的Blob容器,填入路径,确定创建完成。测试链接成功。

文章图片
然后在源中选择刚才创建的源。

文章图片
接收器与源的创建过程类似,路径要选择存储Paruet文件的路径。创建完成如下图

文章图片
其他设置暂时不需要设置,点击【调试】测试一下是否转换成功。
同步数据到DataBricks表中 在这一步需要在DataBricks使用脚本实现。然后再用DataFactory的Job调用DataBricks脚本。这里使用的是Python
全局配置文件,主要是Blob的链接信息。创建一个notebook Default Language:Python 选择集群。

文章图片
if getArgument("schema") == 'stg': # 当前数据库
storage_account_name = "存储服务账号名"
storage_account_access_key = "存储服务的访问key"
pre_str="wasbs://Blob仓库名@databricks账号名.blob.core.chinacloudapi.cn/"
path=getArgument("file_name")
file_location = pre_str + path
file_type = "parquet"
spark.conf.set(
"fs.azure.account.key."+storage_account_name+".blob.core.chinacloudapi.cn",
storage_account_access_key)
# 其它数据库
elif getArgument("schema") in ['dw','dm']:
sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")
创建数据库目录stg,在目录下创建notebook Default Language:python 。内容如图。 分为三部分,第一部分引用配置文件,接受文件名参数,确定文件;第二部分将Parquet文件转换成临时视图;第三部分将视图数据插入到表中。

文章图片
回到DataFactory中在【活动】拖拽一个笔记本到面板。选择笔记本的DataBricks的来链接服务,在设置中选择笔记本路径,并填入库名参数和文件名参数。

文章图片

文章图片
调试运行,检查数据是否已经同步到表中。
推荐阅读
- 科学养胃,别被忽悠,其实真的很简单
- opencv|opencv C++模板匹配的简单实现
- 松软可口易消化,无需烤箱超简单,新手麻麻也能轻松成功~
- 简单心理2019春A期+32+张荣
- 《算法》-图[有向图]
- android防止连续点击的简单实现(kotlin)
- 机器学习一些简单笔记
- Android超简单实现沉浸式状态栏
- v-charts简单使用
- 校园非自愿来访者辅导问句学习