spark|6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)-dsl

6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark) 6.1 使用spark dsl 方式处理 6.1.1 注意事项

# 开启hive元数据支持,开启之后在spark中可以直接读取hive中的表,但是开启之后就不能再本地云心的了 .enableHiveSupport()# 这下脚本都是作用在dwd层,所以必须在dwd的用户下执行,可能会报权限不够,需要我们申请权限

6.1.2 项目结构如下:
spark|6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)-dsl
文章图片

1.是脚本,内容如下:
# 分区 ds=$1# 执行任务 spark-submit \ --master yarn-client \ --class com.wt.dwd.DwdFcjNwrsSellbargainMskDay \ ../target/dwd-1.0-SNAPSHOT.jar \ $ds # 增加分区 hive -e "alter table dwd.dwd_gsj_reg_investor_msk_d add IF NOT EXISTSpartition (ds='$ds')"# 注意: 1.如果换行的话,后面必须加上 \ 2.因为jar包如果不指定路径的话会找不到 3.可以在最后面动态的增加分区,然后再动态的传入变量

2.是在dwd中运行的hive建表语句
-- hive建表语句 -- hive建表语句 CREATE external TABLE IF NOT EXISTSdwd.dwd_fcj_nwrs_sellbargain_msl_d( id STRING comment '身份证号码', r_fwzl STRING comment '房产地址', htydjzmj STRING comment '合同中约定房子面积', tntjzmj STRING comment '房子内建筑面积', ftmj STRING comment '房子分摊建筑面积', time_tjba STRING comment '商品房备案时间', htzj STRING comment '合同总价' )PARTITIONED BY ( dsSTRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS textfile location '/daas/motl/dwd/dwd_fcj_nwrs_sellbargain_msl_d/';

3,是代码运行的逻辑,主要是处理数据
package com.wt.dwd import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object DwdFcjNwrsSellbargainMskDay { def main(args: Array[String]): Unit = {/** * 1. 创建spark环境 * */ val spark: SparkSession = SparkSession .builder //.master("local") .enableHiveSupport() //开启hive元数据支持,开启之后在spark中可以直接读取hive中的表,但是开启之后就不能再本地云心的了 .getOrCreate()import spark.implicits._ import org.apache.spark.sql.functions._/** * 获取时间分区的字段 * */ val ds: String = args.head /** * 2. 读取获取购房合同中的表 * 必须带上库名,否则读不到 * * 不可能读取所有的数据,我们只需要读取每一天的数据 * */ val sellbargain: DataFrame = spark .table("ods.ods_t_fcj_nwrs_sellbargain") .where($"ds" === ds)//对原始的数据进行托名 对id进行脱敏,然后将r_fwzl中的数字变成 * 号(通过正则表达式替换) val resultDF: DataFrame = sellbargain.select( upper(md5($"id")) as "id", regexp_replace($"r_fwzl", "\\d", "*") as "r_fwzl", $"htydjzmj", $"tntjzmj", $"ftmj", $"time_tjba", $"htzj" )resultDF .write .format("csv") .mode(SaveMode.Overwrite) .option("sep","\t") .save(s"/daas/motl/dwd/dwd_fcj_nwrs_sellbargain_msl_d/ds=$ds")//提交到集群运行 spark-submit --master yarn-client --class com.wt.dwd.DwdFcjNwrsSellbargainMskDay dwd-1.0-SNAPSHOT.jar } }

6.1.3 将通用的东西封装(重要-可以极快的提高效率):
spark|6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)-dsl
文章图片

代码逻辑如下:
package com.wt.common import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} abstract class SparkTool extends Logging{ def main(args: Array[String]): Unit = {/** * 获取时间分区 * */ if(args.length == 0){ logError("请指定分区!!!") return } val ds: String = args.head//创建spark环境 val spark: SparkSession = SparkSession .builder() .enableHiveSupport() .getOrCreate()//调用子类实现的抽象方法 this.run(spark,ds) }/** * 抽象方法: 在子类中实现这个方法 * import spark.implicits._ * import org.apache.spark.sql.functions._ * * @param spark: spark的环境 * @param ds:分区 */ def run(spark: SparkSession,ds: String): Unit/** * 传入DataFrame 和 path 就可以保存数据 * * 其中的format默认值是 csv格式的。 * */def save(dataframe:DataFrame,path:String,format:String = "csv"): Unit={ dataframe .write .format("csv") .mode(SaveMode.Overwrite) .option("sep","\t") .save(path) } }

理解:子类继承父类,在父类中已经封装好了spark,DataFormat的save 两个环境,需要子类继承SparkTool ,就可以拿到父类已经创建好的环境,减少代码量,提高效率
在save 中可以设置默认值,如果不传入的话,就使用默认值
而且该工具还吧 ds 给封装好了,我们在使用的时候可以直接用变量传入即可
6.1.4 调用方法如下:
package com.wt.dwd import com.wt.common.SparkTool import org.apache.spark.sql.{DataFrame, SparkSession}object DwdGsjRegLegrepreMskDay extends SparkTool{ /** * 抽象方法: 在子类中实现这个方法 * import spark.implicits._ * import org.apache.spark.sql.functions._ * * @param spark: spark的环境 * @param ds:分区 */ override def run(spark: SparkSession, ds: String): Unit = { import spark.implicits._ import org.apache.spark.sql.functions._/** * 读取hive中的表 * */ val legrepre: DataFrame = spark .table("ods.ods_t_gsj_reg_legrepre") .where($"ds" === ds)val resultDF: DataFrame = legrepre .select( upper(md5($"id")) as "id", $"position", upper(md5($"tel")) as "tel", $"appounit", $"accdside", $"posbrmode", $"offhfrom", $"offhto" )save(resultDF,s"/daas/motl/dwd/dwd_gsj_reg_legrepre_msk_d/ds=$ds")} }

6.1.5 脚本如下:
因为在common工具和dwd不在同一个模块中,需要在dwd模块中导入common的jar包
spark|6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)-dsl
文章图片

需要导入依赖,如下:
spark|6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)-dsl
文章图片

脚本如下;
# 分区 ds=$1# --jars :指定代码需要的其他的包# 执行任务 spark-submit \ --master yarn-client \ --class com.wt.dwd.DwdGsjRegLegrepreMskDay \ --jars ../lib/common-1.0-SNAPSHOT.jar \ ../target/dwd-1.0-SNAPSHOT.jar \ $ds # 增加分区 hive -e "alter table dwd.dwd_gsj_reg_legrepre_msk_dadd IF NOT EXISTSpartition (ds='$ds')"

将dwd包拖到dwd用户下,执行脚本,最终结果如下
【spark|6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)-dsl】spark|6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)-dsl
文章图片

    推荐阅读