一、Shark 1、简介
Shark是基于Spark计算框架之上且兼容Hive语法的SQL执行引擎
2、优点:
1)由于底层的计算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,当数据全部load在内存的话,将快10倍以上,因此Shark可以作为交互式查询应用服务来使用
2)Shark是完全兼容Hive的语法,表结构以及UDF函数等,已有的HiveSql可以直接进行迁移至Shark上Shark底层依赖于Hive的解析器,查询优化器
3、缺点
由于SHark的整体设计架构对Hive的依赖性太强,难以支持其长远发展,比如不能和Spark的其他组件进行很好的集成,无法满足Spark的一栈式解决大数据处理的需求。
二、SparkSQL 1、介绍
1)Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。
2)SparkSQL支持查询原生的RDD。
3)能够在scala中写SQL语句。
2、Spark on Hive和Hive on Spark
**Spark on Hive**: Hive只作为储存角色,Spark负责sql解析优化,执行。
**Hive on Spark**:Hive即作为存储又负责sql的解析优化,Spark负责执行。
3、Dataset与DataFrame
前期:Dataset简单理解为一张表![在这里插入图片描述](https://img-blog.csdnimg.cn/20190418193853479.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poYW5ncm9uZ2xpbjE=,size_16,color_FFFFFF,t_70)
Dataset的底层封装的是RDD,当RDD的泛型是Row类型的时候,我们也可以称它为DataFrame。即Dataset = DataFrame(Dataset与DataFrame的使用请见第三章、创建Dataset的几种方式)
4、SparkSQL的数据源
SparkSQL的数据源可以是JSON类型的字符串,JDBC,Parquent,Hive,HDFS等。
![在这里插入图片描述](https://img-blog.csdnimg.cn/2019041819453259.png)
5、SparkSQL底层架构
![在这里插入图片描述](https://img-blog.csdnimg.cn/2019041819462122.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poYW5ncm9uZ2xpbjE=,size_16,color_FFFFFF,t_70)
首先拿到sql后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划,再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过SparkPlanner的策略转化成一批物理计划,随后经过消费模型转换成一个个的Spark任务执行。
6、谓词下推(predicate Pushdown)
![在这里插入图片描述](https://img-blog.csdnimg.cn/20190418194749816.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poYW5ncm9uZ2xpbjE=,size_16,color_FFFFFF,t_70)
三、3. 创建Dataset的几种方式 1、读取json格式的文件创建DataSet 注意:json文件中不能嵌套json格式的内容
1.读取json格式两种方式
2.ds.show默认显示前20行,使用ds.show(行数)显示多行
3.ds.javaRDD/(scala ds.rdd) 将DataFrame转换成RDD
4.ds.printSchema()显示Dataset中的Schema信息
5.dataset自带的API 操作Dataset ,用的少
6.想使用sql查询,首先要将Dataset注册成临时表:dd.registerTempTable(“jtable”),再使用sql,怎么使用sql? sqlContext.sql(“sql语句”)
7.不能读取嵌套的json文件
8.ds加载过来之后将列按照ascii排序了
package com.shsxt.java.sql.dataset;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* 读取json格式的文件创建DataSet
* * 注意 :json文件中不能嵌套json格式的内容
* * 1.读取json格式两种方式
* 2.ds.show默认显示前20行,使用ds.show(行数)显示多行
* 3.ds.javaRDD/(scala ds.rdd) 将DataFrame转换成RDD
* 4.ds.printSchema()显示Dataset中的Schema信息
* 5.dataset自带的API 操作Dataset ,用的少
* 6.想使用sql查询,首先要将Dataset注册成临时表:dd.registerTempTable("jtable"),再使用sql,怎么使用sql?sqlContext.sql("sql语句")
* 7.不能读取嵌套的json文件
* 8.ds加载过来之后将列按照ascii排序了
*
* @author root
*/
public class CreateDSFromJosonFile {public static void main(String[] args) {SparkSession sparkSession = SparkSession
.builder()
.appName("jsonfile")
.master("local")
.getOrCreate();
/**
* Dataset的底层是一个一个的RDD
* 注意:
*当 Dataset里的泛型是Row时,我们又可以称之为dataframe
*
* 以下两种方式都可以读取json格式的文件
*
*/Dataset ds = sparkSession.read().format("json").load("data/json");
//Dataset ds = sqlContext.read().json("data/json");
//ds.show();
ds.show();
/**
* 显示DataSet中的内容,默认显示前20行。如果现实多行要指定多少行show(行数)
* 注意:当有多个列时,显示的列先后顺序是按列的ascii码先后显示。
*/
ds.show(100);
/**
*DataSet转换成RDD
*/
//JavaRDD javaRDD = ds.javaRDD();
/**
* 树形的形式显示schema信息
*/
ds.printSchema();
/**
* dataset自带的API 操作dataset
*/
//select name from table
ds.select("name").show();
//select name ,age+10 as addage from table
ds.select(ds.col("name"),ds.col("age").plus(10).alias("addage")).show();
//select name ,age from table where age>19
ds.select(ds.col("name"),ds.col("age")).where(ds.col("age").gt(19)).show();
//select age,count(*) from table group by age
ds.groupBy(ds.col("age")).count().show();
/**
* 将dataset注册成临时的一张表,这张表相当于临时注册到内存中,逻是辑上的表,不会雾化到磁盘
*/ds.createOrReplaceTempView("jtable");
//ds.registerTempTable("jtable");
Dataset sql = sparkSession.sql("select age,count(*) as gg from jtable group by age");
sql.show();
//
//Dataset sql2 = sparkSession.sql("select name,age from jtable");
//sql2.show();
sparkSession.stop();
}
}
2、读取json格式的RDD创建Dataset
package com.shsxt.java.sql.dataset;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import java.util.Arrays;
/**
* 读取json格式的RDD创建DF
* @author root
*
*/
public class CreateDSFromJsonRDD {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("jsonrdd")
.master("local")
.getOrCreate();
/**
* 注意:
*1.由于是java版,故通过javaSparkcontext.parallelize来创建json格式的JavaRDD
*所以我们通过sparkcontext来创建javaSparkcontext
*2.如果是scala版本,直接通过sparkcontext.parallelize来创建,就无需创建javaSparkcontext
*/SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD nameRDD = jsc.parallelize(Arrays.asList(
"{'name':'zhangsan','age':\"18\"}",
"{\"name\":\"lisi\",\"age\":\"19\"}",
"{\"name\":\"wangwu\",\"age\":\"20\"}"
));
JavaRDD scoreRDD = jsc.parallelize(Arrays.asList(
"{\"name\":\"zhangsan\",\"score\":\"100\"}",
"{\"name\":\"lisi\",\"score\":\"200\"}",
"{\"name\":\"wangwu\",\"score\":\"300\"}"
));
Dataset nameds = sparkSession.read().json(nameRDD);
Dataset scoreds = sparkSession.read().json(scoreRDD);
//注册成临时表使用
nameds.createOrReplaceTempView("nameTable");
scoreds.createOrReplaceTempView("scoreTable");
Dataset result =
sparkSession.sql("select nameTable.name,nameTable.age,scoreTable.score "
+ "from nameTable join scoreTable "
+ "on nameTable.name = scoreTable.name");
result.show();
sc.stop();
}
}
3、通过反射的方式将非json格式的RDD转换成Dataset 注意:
1.这种方式不推荐使用
2.自定义类要实现序列化接口
3.自定义类访问级别必须是Public
4.RDD转成Dataset会把自定义类中字段的名称按assci码排序
package com.shsxt.java.sql.dataset;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* 通过反射的方式将非json格式的RDD转换成Dataset
* 注意:这种方式不推荐使用
*
* @author root
*/
public class CreateDSFromRDDWithReflect {
public static void main(String[] args) {
/**
* 注意:
* 1.自定义类要实现序列化接口
* 2.自定义类访问级别必须是Public
* 3.RDD转成Dataset会把自定义类中字段的名称按assci码排序
*/SparkSession sparkSession = SparkSession
.builder()
.appName("reflect")
.master("local")
.getOrCreate();
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD lineRDD = jsc.textFile("data/person.txt");
JavaRDD personRDD = lineRDD.map(
new Function() {private static final long serialVersionUID = 1L;
@Override
public Person call(String line) throws Exception {
Person p = new Person();
p.setId(line.split(",")[0]);
p.setName(line.split(",")[1]);
p.setAge(Integer.valueOf(line.split(",")[2]));
return p;
}
});
/**
* 传入进去Person.class的时候,sqlContext是通过反射的方式创建Dataset
* 在底层通过反射的方式获得Person的所有field,结合RDD本身,就生成了Dataset
*/
Dataset dataFrame = sparkSession.createDataFrame(personRDD, Person.class);
dataFrame.show();
dataFrame.printSchema();
dataFrame.registerTempTable("person");
Dataset sql = sparkSession.sql("selectname,id,age from person where id = 2");
sql.show();
/**
* 将Dataset转成JavaRDD
* 注意:
* 1.可以使用row.getInt(0),row.getString(1)...通过下标获取返回Row类型的数据,但是要注意列顺序问题---不常用
* 2.可以使用row.getAs("列名")来获取对应的列值。
*
//*/
JavaRDD javaRDD = dataFrame.javaRDD();
JavaRDD map = javaRDD.map(new Function() {/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Person call(Row row) throws Exception {
Person p = new Person();
//p.setId(row.getString(1));
//p.setName(row.getString(2));
//p.setAge(row.getInt(0));
p.setId(row.getAs("id"));
p.setName(row.getAs("name"));
p.setAge(row.getAs("age"));
return p;
}
});
map.foreach(x-> System.out.println(x));
sparkSession.stop();
}
}
4、动态创建Schema将非json格式RDD转换成Dataset
/**
*
*/
package com.shsxt.java.sql.dataset;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
/**
* 动态创建Schema将非json格式RDD转换成Dataset
* @author root
*
*/
public class CreateDSFromRDDWithStruct {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("struct")
.master("local")
.getOrCreate();
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD lineRDD = jsc.textFile("data/person.txt");
/**
* 转换成Row类型的RDD
*/
final JavaRDD rowRDD = lineRDD.map(new Function() {/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {return RowFactory.create(
s.split(",")[0],
s.split(",")[1],
Integer.valueOf(s.split(",")[2])
);
}
});
/**
* 动态构建DataFrame中的元数据,一般来说这里的字段可以来源自字符串,也可以来源于外部数据库
*/List asList = Arrays.asList(
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(asList);
Dataset df = sparkSession.createDataFrame(rowRDD, schema);
df.printSchema();
df.show();
sc.stop();
}
}
5、读取JDBC中的数据创建Dataset(MySql为例)
第一种方式读取MySql数据库表,加载为Dataset
第二种方式读取MySql数据表加载为Dataset
package com.shsxt.java.sql.dataset;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CreateDSFromMysql {
public static void main(String[] args) {SparkSession sparkSession = SparkSession
.builder()
.appName("mysql")
.master("local")
.getOrCreate();
/**
* 第一种方式读取MySql数据库表,加载为Dataset
*/
Map options = new HashMap();
options.put("url", "jdbc:mysql://127.0.0.1:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "root");
options.put("dbtable", "person");
Dataset person = sparkSession.read().format("jdbc").options(options).load();
person.show();
person.createOrReplaceTempView("person");
//person.registerTempTable("person");
/**
* 第二种方式读取MySql数据表加载为Dataset
*/DataFrameReader reader = sparkSession.read().format("jdbc");
reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "root");
reader.option("dbtable", "score");
Dataset score = reader.load();
score.show();
score.createOrReplaceTempView("score");
//score.registerTempTable("score");
Datasetresult =
sparkSession.sql("select person.id,person.name,person.age,score.score "
+ "from person,score "
+ "where person.name = score.nameand score.score> 82");
result.show();
//
result.registerTempTable("result");
Datasetdf = sparkSession.sql("select id,name,age,score from result where age > 18");
df.show();
/**
* 将Dataset结果保存到Mysql中
*/Properties properties = new Properties();
properties.setProperty("user", "root");
properties.setProperty("password", "root");
/**
* SaveMode:
* Overwrite:覆盖
* Append:追加
* ErrorIfExists:如果存在就报错
* Ignore:如果存在就忽略
*
*/result.write().mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark", "result", properties);
System.out.println("----Finish----");
sparkSession.stop();
}
}
6、读取parquet文件创建Dataset
注意:
可以将Dataset存储成parquet文件。保存成parquet文件的方式有两种
df.write().mode(SaveMode.Overwrite)format("parquet")
.save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
SaveMode指定文件保存时的模式。
Overwrite:覆盖
Append:追加
ErrorIfExists:如果存在就报错
Ignore:如果存在就忽略
package com.shsxt.java.sql.dataset;
import org.apache.spark.sql.*;
public class CreateDFFromParquet {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("parquet")
.master("local")
.getOrCreate();
Datasetdf = sparkSession.read().json("data/json");
//sparkSession.read().format("json").load("./spark/json");
df.show();
/**
* 将Dataset保存成parquet文件,
* SaveMode指定存储文件时的保存模式:
*Overwrite:覆盖
*Append:追加
*ErrorIfExists:如果存在就报错
*Ignore:如果存在就忽略
* 保存成parquet文件有以下两种方式:
*/df.write().mode(SaveMode.Overwrite).format("parquet").save("data/parquet");
//df.write().mode(SaveMode.Overwrite).parquet("data/parquet");
/**
* 加载parquet文件成Dataset
* 加载parquet文件有以下两种方式:
*/Dataset load = sparkSession.read().format("parquet").load("data/parquet");
load = sparkSession.read().parquet("data/parquet");
load.show();
sparkSession.stop();
}
}
7、通过Bean class来创建 Dataset
/**
*
*/
package com.shsxt.java.sql.dataset;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
/**
* 通过Bean class来创建 Dataset
*
* @author root
*/
public class CreateDSFromBeanClass {public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("beanclass")
.master("local")
.getOrCreate();
Person person = new Person();
person.setId("1");
person.setAge(18);
person.setName("zs");
Person person2 = new Person();
person2.setId("2");
person2.setAge(20);
person2.setName("ls");
List people = Arrays.asList(person, person2);
Encoder personEncoder = Encoders.bean(Person.class);
Dataset dataset = sparkSession.createDataset(people, personEncoder);
dataset.printSchema();
dataset.show();
dataset.registerTempTable("person");
Dataset result = sparkSession.sql("select name , idfrom person");
result.show();
sparkSession.stop();
}
}
8、读取Hive中的数据加载成Dataset
package com.shsxt.java.sql.dataset;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class CreateDSFromHive { public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("hvie")
//开启hive的支持,接下来就可以操作hive表了
// 前提需要是需要开启hive metastore 服务
.enableHiveSupport()
.getOrCreate();
sparkSession.sql("USE spark");
sparkSession.sql("DROP TABLE IF EXISTS student_infos");
//在hive中创建student_infos表
sparkSession.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' ");
sparkSession.sql("load data local inpath '/root/student_infos' into table student_infos");
//注意:此种方式,程序需要能读取到数据(如/root/student_infos),同时也要能读取到 metastore服务的配置信息。sparkSession.sql("DROP TABLE IF EXISTS student_scores");
sparkSession.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");
sparkSession.sql("LOAD DATA "
+ "LOCAL INPATH '/root/student_scores'"
+ "INTO TABLE student_scores");
//Dataset df = hiveContext.table("student_infos");
//读取Hive表加载Dataset方式
/**
* 查询表生成Dataset
*/
Dataset goodStudentsDF = sparkSession.sql("SELECT si.name, si.age, ss.score "
+ "FROM student_infos si "
+ "JOIN student_scores ss "
+ "ON si.name=ss.name "
+ "WHERE ss.score>=80");
goodStudentsDF.registerTempTable("goodstudent");
Datasetresult = sparkSession.sql("select * from goodstudent");
result.show();
/**
* 将结果保存到hive表 good_student_infos
*/
sparkSession.sql("DROP TABLE IF EXISTS good_student_infos");
goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
sparkSession.stop();
}
}
四、Spark On Hive的配置 1、在Spark客户端配置spark On hive
在Spark客户端安装包下spark-2.2.1/conf中创建文件hive-site.xml:
配置hive的metastore路径
hive.metastore.uris
thrift://node1:9083
2、启动Hive的metastore服务
hive --service metastore &
3、启动zookeeper集群,启动HDFS集群。 4、启动SparkShell 读取Hive中的表总数,对比hive中查询同一表查询总数测试时间。
./spark-shell
--master spark://node1:7077
spark.sql("select * from day_table").show;
注意:
如果使用Spark on Hive 查询数据时,出现错误:
文章图片
找不到HDFS集群路径,要在客户端机器conf/spark-env.sh中设置HDFS的路径:
文章图片
五、序列化问题 java中以下几种情况下不被序列化 1.反序列化时serializable 版本号不一致时会导致不能反序列化。
步骤:
1.将本地bean serialVersionUID 版本号改为1L,然后序列化
2.将本地bean serialVersionUID 版本号改为2L,然后反序列化
3.观察能否完成反序列化.
结论:简单来说,Java的序列化机制是通过在运行时判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,
JVM会把传来的字节流中的serialVersionUID与本地相应实体(类)的serialVersionUID进行比较,如果相同就认为是一致的, 可以进行反序列化,否则就会出现序列化版本不一致的异常。当实现java.io.Serializable接口的实体(类)没有显式地定义一个名为serialVersionUID,类型为long的变量时,Java序列化机制会根据编译的class自动生成一个serialVersionUID作序列化版本比较用,这种情况下,只有同一次编译生成的class才会生成相同的serialVersionUID 。如果我们不希望通过编译来强制划分软件版本,即实现序列化接口的实体能够兼容先前版本,未作更改的类,就需要显式地定义一个名为serialVersionUID,类型为long的变量,不修改这个变量值的序列化实体都可以相互进行串行化和反串行化。
package com.shsxt.java.sql.serializeTest;
import com.shsxt.java.sql.serializeTest.bean.User;
import java.io.FileInputStream;
import java.io.ObjectInputStream;
/**
* 测试:反序列化时 实体类中的serialVersionUID改变后能否反序列化?(不能)
* * 步骤:1.将本地bean serialVersionUID 版本号改为1L,然后序列化
* 2.将本地bean serialVersionUID 版本号改为2L,然后反序列化
* 3.观察能否完成反序列化.
* 结论:简单来说,Java的序列化机制是通过在运行时判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,
* JVM会把传来的字节流中的serialVersionUID与本地相应实体(类)的serialVersionUID进行比较,如果相同就认为是一致的,
* 可以进行反序列化,否则就会出现序列化版本不一致的异常。
* 当实现java.io.Serializable接口的实体(类)没有显式地定义一个名为serialVersionUID,类型为long的变量时,
* Java序列化机制会根据编译的class自动生成一个serialVersionUID作序列化版本比较用,这种情况下,
* 只有同一次编译生成的class才会生成相同的serialVersionUID 。如果我们不希望通过编译来强制划分软件版本,
* 即实现序列化接口的实体能够兼容先前版本,未作更改的类,就需要显式地定义一个名为serialVersionUID,类型为long的变量,
* 不修改这个变量值的序列化实体都可以相互进行串行化和反串行化。
*
* @author root
*/public class DeSerializableTest {
public static void main(String[] args) {
//User user = new User();
//user.setUsername("zhangsan");
//user.setPasswd("1234");
try {/**
* 这里注意第一次运行的时候要把下面代码中的反序列部分注释,然后改正User对象中的serializableVersion的版本号,
* 运行反序列化时,要把序列化代码注释,这样才能保证版本不一致。
*///ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("data/user.txt"));
//out.writeObject(user);
//out.flush();
//out.close();
ObjectInputStream in = new ObjectInputStream(new FileInputStream("data/user.txt"));
User user = (User) in.readObject();
System.out.println(user);
System.out.println(user.getPasswd() + "" + user.getUsername());
in.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2.子类中实现了serializable接口,父类中没有实现,父类中的变量不能被序列化,序列化后父类中的变量会得到null。
**注意**:父类实现serializable接口,子类没有实现serializable接口时,子类可以正常序列化
子类实现serializable 父类不实现serializable,当序列化子类时,反序列化得到父类中的某变量的数值时,该变量的数值与序列化时的数值不相同?(不相同)
步骤:
1.父类不实现serializable,子类实现serializable,父类中比子类中多一个common变量。
2.将子类序列化,然后反序列化,观察能否得到,设置父类中的变量common
3.将父类实现serializable,同样操作第二步骤,观察反序列化能否得到父类中的common值
结论:
一个子类实现了 Serializable 接口,它的父类都没有实现 Serializable 接口,序列化该子类对象,然后反序列化后输出父类定义的某变量的数值, 该变量数值与序列化时的数值不同。(需要在父类中是实现默认的构造方法,否则会报异常:no validconstructor)在父类没有实现 Serializable 接口时,虚拟机是不会序列化父对象的,而一个 Java 对象的构造必须先有父对象,才有子对象,反序列化也不例外。所以反序列化时,为了构造父对象,只能调用父类的无参构造函数作为默认的父对象。因此当我们取父对象的变量值时,它的值是调用父类无参构造函数后的值。 如果你考虑到这种序列化的情况,在父类无参构造函数中对变量进行初始化,否则的话,父类变量值都是默认声明的值,如 int 型的默认是 0,string 型的默认是 null。
应用场景:
根据父类对象序列化的规则,我们可以将不需要被序列化的字段抽取出来放到父类中,子类实现 Serializable 接口, 父类不实现,根据父类序列化规则,父类的字段数据将不被序列化,
package com.shsxt.java.sql.serializeTest;
import com.shsxt.java.sql.serializeTest.bean.Apple;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
/**
* 测试:子类实现serializable 父类不实现serializable,当序列化子类时,反序列化得到父类中的某变量的数值时,该变量的数值与序列化时的数值不相同?(不相同)
* * 步骤:1.父类不实现serializable,子类实现serializable,父类中比子类中多一个common变量。
* 2.将子类序列化,然后反序列化,观察能否得到,设置父类中的变量common
* 3.将父类实现serializable,同样操作第二步骤,观察反序列化能否得到父类中的common值
* * 结论:一个子类实现了 Serializable 接口,它的父类都没有实现 Serializable 接口,序列化该子类对象,然后反序列化后输出父类定义的某变量的数值,
* 该变量数值与序列化时的数值不同。(需要在父类中是实现默认的构造方法,否则会报异常:no validconstructor)
* 在父类没有实现 Serializable 接口时,虚拟机是不会序列化父对象的,而一个 Java 对象的构造必须先有父对象,才有子对象,反序列化也不例外。
* 所以反序列化时,为了构造父对象,只能调用父类的无参构造函数作为默认的父对象。因此当我们取父对象的变量值时,它的值是调用父类无参构造函数后的值。
* 如果你考虑到这种序列化的情况,在父类无参构造函数中对变量进行初始化,否则的话,父类变量值都是默认声明的值,如 int 型的默认是 0,string 型的默认是 null。
* * 应用场景:
* 根据父类对象序列化的规则,我们可以将不需要被序列化的字段抽取出来放到父类中,子类实现 Serializable 接口,
* 父类不实现,根据父类序列化规则,父类的字段数据将不被序列化,
*//**
* 测试子类实现serializable接口,父类不实现serializable接口,父类中的变量不能 被序列化。
* * 步骤:
* 1.子类(Apple)实现serializable接口,父类(Fruit)不实现serializable接口。
* 2.运行以下代码,观察结果。
*
* @author root
*/public class ExtendsSerializableTest {
public static void main(String[] args) {
//Apple apple = new Apple();
//apple.setCommon("in apple");
//apple.setColor("red");
//apple.setName("apple");
try {
//ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("data/apple.txt"));
//out.writeObject(apple);
//out.flush();
//out.close();
//System.out.println("before serializable : ");
//System.out.println("apple common : " + apple.common);
//System.out.println("apple name: " + apple.name);
//System.out.println("apple color : "+ apple.color);
//System.out.println("\n-----------------------\n");
//
ObjectInputStream in = new ObjectInputStream(new FileInputStream("data/apple.txt"));
Apple apple = (Apple) in.readObject();
System.out.println("after serializable :");
System.out.println("apple common : " + apple.common);
System.out.println("apple name : " + apple.name);
System.out.println("apple color : " + apple.color);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}}
3.被关键字transient修饰的变量不能被序列化。
package com.shsxt.java.sql.serializeTest.bean;
import java.io.Serializable;
public class User implements Serializable {
/**
*
*/
private static final long serialVersionUID =2L;
/**
*
*/
private transient String username;
private String passwd;
//private String passwd;
public String getUsername() {
return username;
}public void setUsername(String username) {
this.username = username;
}public String getPasswd() {
return passwd;
}public void setPasswd(String passwd) {
this.passwd = passwd;
} @Override
public String toString() {
return "User [username=" + username + ", passwd=" + passwd + "]";
}
}
4.静态变量不能被序列化,属于类,不属于方法和对象,所以不能被序列化。
结论:
可以看到当改变静态变量的值时,序列化后的静态变量也是改变的,如果正常能被序列化的值,是在反序列化中得到的值是不会变的,可见,静态变量不能被序列化。
那为什么反序列化可以读到值呢?
一个静态变量不管是否被transient修饰,均不能被序列化,反序列化后类中static型变量isPerson的值为当前JVM中对应static变量的值,这个值是JVM中的,不是反序列化得出的
package com.shsxt.java.sql.serializeTest;
import com.shsxt.java.sql.serializeTest.bean.Person;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
/**
* 问题:静态变量能否被序列化?(不能被序列化)
* 【SparkSQL|SparkSql】 * 结论:可以看到当改变静态变量的值时,序列化后的静态变量也是改变的,如果正常能被序列化的值,是在反序列化中得到的值是不会变的,可见,静态变量不能被序列化。
* 那为什么反序列化可以读到值呢?
* 一个静态变量不管是否被transient修饰,均不能被序列化,反序列化后类中static型变量isPerson的值为当前JVM中对应static变量的值,
* 这个值是JVM中的,不是反序列化得出的
*
* @author root
*/
public class StaticSerializableTest {public static void main(String[] args) {
//Person person = new Person();
//Person.isPerson = "yes";
//
//person.setName("zhangsan");
//person.setAge("18");
//System.out.println("before serializable : ");
//System.out.println("isPerson : " + Person.isPerson);
//System.out.println("name : " + person.name);
//System.out.println("age : " + person.age);
try {
//ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("data/personSerialize.txt"));
//out.writeObject(person);
//out.flush();
//out.close();
//System.out.println("\n=================\n");
//
ObjectInputStream in = new ObjectInputStream(new FileInputStream("data/personSerialize.txt"));
Person person = (Person) in.readObject();
in.close();
System.out.println("after serializable : ");
System.out.println("isPerson : " + person.isPerson);
System.out.println("name : " + person.name);
System.out.println("age : " + person.age);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}}
}
六、储存DataSet
- 将DataSet存储为parquet文件。
- 将DataSet存储到JDBC数据库。
- 将DataSet存储到Hive表。
package com.shsxt.java.sql.udf_udaf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
* UDAF 用户自定义聚合函数
*
* @author root
*/
public class UDAF {
public static void main(String[] args) {SparkSession sparkSession = SparkSession
.builder()
.appName("udaf")
.master("local")
.getOrCreate();
JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD parallelize = sc.parallelize(
Arrays.asList("zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi","zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi"),2);
JavaRDD rowRDD = parallelize.map(new Function() {/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {return RowFactory.create(s);
}
});
List fields = new ArrayList();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
Dataset df = sparkSession.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/**
* 注册一个UDAF函数,实现统计相同值得个数
* 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的
* 数据:
*zhangsan
*zhangsan
*lisi
*lisi
*
*select count(*)from user group by name
*/sparkSession.udf().register("StringCount", new UserDefinedAggregateFunction() {/**
*
*/
private static final long serialVersionUID = 1L;
/**
* 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {buffer.update(0, 0);
System.out.println("init ....." + buffer.get(0));
}/**
* 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑
* buffer.getInt(0)获取的是上一次聚合后的值
* 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合
* 大聚和发生在reduce端.
* 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算
*/
@Override
public void update(MutableAggregationBuffer buffer, Row arg1) {buffer.update(0, buffer.getInt(0) + 1);
System.out.println("update.....buffer" + buffer.toString() + " | row" + arg1.toString() );
}/**
* 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理
* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来
* buffer1.getInt(0) : 大聚合的时候 上一次聚合后的值
* buffer2.getInt(0) : 这次计算传入进来的update的结果
* 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作
*/public void merge(MutableAggregationBuffer buffer1, Row arg1) {
// 2 34567
// 0 + 2 = 2
// 2 + 3 = 5
// 5 + 4= 9buffer1.update(0, buffer1.getInt(0) + arg1.getInt(0));
System.out.println("merge.....buffer : " + buffer1.toString() + "| row" + arg1.toString() );
}/**
* 在进行聚合操作的时候所要处理的数据的结果的类型
*/
@Override
public StructType bufferSchema() {
return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bffer", DataTypes.IntegerType, true)));
}/**
* 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果
*/
@Override
public Object evaluate(Row row) {
return row.getInt(0);
}/**
* 指定UDAF函数计算后返回的结果类型
*/
@Override
public DataType dataType() {
return DataTypes.IntegerType;
}/**
* 指定输入字段的字段及类型
*/
@Override
public StructType inputSchema() {
return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));
}/**
* 确保一致性 一般用true,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。
*/
@Override
public boolean deterministic() {
return true;
}});
sparkSession.sql("select name ,StringCount(name) as number from user group by name").show();
sc.stop();
}
}
UDF 用户自定义函数
package com.shsxt.java.sql.udf_udaf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
* UDF 用户自定义函数
* @author root
*
*/
public class UDF {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("udf")
.master("local")
.getOrCreate();
JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD parallelize = sc.parallelize(Arrays.asList("zhangsan","lisi","wangwu"));
JavaRDD rowRDD = parallelize.map(new Function() {/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
/**
* 动态创建Schema方式加载DF
*/
List fields = new ArrayList();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
StructType schema = DataTypes.createStructType(fields);
Dataset df = sparkSession.createDataFrame(rowRDD,schema);
df.registerTempTable("user");
/**
* 根据UDF函数参数的个数来决定是实现哪一个UDFUDF1,UDF2。。。。UDF1xxx
*/sparkSession.udf().register("StrLen",new UDF2() {/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Integer call(String t1, Integer t2) throws Exception {
return t1.length() + t2;
}
} ,DataTypes.IntegerType );
sparkSession.sql("select name ,StrLen(name,100) as length from user").show();
//
sparkSession.stop();
}
}
八、开窗函数
row_number() 开窗函数是按照某个字段分组,然后取另一字段的前几个的值,相当于 分组取topN
开窗函数格式:
row_number() over (partitin by XXX order by XXX)
package com.shsxt.java.sql.windowfun;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* row_number()开窗函数:
* 主要是按照某个字段分组,然后取另一字段的前几个的值,相当于 分组取topN
* row_number() over (partition by xxx order by xxx desc) xxx
* @author root
*
*/
public class RowNumberWindowFun {
//-Xms800m -Xmx800m-XX:PermSize=64M -XX:MaxNewSize=256m -XX:MaxPermSize=128m
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("window")
.master("local")
//开启hive的支持,接下来就可以操作hive表了
// 前提需要是需要开启hive metastore 服务
.enableHiveSupport()
.getOrCreate();
sparkSession.sql("use spark");
sparkSession.sql("drop table if exists sales");
sparkSession.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
+ "row format delimited fields terminated by '\t'");
sparkSession.sql("load data local inpath './data/sales.txt' into table sales");
/**
* 开窗函数格式:
* 【 row_number() over (partition by XXX order by XXX) as rank】
* 注意:rank 从1开始
*/
/**
* 以类别分组,按每种类别金额降序排序,显示 【日期,种类,金额】 结果,如:
*
* 1 A 100
* 2 B 200
* 3 A 300
* 4 B 400
* 5 A 500
* 6 B 600
* 排序后:
* 5 A 500--rank 1
* 3 A 300--rank 2
* 1 A 100--rank 3
* 6 B 600--rank 1
* 4 B 400 --rank 2
* 2 B 200--rank 3
*
* 2018 A 4001
* 2017 A 5002
* 2016 A 5503
*
*
* 2016 A 5501
* 2017 A 5002
* 2018 A 4003
*
*/Dataset result = sparkSession.sql("select riqi,leibie,jine,rank "
+ "from ("
+ "select riqi,leibie,jine,"
+ "row_number() over (partition by leibie order by jine desc) rank "
+ "from sales) t "
+ "where t.rank<=3");
result.show(100);
/**
* 将结果保存到hive表sales_result
*/
//result.write().mode(SaveMode.Overwrite).saveAsTable("sales_result");
sparkSession.stop();
}
}