SparkSQL|SparkSql

一、Shark 1、简介

Shark是基于Spark计算框架之上且兼容Hive语法的SQL执行引擎

2、优点:
1)由于底层的计算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,当数据全部load在内存的话,将快10倍以上,因此Shark可以作为交互式查询应用服务来使用 2)Shark是完全兼容Hive的语法,表结构以及UDF函数等,已有的HiveSql可以直接进行迁移至Shark上Shark底层依赖于Hive的解析器,查询优化器

3、缺点
由于SHark的整体设计架构对Hive的依赖性太强,难以支持其长远发展,比如不能和Spark的其他组件进行很好的集成,无法满足Spark的一栈式解决大数据处理的需求。

二、SparkSQL 1、介绍
1)Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。 2)SparkSQL支持查询原生的RDD。 3)能够在scala中写SQL语句。

2、Spark on Hive和Hive on Spark
**Spark on Hive**: Hive只作为储存角色,Spark负责sql解析优化,执行。 **Hive on Spark**:Hive即作为存储又负责sql的解析优化,Spark负责执行。

3、Dataset与DataFrame
前期:Dataset简单理解为一张表![在这里插入图片描述](https://img-blog.csdnimg.cn/20190418193853479.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poYW5ncm9uZ2xpbjE=,size_16,color_FFFFFF,t_70) Dataset的底层封装的是RDD,当RDD的泛型是Row类型的时候,我们也可以称它为DataFrame。即Dataset = DataFrame(Dataset与DataFrame的使用请见第三章、创建Dataset的几种方式)

4、SparkSQL的数据源
SparkSQL的数据源可以是JSON类型的字符串,JDBC,Parquent,Hive,HDFS等。 ![在这里插入图片描述](https://img-blog.csdnimg.cn/2019041819453259.png)

5、SparkSQL底层架构
![在这里插入图片描述](https://img-blog.csdnimg.cn/2019041819462122.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poYW5ncm9uZ2xpbjE=,size_16,color_FFFFFF,t_70) 首先拿到sql后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划,再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过SparkPlanner的策略转化成一批物理计划,随后经过消费模型转换成一个个的Spark任务执行。

6、谓词下推(predicate Pushdown)
![在这里插入图片描述](https://img-blog.csdnimg.cn/20190418194749816.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poYW5ncm9uZ2xpbjE=,size_16,color_FFFFFF,t_70)

三、3. 创建Dataset的几种方式 1、读取json格式的文件创建DataSet 注意:json文件中不能嵌套json格式的内容
1.读取json格式两种方式
2.ds.show默认显示前20行,使用ds.show(行数)显示多行
3.ds.javaRDD/(scala ds.rdd) 将DataFrame转换成RDD
4.ds.printSchema()显示Dataset中的Schema信息
5.dataset自带的API 操作Dataset ,用的少
6.想使用sql查询,首先要将Dataset注册成临时表:dd.registerTempTable(“jtable”),再使用sql,怎么使用sql? sqlContext.sql(“sql语句”)
7.不能读取嵌套的json文件
8.ds加载过来之后将列按照ascii排序了
package com.shsxt.java.sql.dataset; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; /** * 读取json格式的文件创建DataSet * * 注意 :json文件中不能嵌套json格式的内容 * * 1.读取json格式两种方式 * 2.ds.show默认显示前20行,使用ds.show(行数)显示多行 * 3.ds.javaRDD/(scala ds.rdd) 将DataFrame转换成RDD * 4.ds.printSchema()显示Dataset中的Schema信息 * 5.dataset自带的API 操作Dataset ,用的少 * 6.想使用sql查询,首先要将Dataset注册成临时表:dd.registerTempTable("jtable"),再使用sql,怎么使用sql?sqlContext.sql("sql语句") * 7.不能读取嵌套的json文件 * 8.ds加载过来之后将列按照ascii排序了 * * @author root */ public class CreateDSFromJosonFile {public static void main(String[] args) {SparkSession sparkSession = SparkSession .builder() .appName("jsonfile") .master("local") .getOrCreate(); /** * Dataset的底层是一个一个的RDD * 注意: *当 Dataset里的泛型是Row时,我们又可以称之为dataframe * * 以下两种方式都可以读取json格式的文件 * */Dataset ds = sparkSession.read().format("json").load("data/json"); //Dataset ds = sqlContext.read().json("data/json"); //ds.show(); ds.show(); /** * 显示DataSet中的内容,默认显示前20行。如果现实多行要指定多少行show(行数) * 注意:当有多个列时,显示的列先后顺序是按列的ascii码先后显示。 */ ds.show(100); /** *DataSet转换成RDD */ //JavaRDD javaRDD = ds.javaRDD(); /** * 树形的形式显示schema信息 */ ds.printSchema(); /** * dataset自带的API 操作dataset */ //select name from table ds.select("name").show(); //select name ,age+10 as addage from table ds.select(ds.col("name"),ds.col("age").plus(10).alias("addage")).show(); //select name ,age from table where age>19 ds.select(ds.col("name"),ds.col("age")).where(ds.col("age").gt(19)).show(); //select age,count(*) from table group by age ds.groupBy(ds.col("age")).count().show(); /** * 将dataset注册成临时的一张表,这张表相当于临时注册到内存中,逻是辑上的表,不会雾化到磁盘 */ds.createOrReplaceTempView("jtable"); //ds.registerTempTable("jtable"); Dataset sql = sparkSession.sql("select age,count(*) as gg from jtable group by age"); sql.show(); // //Dataset sql2 = sparkSession.sql("select name,age from jtable"); //sql2.show(); sparkSession.stop(); } }

2、读取json格式的RDD创建Dataset
package com.shsxt.java.sql.dataset; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.*; import java.util.Arrays; /** * 读取json格式的RDD创建DF * @author root * */ public class CreateDSFromJsonRDD { public static void main(String[] args) { SparkSession sparkSession = SparkSession .builder() .appName("jsonrdd") .master("local") .getOrCreate(); /** * 注意: *1.由于是java版,故通过javaSparkcontext.parallelize来创建json格式的JavaRDD *所以我们通过sparkcontext来创建javaSparkcontext *2.如果是scala版本,直接通过sparkcontext.parallelize来创建,就无需创建javaSparkcontext */SparkContext sc = sparkSession.sparkContext(); JavaSparkContext jsc = new JavaSparkContext(sc); JavaRDD nameRDD = jsc.parallelize(Arrays.asList( "{'name':'zhangsan','age':\"18\"}", "{\"name\":\"lisi\",\"age\":\"19\"}", "{\"name\":\"wangwu\",\"age\":\"20\"}" )); JavaRDD scoreRDD = jsc.parallelize(Arrays.asList( "{\"name\":\"zhangsan\",\"score\":\"100\"}", "{\"name\":\"lisi\",\"score\":\"200\"}", "{\"name\":\"wangwu\",\"score\":\"300\"}" )); Dataset nameds = sparkSession.read().json(nameRDD); Dataset scoreds = sparkSession.read().json(scoreRDD); //注册成临时表使用 nameds.createOrReplaceTempView("nameTable"); scoreds.createOrReplaceTempView("scoreTable"); Dataset result = sparkSession.sql("select nameTable.name,nameTable.age,scoreTable.score " + "from nameTable join scoreTable " + "on nameTable.name = scoreTable.name"); result.show(); sc.stop(); } }

3、通过反射的方式将非json格式的RDD转换成Dataset 注意:
1.这种方式不推荐使用 2.自定义类要实现序列化接口 3.自定义类访问级别必须是Public 4.RDD转成Dataset会把自定义类中字段的名称按assci码排序

package com.shsxt.java.sql.dataset; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; /** * 通过反射的方式将非json格式的RDD转换成Dataset * 注意:这种方式不推荐使用 * * @author root */ public class CreateDSFromRDDWithReflect { public static void main(String[] args) { /** * 注意: * 1.自定义类要实现序列化接口 * 2.自定义类访问级别必须是Public * 3.RDD转成Dataset会把自定义类中字段的名称按assci码排序 */SparkSession sparkSession = SparkSession .builder() .appName("reflect") .master("local") .getOrCreate(); SparkContext sc = sparkSession.sparkContext(); JavaSparkContext jsc = new JavaSparkContext(sc); JavaRDD lineRDD = jsc.textFile("data/person.txt"); JavaRDD personRDD = lineRDD.map( new Function() {private static final long serialVersionUID = 1L; @Override public Person call(String line) throws Exception { Person p = new Person(); p.setId(line.split(",")[0]); p.setName(line.split(",")[1]); p.setAge(Integer.valueOf(line.split(",")[2])); return p; } }); /** * 传入进去Person.class的时候,sqlContext是通过反射的方式创建Dataset * 在底层通过反射的方式获得Person的所有field,结合RDD本身,就生成了Dataset */ Dataset dataFrame = sparkSession.createDataFrame(personRDD, Person.class); dataFrame.show(); dataFrame.printSchema(); dataFrame.registerTempTable("person"); Dataset sql = sparkSession.sql("selectname,id,age from person where id = 2"); sql.show(); /** * 将Dataset转成JavaRDD * 注意: * 1.可以使用row.getInt(0),row.getString(1)...通过下标获取返回Row类型的数据,但是要注意列顺序问题---不常用 * 2.可以使用row.getAs("列名")来获取对应的列值。 * //*/ JavaRDD javaRDD = dataFrame.javaRDD(); JavaRDD map = javaRDD.map(new Function() {/** * */ private static final long serialVersionUID = 1L; @Override public Person call(Row row) throws Exception { Person p = new Person(); //p.setId(row.getString(1)); //p.setName(row.getString(2)); //p.setAge(row.getInt(0)); p.setId(row.getAs("id")); p.setName(row.getAs("name")); p.setAge(row.getAs("age")); return p; } }); map.foreach(x-> System.out.println(x)); sparkSession.stop(); } }

4、动态创建Schema将非json格式RDD转换成Dataset
/** * */ package com.shsxt.java.sql.dataset; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.Arrays; import java.util.List; /** * 动态创建Schema将非json格式RDD转换成Dataset * @author root * */ public class CreateDSFromRDDWithStruct { public static void main(String[] args) { SparkSession sparkSession = SparkSession .builder() .appName("struct") .master("local") .getOrCreate(); SparkContext sc = sparkSession.sparkContext(); JavaSparkContext jsc = new JavaSparkContext(sc); JavaRDD lineRDD = jsc.textFile("data/person.txt"); /** * 转换成Row类型的RDD */ final JavaRDD rowRDD = lineRDD.map(new Function() {/** * */ private static final long serialVersionUID = 1L; @Override public Row call(String s) throws Exception {return RowFactory.create( s.split(",")[0], s.split(",")[1], Integer.valueOf(s.split(",")[2]) ); } }); /** * 动态构建DataFrame中的元数据,一般来说这里的字段可以来源自字符串,也可以来源于外部数据库 */List asList = Arrays.asList( DataTypes.createStructField("id", DataTypes.StringType, true), DataTypes.createStructField("name", DataTypes.StringType, true), DataTypes.createStructField("age", DataTypes.IntegerType, true) ); StructType schema = DataTypes.createStructType(asList); Dataset df = sparkSession.createDataFrame(rowRDD, schema); df.printSchema(); df.show(); sc.stop(); } }

5、读取JDBC中的数据创建Dataset(MySql为例)
第一种方式读取MySql数据库表,加载为Dataset 第二种方式读取MySql数据表加载为Dataset

package com.shsxt.java.sql.dataset; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.*; import java.util.HashMap; import java.util.Map; import java.util.Properties; public class CreateDSFromMysql { public static void main(String[] args) {SparkSession sparkSession = SparkSession .builder() .appName("mysql") .master("local") .getOrCreate(); /** * 第一种方式读取MySql数据库表,加载为Dataset */ Map options = new HashMap(); options.put("url", "jdbc:mysql://127.0.0.1:3306/spark"); options.put("driver", "com.mysql.jdbc.Driver"); options.put("user", "root"); options.put("password", "root"); options.put("dbtable", "person"); Dataset person = sparkSession.read().format("jdbc").options(options).load(); person.show(); person.createOrReplaceTempView("person"); //person.registerTempTable("person"); /** * 第二种方式读取MySql数据表加载为Dataset */DataFrameReader reader = sparkSession.read().format("jdbc"); reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark"); reader.option("driver", "com.mysql.jdbc.Driver"); reader.option("user", "root"); reader.option("password", "root"); reader.option("dbtable", "score"); Dataset score = reader.load(); score.show(); score.createOrReplaceTempView("score"); //score.registerTempTable("score"); Datasetresult = sparkSession.sql("select person.id,person.name,person.age,score.score " + "from person,score " + "where person.name = score.nameand score.score> 82"); result.show(); // result.registerTempTable("result"); Datasetdf = sparkSession.sql("select id,name,age,score from result where age > 18"); df.show(); /** * 将Dataset结果保存到Mysql中 */Properties properties = new Properties(); properties.setProperty("user", "root"); properties.setProperty("password", "root"); /** * SaveMode: * Overwrite:覆盖 * Append:追加 * ErrorIfExists:如果存在就报错 * Ignore:如果存在就忽略 * */result.write().mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark", "result", properties); System.out.println("----Finish----"); sparkSession.stop(); } }

6、读取parquet文件创建Dataset
注意:
可以将Dataset存储成parquet文件。保存成parquet文件的方式有两种
df.write().mode(SaveMode.Overwrite)format("parquet") .save("./sparksql/parquet"); df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");

SaveMode指定文件保存时的模式。
Overwrite:覆盖
Append:追加
ErrorIfExists:如果存在就报错
Ignore:如果存在就忽略
package com.shsxt.java.sql.dataset; import org.apache.spark.sql.*; public class CreateDFFromParquet { public static void main(String[] args) { SparkSession sparkSession = SparkSession .builder() .appName("parquet") .master("local") .getOrCreate(); Datasetdf = sparkSession.read().json("data/json"); //sparkSession.read().format("json").load("./spark/json"); df.show(); /** * 将Dataset保存成parquet文件, * SaveMode指定存储文件时的保存模式: *Overwrite:覆盖 *Append:追加 *ErrorIfExists:如果存在就报错 *Ignore:如果存在就忽略 * 保存成parquet文件有以下两种方式: */df.write().mode(SaveMode.Overwrite).format("parquet").save("data/parquet"); //df.write().mode(SaveMode.Overwrite).parquet("data/parquet"); /** * 加载parquet文件成Dataset * 加载parquet文件有以下两种方式: */Dataset load = sparkSession.read().format("parquet").load("data/parquet"); load = sparkSession.read().parquet("data/parquet"); load.show(); sparkSession.stop(); } }

7、通过Bean class来创建 Dataset
/** * */ package com.shsxt.java.sql.dataset; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.Arrays; import java.util.List; /** * 通过Bean class来创建 Dataset * * @author root */ public class CreateDSFromBeanClass {public static void main(String[] args) { SparkSession sparkSession = SparkSession .builder() .appName("beanclass") .master("local") .getOrCreate(); Person person = new Person(); person.setId("1"); person.setAge(18); person.setName("zs"); Person person2 = new Person(); person2.setId("2"); person2.setAge(20); person2.setName("ls"); List people = Arrays.asList(person, person2); Encoder personEncoder = Encoders.bean(Person.class); Dataset dataset = sparkSession.createDataset(people, personEncoder); dataset.printSchema(); dataset.show(); dataset.registerTempTable("person"); Dataset result = sparkSession.sql("select name , idfrom person"); result.show(); sparkSession.stop(); } }

8、读取Hive中的数据加载成Dataset
package com.shsxt.java.sql.dataset; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; public class CreateDSFromHive { public static void main(String[] args) { SparkSession sparkSession = SparkSession .builder() .appName("hvie") //开启hive的支持,接下来就可以操作hive表了 // 前提需要是需要开启hive metastore 服务 .enableHiveSupport() .getOrCreate(); sparkSession.sql("USE spark"); sparkSession.sql("DROP TABLE IF EXISTS student_infos"); //在hive中创建student_infos表 sparkSession.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' "); sparkSession.sql("load data local inpath '/root/student_infos' into table student_infos"); //注意:此种方式,程序需要能读取到数据(如/root/student_infos),同时也要能读取到 metastore服务的配置信息。sparkSession.sql("DROP TABLE IF EXISTS student_scores"); sparkSession.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'"); sparkSession.sql("LOAD DATA " + "LOCAL INPATH '/root/student_scores'" + "INTO TABLE student_scores"); //Dataset df = hiveContext.table("student_infos"); //读取Hive表加载Dataset方式 /** * 查询表生成Dataset */ Dataset goodStudentsDF = sparkSession.sql("SELECT si.name, si.age, ss.score " + "FROM student_infos si " + "JOIN student_scores ss " + "ON si.name=ss.name " + "WHERE ss.score>=80"); goodStudentsDF.registerTempTable("goodstudent"); Datasetresult = sparkSession.sql("select * from goodstudent"); result.show(); /** * 将结果保存到hive表 good_student_infos */ sparkSession.sql("DROP TABLE IF EXISTS good_student_infos"); goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos"); sparkSession.stop(); } }

四、Spark On Hive的配置 1、在Spark客户端配置spark On hive
在Spark客户端安装包下spark-2.2.1/conf中创建文件hive-site.xml: 配置hive的metastore路径 hive.metastore.uris thrift://node1:9083

2、启动Hive的metastore服务
hive --service metastore &

3、启动zookeeper集群,启动HDFS集群。 4、启动SparkShell 读取Hive中的表总数,对比hive中查询同一表查询总数测试时间。
./spark-shell --master spark://node1:7077 spark.sql("select * from day_table").show;

注意:
如果使用Spark on Hive 查询数据时,出现错误:
SparkSQL|SparkSql
文章图片

找不到HDFS集群路径,要在客户端机器conf/spark-env.sh中设置HDFS的路径:
SparkSQL|SparkSql
文章图片

五、序列化问题 java中以下几种情况下不被序列化 1.反序列化时serializable 版本号不一致时会导致不能反序列化。
步骤:
1.将本地bean serialVersionUID 版本号改为1L,然后序列化
2.将本地bean serialVersionUID 版本号改为2L,然后反序列化
3.观察能否完成反序列化.
结论:简单来说,Java的序列化机制是通过在运行时判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,
JVM会把传来的字节流中的serialVersionUID与本地相应实体(类)的serialVersionUID进行比较,如果相同就认为是一致的, 可以进行反序列化,否则就会出现序列化版本不一致的异常。当实现java.io.Serializable接口的实体(类)没有显式地定义一个名为serialVersionUID,类型为long的变量时,Java序列化机制会根据编译的class自动生成一个serialVersionUID作序列化版本比较用,这种情况下,只有同一次编译生成的class才会生成相同的serialVersionUID 。如果我们不希望通过编译来强制划分软件版本,即实现序列化接口的实体能够兼容先前版本,未作更改的类,就需要显式地定义一个名为serialVersionUID,类型为long的变量,不修改这个变量值的序列化实体都可以相互进行串行化和反串行化。
package com.shsxt.java.sql.serializeTest; import com.shsxt.java.sql.serializeTest.bean.User; import java.io.FileInputStream; import java.io.ObjectInputStream; /** * 测试:反序列化时 实体类中的serialVersionUID改变后能否反序列化?(不能) * * 步骤:1.将本地bean serialVersionUID 版本号改为1L,然后序列化 * 2.将本地bean serialVersionUID 版本号改为2L,然后反序列化 * 3.观察能否完成反序列化. * 结论:简单来说,Java的序列化机制是通过在运行时判断类的serialVersionUID来验证版本一致性的。在进行反序列化时, * JVM会把传来的字节流中的serialVersionUID与本地相应实体(类)的serialVersionUID进行比较,如果相同就认为是一致的, * 可以进行反序列化,否则就会出现序列化版本不一致的异常。 * 当实现java.io.Serializable接口的实体(类)没有显式地定义一个名为serialVersionUID,类型为long的变量时, * Java序列化机制会根据编译的class自动生成一个serialVersionUID作序列化版本比较用,这种情况下, * 只有同一次编译生成的class才会生成相同的serialVersionUID 。如果我们不希望通过编译来强制划分软件版本, * 即实现序列化接口的实体能够兼容先前版本,未作更改的类,就需要显式地定义一个名为serialVersionUID,类型为long的变量, * 不修改这个变量值的序列化实体都可以相互进行串行化和反串行化。 * * @author root */public class DeSerializableTest { public static void main(String[] args) { //User user = new User(); //user.setUsername("zhangsan"); //user.setPasswd("1234"); try {/** * 这里注意第一次运行的时候要把下面代码中的反序列部分注释,然后改正User对象中的serializableVersion的版本号, * 运行反序列化时,要把序列化代码注释,这样才能保证版本不一致。 *///ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("data/user.txt")); //out.writeObject(user); //out.flush(); //out.close(); ObjectInputStream in = new ObjectInputStream(new FileInputStream("data/user.txt")); User user = (User) in.readObject(); System.out.println(user); System.out.println(user.getPasswd() + "" + user.getUsername()); in.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }

2.子类中实现了serializable接口,父类中没有实现,父类中的变量不能被序列化,序列化后父类中的变量会得到null。
**注意**:父类实现serializable接口,子类没有实现serializable接口时,子类可以正常序列化

子类实现serializable 父类不实现serializable,当序列化子类时,反序列化得到父类中的某变量的数值时,该变量的数值与序列化时的数值不相同?(不相同)
步骤:
1.父类不实现serializable,子类实现serializable,父类中比子类中多一个common变量。
2.将子类序列化,然后反序列化,观察能否得到,设置父类中的变量common
3.将父类实现serializable,同样操作第二步骤,观察反序列化能否得到父类中的common值
结论:
一个子类实现了 Serializable 接口,它的父类都没有实现 Serializable 接口,序列化该子类对象,然后反序列化后输出父类定义的某变量的数值, 该变量数值与序列化时的数值不同。(需要在父类中是实现默认的构造方法,否则会报异常:no validconstructor)在父类没有实现 Serializable 接口时,虚拟机是不会序列化父对象的,而一个 Java 对象的构造必须先有父对象,才有子对象,反序列化也不例外。所以反序列化时,为了构造父对象,只能调用父类的无参构造函数作为默认的父对象。因此当我们取父对象的变量值时,它的值是调用父类无参构造函数后的值。 如果你考虑到这种序列化的情况,在父类无参构造函数中对变量进行初始化,否则的话,父类变量值都是默认声明的值,如 int 型的默认是 0,string 型的默认是 null。
应用场景:
根据父类对象序列化的规则,我们可以将不需要被序列化的字段抽取出来放到父类中,子类实现 Serializable 接口, 父类不实现,根据父类序列化规则,父类的字段数据将不被序列化,
package com.shsxt.java.sql.serializeTest; import com.shsxt.java.sql.serializeTest.bean.Apple; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.ObjectInputStream; /** * 测试:子类实现serializable 父类不实现serializable,当序列化子类时,反序列化得到父类中的某变量的数值时,该变量的数值与序列化时的数值不相同?(不相同) * * 步骤:1.父类不实现serializable,子类实现serializable,父类中比子类中多一个common变量。 * 2.将子类序列化,然后反序列化,观察能否得到,设置父类中的变量common * 3.将父类实现serializable,同样操作第二步骤,观察反序列化能否得到父类中的common值 * * 结论:一个子类实现了 Serializable 接口,它的父类都没有实现 Serializable 接口,序列化该子类对象,然后反序列化后输出父类定义的某变量的数值, * 该变量数值与序列化时的数值不同。(需要在父类中是实现默认的构造方法,否则会报异常:no validconstructor) * 在父类没有实现 Serializable 接口时,虚拟机是不会序列化父对象的,而一个 Java 对象的构造必须先有父对象,才有子对象,反序列化也不例外。 * 所以反序列化时,为了构造父对象,只能调用父类的无参构造函数作为默认的父对象。因此当我们取父对象的变量值时,它的值是调用父类无参构造函数后的值。 * 如果你考虑到这种序列化的情况,在父类无参构造函数中对变量进行初始化,否则的话,父类变量值都是默认声明的值,如 int 型的默认是 0,string 型的默认是 null。 * * 应用场景: * 根据父类对象序列化的规则,我们可以将不需要被序列化的字段抽取出来放到父类中,子类实现 Serializable 接口, * 父类不实现,根据父类序列化规则,父类的字段数据将不被序列化, *//** * 测试子类实现serializable接口,父类不实现serializable接口,父类中的变量不能 被序列化。 * * 步骤: * 1.子类(Apple)实现serializable接口,父类(Fruit)不实现serializable接口。 * 2.运行以下代码,观察结果。 * * @author root */public class ExtendsSerializableTest { public static void main(String[] args) { //Apple apple = new Apple(); //apple.setCommon("in apple"); //apple.setColor("red"); //apple.setName("apple"); try { //ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("data/apple.txt")); //out.writeObject(apple); //out.flush(); //out.close(); //System.out.println("before serializable : "); //System.out.println("apple common : " + apple.common); //System.out.println("apple name: " + apple.name); //System.out.println("apple color : "+ apple.color); //System.out.println("\n-----------------------\n"); // ObjectInputStream in = new ObjectInputStream(new FileInputStream("data/apple.txt")); Apple apple = (Apple) in.readObject(); System.out.println("after serializable :"); System.out.println("apple common : " + apple.common); System.out.println("apple name : " + apple.name); System.out.println("apple color : " + apple.color); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } }}

3.被关键字transient修饰的变量不能被序列化。
package com.shsxt.java.sql.serializeTest.bean; import java.io.Serializable; public class User implements Serializable { /** * */ private static final long serialVersionUID =2L; /** * */ private transient String username; private String passwd; //private String passwd; public String getUsername() { return username; }public void setUsername(String username) { this.username = username; }public String getPasswd() { return passwd; }public void setPasswd(String passwd) { this.passwd = passwd; } @Override public String toString() { return "User [username=" + username + ", passwd=" + passwd + "]"; } }

4.静态变量不能被序列化,属于类,不属于方法和对象,所以不能被序列化。
结论:
可以看到当改变静态变量的值时,序列化后的静态变量也是改变的,如果正常能被序列化的值,是在反序列化中得到的值是不会变的,可见,静态变量不能被序列化。
那为什么反序列化可以读到值呢?
一个静态变量不管是否被transient修饰,均不能被序列化,反序列化后类中static型变量isPerson的值为当前JVM中对应static变量的值,这个值是JVM中的,不是反序列化得出的
package com.shsxt.java.sql.serializeTest; import com.shsxt.java.sql.serializeTest.bean.Person; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.ObjectInputStream; /** * 问题:静态变量能否被序列化?(不能被序列化) * 【SparkSQL|SparkSql】 * 结论:可以看到当改变静态变量的值时,序列化后的静态变量也是改变的,如果正常能被序列化的值,是在反序列化中得到的值是不会变的,可见,静态变量不能被序列化。 * 那为什么反序列化可以读到值呢? * 一个静态变量不管是否被transient修饰,均不能被序列化,反序列化后类中static型变量isPerson的值为当前JVM中对应static变量的值, * 这个值是JVM中的,不是反序列化得出的 * * @author root */ public class StaticSerializableTest {public static void main(String[] args) { //Person person = new Person(); //Person.isPerson = "yes"; // //person.setName("zhangsan"); //person.setAge("18"); //System.out.println("before serializable : "); //System.out.println("isPerson : " + Person.isPerson); //System.out.println("name : " + person.name); //System.out.println("age : " + person.age); try { //ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("data/personSerialize.txt")); //out.writeObject(person); //out.flush(); //out.close(); //System.out.println("\n=================\n"); // ObjectInputStream in = new ObjectInputStream(new FileInputStream("data/personSerialize.txt")); Person person = (Person) in.readObject(); in.close(); System.out.println("after serializable : "); System.out.println("isPerson : " + person.isPerson); System.out.println("name : " + person.name); System.out.println("age : " + person.age); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); }} }

六、储存DataSet
  1. 将DataSet存储为parquet文件。
  2. 将DataSet存储到JDBC数据库。
  3. 将DataSet存储到Hive表。
七、自定义函数UDF和UDAF UDAF 用户自定义聚合函数
package com.shsxt.java.sql.udf_udaf; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.*; import org.apache.spark.sql.expressions.MutableAggregationBuffer; import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; /** * UDAF 用户自定义聚合函数 * * @author root */ public class UDAF { public static void main(String[] args) {SparkSession sparkSession = SparkSession .builder() .appName("udaf") .master("local") .getOrCreate(); JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext()); JavaRDD parallelize = sc.parallelize( Arrays.asList("zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi","zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi"),2); JavaRDD rowRDD = parallelize.map(new Function() {/** * */ private static final long serialVersionUID = 1L; @Override public Row call(String s) throws Exception {return RowFactory.create(s); } }); List fields = new ArrayList(); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); Dataset df = sparkSession.createDataFrame(rowRDD, schema); df.registerTempTable("user"); /** * 注册一个UDAF函数,实现统计相同值得个数 * 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的 * 数据: *zhangsan *zhangsan *lisi *lisi * *select count(*)from user group by name */sparkSession.udf().register("StringCount", new UserDefinedAggregateFunction() {/** * */ private static final long serialVersionUID = 1L; /** * 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果 */ @Override public void initialize(MutableAggregationBuffer buffer) {buffer.update(0, 0); System.out.println("init ....." + buffer.get(0)); }/** * 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑 * buffer.getInt(0)获取的是上一次聚合后的值 * 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合 * 大聚和发生在reduce端. * 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算 */ @Override public void update(MutableAggregationBuffer buffer, Row arg1) {buffer.update(0, buffer.getInt(0) + 1); System.out.println("update.....buffer" + buffer.toString() + " | row" + arg1.toString() ); }/** * 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理 * 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来 * buffer1.getInt(0) : 大聚合的时候 上一次聚合后的值 * buffer2.getInt(0) : 这次计算传入进来的update的结果 * 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作 */public void merge(MutableAggregationBuffer buffer1, Row arg1) { // 2 34567 // 0 + 2 = 2 // 2 + 3 = 5 // 5 + 4= 9buffer1.update(0, buffer1.getInt(0) + arg1.getInt(0)); System.out.println("merge.....buffer : " + buffer1.toString() + "| row" + arg1.toString() ); }/** * 在进行聚合操作的时候所要处理的数据的结果的类型 */ @Override public StructType bufferSchema() { return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bffer", DataTypes.IntegerType, true))); }/** * 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果 */ @Override public Object evaluate(Row row) { return row.getInt(0); }/** * 指定UDAF函数计算后返回的结果类型 */ @Override public DataType dataType() { return DataTypes.IntegerType; }/** * 指定输入字段的字段及类型 */ @Override public StructType inputSchema() { return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true))); }/** * 确保一致性 一般用true,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。 */ @Override public boolean deterministic() { return true; }}); sparkSession.sql("select name ,StringCount(name) as number from user group by name").show(); sc.stop(); } }

UDF 用户自定义函数
package com.shsxt.java.sql.udf_udaf; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.*; import org.apache.spark.sql.api.java.UDF2; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; /** * UDF 用户自定义函数 * @author root * */ public class UDF { public static void main(String[] args) { SparkSession sparkSession = SparkSession .builder() .appName("udf") .master("local") .getOrCreate(); JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext()); JavaRDD parallelize = sc.parallelize(Arrays.asList("zhangsan","lisi","wangwu")); JavaRDD rowRDD = parallelize.map(new Function() {/** * */ private static final long serialVersionUID = 1L; @Override public Row call(String s) throws Exception { return RowFactory.create(s); } }); /** * 动态创建Schema方式加载DF */ List fields = new ArrayList(); fields.add(DataTypes.createStructField("name", DataTypes.StringType,true)); StructType schema = DataTypes.createStructType(fields); Dataset df = sparkSession.createDataFrame(rowRDD,schema); df.registerTempTable("user"); /** * 根据UDF函数参数的个数来决定是实现哪一个UDFUDF1,UDF2。。。。UDF1xxx */sparkSession.udf().register("StrLen",new UDF2() {/** * */ private static final long serialVersionUID = 1L; @Override public Integer call(String t1, Integer t2) throws Exception { return t1.length() + t2; } } ,DataTypes.IntegerType ); sparkSession.sql("select name ,StrLen(name,100) as length from user").show(); // sparkSession.stop(); } }

八、开窗函数
row_number() 开窗函数是按照某个字段分组,然后取另一字段的前几个的值,相当于 分组取topN

开窗函数格式:
row_number() over (partitin by XXX order by XXX)

package com.shsxt.java.sql.windowfun; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; /** * row_number()开窗函数: * 主要是按照某个字段分组,然后取另一字段的前几个的值,相当于 分组取topN * row_number() over (partition by xxx order by xxx desc) xxx * @author root * */ public class RowNumberWindowFun { //-Xms800m -Xmx800m-XX:PermSize=64M -XX:MaxNewSize=256m -XX:MaxPermSize=128m public static void main(String[] args) { SparkSession sparkSession = SparkSession .builder() .appName("window") .master("local") //开启hive的支持,接下来就可以操作hive表了 // 前提需要是需要开启hive metastore 服务 .enableHiveSupport() .getOrCreate(); sparkSession.sql("use spark"); sparkSession.sql("drop table if exists sales"); sparkSession.sql("create table if not exists sales (riqi string,leibie string,jine Int) " + "row format delimited fields terminated by '\t'"); sparkSession.sql("load data local inpath './data/sales.txt' into table sales"); /** * 开窗函数格式: * 【 row_number() over (partition by XXX order by XXX) as rank】 * 注意:rank 从1开始 */ /** * 以类别分组,按每种类别金额降序排序,显示 【日期,种类,金额】 结果,如: * * 1 A 100 * 2 B 200 * 3 A 300 * 4 B 400 * 5 A 500 * 6 B 600 * 排序后: * 5 A 500--rank 1 * 3 A 300--rank 2 * 1 A 100--rank 3 * 6 B 600--rank 1 * 4 B 400 --rank 2 * 2 B 200--rank 3 * * 2018 A 4001 * 2017 A 5002 * 2016 A 5503 * * * 2016 A 5501 * 2017 A 5002 * 2018 A 4003 * */Dataset result = sparkSession.sql("select riqi,leibie,jine,rank " + "from (" + "select riqi,leibie,jine," + "row_number() over (partition by leibie order by jine desc) rank " + "from sales) t " + "where t.rank<=3"); result.show(100); /** * 将结果保存到hive表sales_result */ //result.write().mode(SaveMode.Overwrite).saveAsTable("sales_result"); sparkSession.stop(); } }

    推荐阅读