hive | Reading a CSV file with Spark, cleaning it, and writing it into Hive

Preface: my dependency file and hive-site.xml can be found at the end of the following article, for reference only, so I won't paste them here: SparkSQL抽取Mysql全量数据到Hive动态分区表中 (SparkSQL: extracting a full MySQL table into a Hive dynamic-partition table).
Configure the required dependencies, then copy the hive-site.xml file from the cluster into the project's resources directory.
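For reference, a minimal build.sbt sketch for this kind of job could look like the following; the Scala and Spark versions here are assumptions, so align them with whatever your cluster actually runs. The spark-hive module is what makes enableHiveSupport() work.

// build.sbt -- minimal sketch; versions are assumptions, match them to your cluster
scalaVersion := "2.12.15"

val sparkVersion = "3.1.2"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql"  % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion // required for enableHiveSupport()
)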

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.util.matching.Regex

object A_my_rush {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("A_my_rush")
      .master("local[*]")
      .enableHiveSupport() // note: Hive support must be enabled here
      .getOrCreate()

    // import the implicit conversions; this `spark` is the SparkSession defined above, not a built-in one
    import spark.implicits._

    // read the local csv file
    val rdd: RDD[String] = spark.sparkContext.textFile(
      "C:\\Users\\Administrator\\Desktop\\Spark练习题\\my_exam_A\\shoping.csv"
    )

    // cleaning step: you don't need to follow every detail, it just filters and reshapes the rows
    rdd
      .filter(action => {
        // keep only rows with exactly 8 fields and no empty values
        val datas: Array[String] = action.split(",")
        var data_isGood: Boolean = datas.length == 8
        for (i <- datas) {
          if (i == "" || i == null) {
            data_isGood = false
          }
        }
        data_isGood
      })
      .map((_, 1))
      .groupByKey()
      .keys // deduplicate identical rows
      .map(action => {
        val datas: Array[String] = action.split(",")
        var result = ""
        var num = 0
        var fuhao = "," // separator appended after each field
        val event_time_pattern: Regex = "[0-9]{4}-[0-9]{2}-[0-9]{2}".r
        val detail_time_pattern: Regex = "[0-9]{2}:[0-9]{2}:[0-9]{2}".r
        for (i <- datas) {
          var tmp: String = i
          if (num == 0) {
            // split the original timestamp field into a date part and a time part
            val event_time: String = event_time_pattern.findAllIn(tmp).mkString(",").split(",")(0)
            val detail_time: String = detail_time_pattern.findAllIn(tmp).mkString(",").split(",")(0)
            tmp = event_time + "," + detail_time
          }
          if (num == 4) {
            // capitalize each dot-separated part of category_code and join the parts with "|"
            val arr: Array[String] = tmp.split("[.]")
            var str = ""
            var count = 0
            var s = "|"
            for (i <- arr) {
              if (arr.length - 1 == count) {
                s = ""
              }
              str += i.replaceFirst(
                i.charAt(0).toString,
                i.charAt(0).toUpper.toString
              ) + s
              count += 1
            }
            tmp = str
          }
          if (num == 7) {
            fuhao = "" // no separator after the last field
          }
          result += tmp + fuhao
          num += 1
        }
        result
      })
      .toDF() // key step: convert to a DataFrame so Spark SQL can be used afterwards
      .createOrReplaceTempView("rush_data") // create a temporary view

    // use Spark SQL to create a table in Hive
    spark.sql(
      """
        |create table if not exists mydb.shop(
        |event_time string,
        |detail_time string,
        |order_id string,
        |product_id string,
        |category_id string,
        |category_code string,
        |brand string,
        |price string,
        |user_id string)
        |row format delimited fields terminated by '\t'
        |""".stripMargin)

    /*
      Key point:
      After the cleaning above I created the temporary view rush_data. When an RDD of strings
      is converted to a DataFrame like this, each row becomes a single column named `value`,
      so it has to be split back into fields. That is easy: apply the split function with your
      separator and list the pieces in the order you want. The precondition is that every row
      has the same number of fields, otherwise you get an index-out-of-bounds error; the
      cleaning above guarantees the lengths match.
     */
    spark.sql(
      """
        |select
        |split(value,",")[0] event_time,
        |split(value,",")[1] detail_time,
        |split(value,",")[2] order_id,
        |split(value,",")[3] product_id,
        |split(value,",")[4] category_id,
        |split(value,",")[5] category_code,
        |split(value,",")[6] brand,
        |split(value,",")[7] price,
        |split(value,",")[8] user_id
        |from rush_data
        |""".stripMargin).createOrReplaceTempView("data")

    // insert the data into the table created above, then check that the insert succeeded; done
    spark.sql(
      """
        |insert into table mydb.shop
        |select * from data
        |""".stripMargin)

    spark.stop()
  }
}
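To confirm that the insert actually landed, a quick sanity check could be as simple as the sketch below; run it before spark.stop(), or from a separate spark-shell session pointed at the same metastore.

// sanity check after the insert: row count plus a few sample rows
spark.sql("select count(*) as cnt from mydb.shop").show()
spark.sql("select * from mydb.shop limit 5").show(truncate = false)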


