如何使用IDEA开发Spark|如何使用IDEA开发Spark SQL程序(一文搞懂)
目录
- 前言
- Spark SQL是什么
- 1、使用IDEA开发Spark SQL
- 1.1、指定列名添加Schema
- 1.2、通过StructType指定Schema
- 1.3、反射推断Schema–掌握
- 1.4、花式查询
- 1.5、 相互转化
- 1.6、Spark SQL完成WordCount(案例)
- 1.6.1、SQL风格
- 1.6.2、DQL风格
前言
大家好,我是DJ丶小哪吒,我又来跟你们分享知识了。对软件开发有着浓厚的兴趣。喜欢与人分享知识。做博客的目的就是为了能与 他 人知识共享。由于水平有限。博客中难免会有一些错误。如有 纰漏 之处,欢迎大家在留言区指正。小编也会及时改正。
DJ丶小哪吒又来与各位分享知识了。今天我们不飙车,今天就静静的坐下来,我们来聊一聊关于sparkSQL。准备好茶水,听老朽与你娓娓道来。
Spark SQL是什么
Spark SQL 是一个用来处理结构化数据的spark组件。它提供了一个叫做DataFrames的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。
1、使用IDEA开发Spark SQL Spark会根据文件信息尝试着去推断DataFrame/DataSet的Schema,当然我们也可以手动指定,手动指定的方式有以下几种:
- 第1种:指定列名添加Schema
- 第2种:通过StructType指定Schema
- 第3种:编写样例类,利用反射机制推断Schema
1.1、指定列名添加Schema
package cn.itcast.sqlimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.{DataFrame, SparkSession}object CreateDFDS {def main(args: Array[String]): Unit = {//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.读取文件val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt))//3.将RDD转成DF//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换import spark.implicits._val personDF: DataFrame = rowRDD.toDF("id","name","age")personDF.show(10)personDF.printSchema()sc.stop()spark.stop()}}
1.2、通过StructType指定Schema
package cn.itcast.sqlimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.types._import org.apache.spark.sql.{DataFrame, Row, SparkSession}object CreateDFDS2 {def main(args: Array[String]): Unit = {//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.读取文件val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt))//3.将RDD转成DF//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换//import spark.implicits._val schema: StructType = StructType(Seq(StructField("id", IntegerType, true),//允许为空StructField("name", StringType, true),StructField("age", IntegerType, true)))val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)personDF.show(10)personDF.printSchema()sc.stop()spark.stop()}}
1.3、反射推断Schema–掌握
package cn.itcast.sqlimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.{DataFrame, SparkSession}object CreateDFDS3 {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.读取文件val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))//3.将RDD转成DF//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换import spark.implicits._//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息//所以SparkSQL可以通过反射自动获取到并添加给DFval personDF: DataFrame = rowRDD.toDFpersonDF.show(10)personDF.printSchema()sc.stop()spark.stop()}}
1.4、花式查询
package cn.itcast.sqlimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.{DataFrame, SparkSession}object QueryDemo {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.读取文件val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))//3.将RDD转成DF//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换import spark.implicits._//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息//所以SparkSQL可以通过反射自动获取到并添加给DFval personDF: DataFrame = rowRDD.toDFpersonDF.show(10)personDF.printSchema()//=======================SQL方式查询=======================//0.注册表personDF.createOrReplaceTempView("t_person")//1.查询所有数据spark.sql("select * from t_person").show()//2.查询age+1spark.sql("select age,age+1 from t_person").show()//3.查询age最大的两人spark.sql("select name,age from t_person order by age desc limit 2").show()//4.查询各个年龄的人数spark.sql("select age,count(*) from t_person group by age").show()//5.查询年龄大于30的spark.sql("select * from t_person where age > 30").show()//=======================DSL方式查询=======================//1.查询所有数据personDF.select("name","age")//2.查询age+1personDF.select($"name",$"age" + 1)//3.查询age最大的两人personDF.sort($"age".desc).show(2)//4.查询各个年龄的人数personDF.groupBy("age").count().show()//5.查询年龄大于30的personDF.filter($"age" > 30).show()sc.stop()spark.stop()}}
【如何使用IDEA开发Spark|如何使用IDEA开发Spark SQL程序(一文搞懂)】
1.5、 相互转化
RDD、DF、DS之间的相互转换有很多(6种),但是我们实际操作就只有2类:
1)使用RDD算子操作
2)使用DSL/SQL对表操作
package cn.itcast.sqlimport org.apache.spark.SparkContextimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object TransformDemo {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.读取文件val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))//3.将RDD转成DF//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换import spark.implicits._//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息//所以SparkSQL可以通过反射自动获取到并添加给DF//=========================相互转换======================//1.RDD-->DFval personDF: DataFrame = personRDD.toDF//2.DF-->RDDval rdd: RDD[Row] = personDF.rdd//3.RDD-->DSval DS: Dataset[Person] = personRDD.toDS()//4.DS-->RDDval rdd2: RDD[Person] = DS.rdd//5.DF-->DSval DS2: Dataset[Person] = personDF.as[Person]//6.DS-->DFval DF: DataFrame = DS2.toDF()sc.stop()spark.stop()}}
1.6、Spark SQL完成WordCount(案例)
1.6.1、SQL风格
package cn.itcast.sqlimport org.apache.spark.SparkContextimport org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object WordCount {def main(args: Array[String]): Unit = {//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.读取文件val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")//fileDF.show()//fileDS.show()//3.对每一行按照空格进行切分并压平//fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是Stringimport spark.implicits._val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String//wordDS.show()/*+-----+|value|+-----+|hello||me||hello||you|...*///4.对上面的数据进行WordCountwordDS.createOrReplaceTempView("t_word")val sql ="""|select value ,count(value) as count|from t_word|group by value|order by count desc""".stripMarginspark.sql(sql).show()sc.stop()spark.stop()}}
1.6.2、DQL风格
package cn.itcast.sqlimport org.apache.spark.SparkContextimport org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object WordCount2 {def main(args: Array[String]): Unit = {//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.读取文件val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")//fileDF.show()//fileDS.show()//3.对每一行按照空格进行切分并压平//fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是Stringimport spark.implicits._val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String//wordDS.show()/*+-----+|value|+-----+|hello||me||hello||you|...*///4.对上面的数据进行WordCountwordDS.groupBy("value").count().orderBy($"count".desc).show()sc.stop()spark.stop()}}
好了,以上内容就到这里了。你学到了吗。
到此这篇关于如何使用IDEA开发Spark SQL程序(一文搞懂)的文章就介绍到这了,更多相关IDEA开发Spark SQL内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
推荐阅读
- 考研英语阅读终极解决方案——阅读理解如何巧拿高分
- 由浅入深理解AOP
- 如何寻找情感问答App的分析切入点
- 【译】20个更有效地使用谷歌搜索的技巧
- mybatisplus如何在xml的连表查询中使用queryWrapper
- MybatisPlus|MybatisPlus LambdaQueryWrapper使用int默认值的坑及解决
- MybatisPlus使用queryWrapper如何实现复杂查询
- 如何在Mac中的文件选择框中打开系统隐藏文件夹
- 漫画初学者如何学习漫画背景的透视画法(这篇教程请收藏好了!)
- java中如何实现重建二叉树