Configuring and Using Spark SQL in IDEA: Sample Code
1. Add the dependency
Add the following dependency to the pom.xml of the IDEA project.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
2. Example code
package com.zf.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Spark01_SparkSql_Basic {

    def main(args: Array[String]): Unit = {

        // Create the configuration object for the context
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")

        // Create the SparkSession object
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        // DataFrame
        val df: DataFrame = spark.read.json("datas/user.json")
        //df.show()

        // DataFrame => SQL
        //df.createOrReplaceTempView("user")
        //spark.sql("select * from user").show()
        //spark.sql("select age from user").show()
        //spark.sql("select avg(age) from user").show()

        // DataFrame => DSL
        // Conversion operations (for example using $ to reference a column's value)
        // require the implicit conversion rules; without this import they will not compile.
        // Note: "spark" here is not a package name but the SparkSession object created above.
        import spark.implicits._
        //df.select("age","username").show()
        //df.select($"age"+1).show()
        //df.select('age+1).show()

        // DataSet
        //val seq = Seq(1,2,3,4)
        //val ds: Dataset[Int] = seq.toDS()
        //ds.show()

        // RDD <=> DataFrame
        val rdd = spark.sparkContext.makeRDD(List((1, "张三", 10), (2, "李四", 20)))
        val df1: DataFrame = rdd.toDF("id", "name", "age")
        val rdd1: RDD[Row] = df1.rdd

        // DataFrame <=> DataSet
        val ds: Dataset[User] = df1.as[User]
        val df2: DataFrame = ds.toDF()

        // RDD <=> DataSet
        val ds1: Dataset[User] = rdd.map {
            case (id, name, age) => User(id, name = name, age = age)
        }.toDS()
        val rdd2: RDD[User] = ds1.rdd

        spark.stop()
    }

    case class User(id: Int, name: String, age: Int)
}
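The example reads datas/user.json, and Spark's JSON reader expects one JSON object per line (JSON Lines). The original file is not shown, so the following is only a hypothetical sample whose username and age fields match the columns referenced in the commented DSL calls above; the values are made up for illustration:

{"username": "zhangsan", "age": 20}
{"username": "lisi", "age": 30}
{"username": "wangwu", "age": 40}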
PS: Below is another look at developing a Spark SQL program in IDEA.
Packaging and running the program in IDEA works the same way as for Spark Core; a new dependency needs to be added to the Maven configuration:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
I. Specifying the schema explicitly
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object Demo1 {
    def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate()

        // Create an RDD from the specified file
        val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

        // Declare the schema with StructType
        val schema = StructType(List(
            StructField("id", IntegerType),
            StructField("name", StringType),
            StructField("age", IntegerType)
        ))

        // Map the RDD to an RDD[Row]
        val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt))
        val personDF = spark.createDataFrame(rowRDD, schema)

        // Register a temporary view
        personDF.createOrReplaceTempView("t_person")

        // Run SQL
        val df = spark.sql("select * from t_person order by age desc limit 4")
        df.show()

        spark.stop()
    }
}
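Demo1 assumes D:\tmp_files\student.txt is a tab-separated text file with three columns matching the declared schema (id, name, age). The original file is not included in the article, so the lines below are only a hypothetical sample (columns separated by a tab character, values made up for illustration):

1	Tom	23
2	Mary	24
3	Mike	22
4	Jone	25
5	Jim	21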
II. Using a case class
import org.apache.spark.sql.SparkSession

// Using a case class
object Demo2 {
    def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val spark = SparkSession.builder().master("local").appName("CaseClassDemo").getOrCreate()

        // Read the data from the specified file and build the corresponding RDD
        val lineRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

        // Associate the RDD with the case class
        val studentRDD = lineRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))

        // Generate a DataFrame from the RDD; the implicit conversions must be imported
        import spark.implicits._
        val studentDF = studentRDD.toDF()

        // Register a temporary view
        studentDF.createOrReplaceTempView("student")

        // Run SQL
        spark.sql("select * from student").show()

        spark.stop()
    }
}

// The case class must be defined outside the method
case class Student(stuID: Int, stuName: String, stuAge: Int)
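Because Student is a case class, the same DataFrame can also be turned into a strongly typed Dataset and queried through the DSL rather than SQL, just like the df1.as[User] conversion in the first example. A minimal sketch, assuming it is placed inside Demo2's main before spark.stop() (studentDS is a name introduced here for illustration):

// Sketch only: reuses spark, studentDF and Student from Demo2
import spark.implicits._

// DataFrame => strongly typed Dataset[Student]
val studentDS = studentDF.as[Student]

// Typed filter instead of a SQL WHERE clause
studentDS.filter(s => s.stuAge > 22).show()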
III. Saving data to a database
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import java.util.Properties

object Demo3 {
    def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate()

        // Create an RDD from the specified file
        val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

        // Declare the schema with StructType
        val schema = StructType(List(
            StructField("id", IntegerType),
            StructField("name", StringType),
            StructField("age", IntegerType)
        ))

        // Map the RDD to an RDD[Row]
        val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt))
        val personDF = spark.createDataFrame(rowRDD, schema)

        // Register a temporary view
        personDF.createOrReplaceTempView("person")

        // Run SQL
        val df = spark.sql("select * from person")
        // Inspect the result
        //df.show()

        // Save the result to MySQL
        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "123456")
        props.setProperty("driver", "com.mysql.jdbc.Driver")
        df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)

        spark.close()
    }
}
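To check that the write succeeded, the table can be read back over the same JDBC connection with spark.read.jdbc. A minimal sketch, assuming it is placed inside Demo3's main after the write and before spark.close() (url and studentFromDb are names introduced here for illustration):

// Sketch only: reuses the spark session and props from Demo3
val url = "jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8"

// Read the freshly written table back into a DataFrame and display it
val studentFromDb = spark.read.jdbc(url, "student", props)
studentFromDb.show()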
This concludes the walkthrough of configuring and using Spark SQL in IDEA. The content above is reposted from:
https://blog.csdn.net/weixin_43520450/article/details/106093582
Author: 故明所以