spark|spark案例-词频统计(存储数据库)

1.读数据库数据实例

/**
 * Reads wind-speed rows from the local MySQL `weather` database via Spark JDBC.
 *
 * Builds (or reuses) a local SparkSession, then loads the subquery
 * `(select wind_speed from weather) T` so only the needed column is
 * fetched from MySQL.
 *
 * @return a DataFrame with a single `wind_speed` column
 */
def getDataForDb(): DataFrame = {
  // Get the SparkSession; one shuffle partition is enough for this small job.
  val spark: SparkSession = SparkSession.builder()
    .master("local")
    .appName("getData")
    .config("spark.sql.shuffle.partitions", 1)
    .getOrCreate()

  val properties: Properties = new Properties()
  properties.setProperty("user", "root")
  properties.setProperty("password", "123456")
  properties.setProperty("driver", "com.mysql.jdbc.Driver")

  // The subquery alias `T` is required by the JDBC source; it pushes the
  // column projection down to MySQL instead of reading the whole table.
  spark.read.jdbc(
    "jdbc:mysql://localhost:3306/weather?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8",
    "(select wind_speed from weather) T",
    properties)
}

2.存入数据库
/**
 * Writes every (speed, count) pair of the RDD into the MySQL `wind` table.
 *
 * One JDBC connection is opened per partition on the executor; rows are
 * batched with a PreparedStatement and flushed once per partition.
 *
 * @param rdd word-count result pairs: (wind_speed value, occurrence count)
 * @return always true; any JDBC failure propagates as an exception
 */
def updateForDb(rdd: RDD[(String, Int)]): Boolean = {
  rdd.foreachPartition { it =>
    val url = "jdbc:mysql://localhost:3306/weather?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8"
    val conn = DriverManager.getConnection(url, "root", "123456")
    // Nested try/finally: the connection is released even if
    // prepareStatement itself throws (the original leaked it in that case).
    try {
      val pstat = conn.prepareStatement("INSERT INTO wind(`speed`,`count`) VALUES(?,?)")
      try {
        for (obj <- it) {
          pstat.setString(1, obj._1)
          pstat.setInt(2, obj._2)
          pstat.addBatch()
        }
        // Single round-trip per partition instead of one INSERT per row.
        pstat.executeBatch()
      } finally {
        pstat.close()
      }
    } finally {
      conn.close()
    }
  }
  true
}

3.处理
/**
 * Entry point: word-frequency count of wind speeds, database to database.
 *
 * Pipeline: read `wind_speed` rows from MySQL, count occurrences of each
 * value, sort by count descending, and write the results back to MySQL.
 */
object Count {
  def main(args: Array[String]): Unit = {
    // 从数据库获取数据 — fetch the source rows from MySQL.
    val wind_dataFrame = new GetDb().getDataForDb()
    // Convert the DataFrame to an RDD and each Row to a plain String.
    val wind_rdd = wind_dataFrame.rdd.map(_.mkString(","))
    // Group identical wind_speed values and count them (classic word count).
    val result: RDD[(String, Int)] = wind_rdd.map((_, 1)).reduceByKey(_ + _)
    // Sort by count, descending.
    val finalRes: RDD[(String, Int)] = result.sortBy(_._2, false)
    // Persist the aggregated counts back to the database.
    new GetDb().updateForDb(finalRes)
  }
}

4.pom一览
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>2.4.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>2.4.4</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.21</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.4</version>
  </dependency>
</dependencies>

    推荐阅读