1. Reading data from the database
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

def getDataForDb(): DataFrame = {
  // Obtain a SparkSession
  val spark: SparkSession = SparkSession.builder().master("local").appName("getData")
    .config("spark.sql.shuffle.partitions", 1).getOrCreate()
  val properties = new Properties()
  properties.setProperty("user", "root")
  properties.setProperty("password", "123456")
  properties.setProperty("driver", "com.mysql.cj.jdbc.Driver") // driver class for Connector/J 8.x
  // The subquery pushes the projection down to MySQL, so only wind_speed is fetched
  val winds = spark.read.jdbc(
    "jdbc:mysql://localhost:3306/weather?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8",
    "(select wind_speed from weather) T", properties)
  winds
}
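The read above still arrives over a single JDBC connection. If the weather table grows, Spark can split the scan across several connections with the partitioned variant of jdbc(). A minimal sketch; the numeric id column and its 0..100000 range are assumptions for illustration, not facts about the table above:

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

def getDataParallel(spark: SparkSession): DataFrame = {
  val props = new Properties()
  props.setProperty("user", "root")
  props.setProperty("password", "123456")
  spark.read.jdbc(
    "jdbc:mysql://localhost:3306/weather?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8",
    "weather",
    "id",     // partition column (assumed to exist and be numeric)
    0L,       // assumed lower bound of id
    100000L,  // assumed upper bound of id
    4,        // numPartitions: four concurrent JDBC connections
    props)
}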
2. Storing the results in the database
import java.sql.DriverManager
import org.apache.spark.rdd.RDD

def updateForDb(rdd: RDD[(String, Int)]): Boolean = {
  // Write every record of the RDD, opening one JDBC connection per partition
  rdd.foreachPartition(
    it => {
      val url = "jdbc:mysql://localhost:3306/weather?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8"
      val conn = DriverManager.getConnection(url, "root", "123456")
      val pstat = conn.prepareStatement("INSERT INTO wind(`speed`,`count`) VALUES(?,?)")
      // Batch all rows of this partition into a single round trip
      for (obj <- it) {
        pstat.setString(1, obj._1)
        pstat.setInt(2, obj._2)
        pstat.addBatch()
      }
      try {
        pstat.executeBatch()
      } finally {
        pstat.close()
        conn.close()
      }
    }
  )
  true
}
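For comparison, the same write can go through Spark's built-in JDBC sink instead of a hand-rolled foreachPartition. A minimal sketch, assuming the pair RDD is first converted to a DataFrame whose column names match the wind(speed, count) table used above:

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

def updateViaDataFrame(spark: SparkSession, rdd: RDD[(String, Int)]): Unit = {
  import spark.implicits._
  val props = new Properties()
  props.setProperty("user", "root")
  props.setProperty("password", "123456")
  // Name the columns to match the target table, then append via the JDBC writer
  rdd.toDF("speed", "count")
    .write
    .mode(SaveMode.Append) // append rather than overwrite existing rows
    .jdbc("jdbc:mysql://localhost:3306/weather?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8",
      "wind", props)
}

The built-in sink handles connection and batch management itself; the manual version above is mainly useful when you need custom SQL such as upserts.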
3. Processing
import org.apache.spark.rdd.RDD

object Count {
  def main(args: Array[String]): Unit = {
    // Fetch the wind data from the database
    val wind_dataFrame = new GetDb().getDataForDb()
    // Convert the DataFrame to an RDD, turning each Row into a String
    val wind_rdd = wind_dataFrame.rdd.map(_.mkString(","))
    // Group by wind_speed and sum the occurrences
    val result: RDD[(String, Int)] = wind_rdd.map((_, 1)).reduceByKey(_ + _)
    // Sort by count, descending
    val finalRes: RDD[(String, Int)] = result.sortBy(_._2, ascending = false)
    // Write the result back to the database
    new GetDb().updateForDb(finalRes)
  }
}
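The counting logic itself can be sanity-checked without MySQL by running the same map/reduceByKey/sortBy pipeline on an in-memory sample. A minimal sketch; the readings are made-up values standing in for the database rows:

import org.apache.spark.sql.SparkSession

object CountSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("smoke").getOrCreate()
    // Hypothetical wind_speed readings
    val sample = spark.sparkContext.parallelize(Seq("3.2", "5.0", "3.2", "3.2", "5.0"))
    val counted = sample.map((_, 1)).reduceByKey(_ + _).sortBy(_._2, ascending = false)
    counted.collect().foreach(println) // expected: (3.2,3) then (5.0,2)
    spark.stop()
  }
}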
4. The pom at a glance
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>2.4.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>2.4.4</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.21</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.4</version>
  </dependency>
</dependencies>