前言:我的依赖文件和hive-site.xml文件在这篇文章末尾,仅供参考,这里就不贴了。SparkSQL抽取Mysql全量数据到Hive动态分区表中
【hive|Spark读取csv文件,清洗后存入Hive库中】配置好相关依赖,然后将集群中的hive-site.xml文件复制一份放在项目中的resources目录下。
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSessionimport scala.util.matching.Regexobject A_my_rush {
def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession
.builder()
.appName("A_my_rush")
.master("local[*]")
.enableHiveSupport()//注意添加hive支持
.getOrCreate()import spark.implicits._//导入隐式转换,注意这个spark是我上头定义的spark,不是系统的。//读取本地csv文件
val rdd: RDD[String] = spark.sparkContext.textFile(
"C:\\Users\\Administrator\\Desktop\\Spark练习题\\my_exam_A\\shoping.csv"
)//进行清洗操作,你们不用知道干了什么,就是清洗过滤
rdd
.filter(action => {
val datas: Array[String] = action.split(",")
var data_isGood: Boolean = datas.length == 8
for (i <- datas) {
if (i == "" || i == null) {
data_isGood = false
}
}
data_isGood
})
.map((_, 1))
.groupByKey()
.keys
.map(action => {val datas: Array[String] = action.split(",")var result = ""
var num = 0
var fuhao = ","val event_time_pattern: Regex = "[0-9]{4}-[0-9]{2}-[0-9]{2}".rval detail_time_pattern: Regex = "[0-9]{2}:[0-9]{2}:[0-9]{2}".rfor (i <- datas) {var tmp: String = iif (num == 0) {val event_time: String =
event_time_pattern.findAllIn(tmp).mkString(",").split(",")(0)val detail_time: String =
detail_time_pattern.findAllIn(tmp).mkString(",").split(",")(0)tmp = event_time + "," + detail_time}if (num == 4) {
val arr: Array[String] = tmp.split("[.]")
var str = ""
var count = 0
var s = "|"
for (i <- arr) {
if (arr.length - 1 == count) {
s = ""
}
str += i
.replaceFirst(
i.charAt(0).toString,
i.charAt(0).toUpper.toString
) + s
count += 1
}
tmp = str
}if (num == 7) {
fuhao = ""
}result += tmp + fuhaonum += 1}
result
})
.toDF()//关键部分:转换为dateframe,方便进行后续的sparkSql操作
.createOrReplaceTempView("rush_data") //创建临时表
//利用sparkSql在hive中创建一个表
spark.sql("""
|create table if not exists mydb.shop(
|event_time string,
|detail_time string,
|order_id string,
|product_id string,
|category_id string,
|category_code string,
|brand string,
|price string,
|user_id string)
|row format delimited fields terminated by '\t'
|""".stripMargin)/*
关键部分:
上面的数据清洗完以后,我创建了临时表rush_data,注意,当你把这个数据转换成dateframe时,
每一行数据只会变成一个字段,并且字段名叫: value ,所以我们要将其分隔开。
很好办,根据你的分隔符来使用 split 函数就能分出来,按你想要的顺序依次排
好就行了,前提是你每一行的数据长度都一样,不要会报错,下标越界嘛,懂得懂得,我上面做了清
洗,所以可以保证长度一样
*/
spark.sql("""
|select
|split(value,",")[0] event_time,
|split(value,",")[1] detail_time,
|split(value,",")[2] order_id,
|split(value,",")[3] product_id,
|split(value,",")[4] category_id,
|split(value,",")[5] category_code,
|split(value,",")[6] brand,
|split(value,",")[7] price,
|split(value,",")[8] user_id
|from rush_data
|""".stripMargin).createOrReplaceTempView("data")//将数据插入上面创建的表中,检查是否成功插入,完成
spark.sql("""
|insert into table mydb.shop
|select * from data
|""".stripMargin)spark.stop()
}
}
推荐阅读
- #|hive 表数据导出到CSV文件
- HIVE|hive 通过csv文件创建表
- 大数据|spark sql 创建rdd以及DataFrame和DataSet互转
- hadoop|Hive 安装、配置、数据导入和使用
- hive|Hive导入/导出 : 创建分区表及分区表导入csv文本文件数据
- 人工智能|屡获权威机构认 | Smartbi入选高科技高成长企业系列三大榜单
- 大数据|Flink_01_概述(个人总结)
- SQL|SQL基本功(五)--函数、谓词、CASE表达式