JSON综合性复杂案例
查询成绩为80分以上的学生的基本信息与成绩信息
students.json
{"name":"Leo", "score":85}
{"name":"Marry", "score":99}
{"name":"Jack", "score":74}
/**
* JSON数据源
* @author Administrator
*
*/
public class JSONDataSource {?public static void main(String[] args) {
??SparkConf conf = new SparkConf()???.setAppName("JSONDataSource");
??JavaSparkContext sc = new JavaSparkContext(conf);
??SQLContext sqlContext = new SQLContext(sc);
??// 针对json文件,创建DataFrame(针对json文件创建DataFrame)
??DataFrame studentScoresDF = sqlContext.read().json????"hdfs://spark1:9000/spark-study/students.json");
// 针对学生成绩信息的DataFrame,注册临时表,查询分数大于80分的学生的姓名
??// (注册临时表,针对临时表执行sql语句)
??studentScoresDF.registerTempTable("student_scores");
??DataFrame goodStudentScoresDF = sqlContext.sql(????"select name,score from student_scores where score>=80");
// (将DataFrame转换为rdd,执行transformation操作)
??List goodStudentNames = goodStudentScoresDF.javaRDD().map(new Function() {?????private static final long serialVersionUID = 1L;
?????@Override
?????public String call(Row row) throws Exception {
??????return row.getString(0);
?????}
????}).collect();
??// 然后针对JavaRDD,创建DataFrame
??// (针对包含json串的JavaRDD,创建DataFrame)
??List studentInfoJSONs = new ArrayList();
??studentInfoJSONs.add("{\"name\":\"Leo\", \"age\":18}");
??studentInfoJSONs.add("{\"name\":\"Marry\", \"age\":17}");
??studentInfoJSONs.add("{\"name\":\"Jack\", \"age\":19}");
??JavaRDD studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs);
??DataFrame studentInfosDF = sqlContext.read().json(studentInfoJSONsRDD);
??// 针对学生基本信息DataFrame,注册临时表,然后查询分数大于80分的学生的基本信息
??studentInfosDF.registerTempTable("student_infos");
??String sql = "select name,age from student_infos where name in (";
for(int i = 0;
i < goodStudentNames.size();
i++) {
???sql += "'" + goodStudentNames.get(i) + "'";
???if(i < goodStudentNames.size() - 1) {
????sql += ",";
???}
??}
??sql += ")";
??DataFrame goodStudentInfosDF = sqlContext.sql(sql);
??// 然后将两份数据的DataFrame,转换为JavaPairRDD,执行join transformation
??// (将DataFrame转换为JavaRDD,再map为JavaPairRDD,然后进行join)
??JavaPairRDD> goodStudentsRDD = ????goodStudentScoresDF.javaRDD().mapToPair(new PairFunction() {private static final long serialVersionUID = 1L;
?????@Override
?????public Tuple2 call(Row row) throws Exception {
??????return new Tuple2(row.getString(0),
????????Integer.valueOf(String.valueOf(row.getLong(1))));
?????}
????}).join(goodStudentInfosDF.javaRDD().mapToPair(new PairFunction() {private static final long serialVersionUID = 1L;
?????@Override
?????public Tuple2 call(Row row) throws Exception {
??????return new Tuple2(row.getString(0),
????????Integer.valueOf(String.valueOf(row.getLong(1))));
?????}
????}));
// 然后将封装在RDD中的好学生的全部信息,转换为一个JavaRDD的格式
??// (将JavaRDD,转换为DataFrame)
??JavaRDD goodStudentRowsRDD = goodStudentsRDD.map(????new Function, Row>() {?????private static final long serialVersionUID = 1L;
?????@Override
?????public Row call(
???????Tuple2> tuple) ???????throws Exception {
??????return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
?????}
????});
??// 创建一份元数据,将JavaRDD转换为DataFrame
??List structFields = new ArrayList();
??structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
??structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
??structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
??StructType structType = DataTypes.createStructType(structFields);
??DataFrame goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType);
// 将好学生的全部信息保存到一个json文件中去
// (将DataFrame中的数据保存到外部的json文件中去)
goodStudentsDF.write().format("json").save("hdfs://spark1:9000/spark-study/good-students");
?}
}
【JSON综合性复杂案例】查看结果:
hadoop fs -text /spark-study/good-students/part-r*
Scala版本
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.LongType

/**
 * JSON data source example.
 *
 * Reads student scores from a JSON file, keeps the students whose score is
 * at least 80, joins them with basic student information built from an
 * in-memory array of JSON strings, and writes the combined
 * (name, score, age) records back out as JSON.
 *
 * @author Administrator
 */
object JSONDataSource {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("JSONDataSource")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Create the student-score DataFrame from the JSON file.
    val studentScoresDF = sqlContext.read.json("hdfs://spark1:9000/spark-study/students.json")

    // Select the score records of students with a score of at least 80,
    // and collect their names.
    studentScoresDF.registerTempTable("student_scores")
    val goodStudentScoresDF = sqlContext.sql("select name,score from student_scores where score>=80")
    val goodStudentNames = goodStudentScoresDF.rdd.map(_.getString(0)).collect()

    // Create the basic-information DataFrame from an RDD of JSON strings.
    val studentInfoJSONs = Array("{\"name\":\"Leo\", \"age\":18}",
      "{\"name\":\"Marry\", \"age\":17}",
      "{\"name\":\"Jack\", \"age\":19}")
    val studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs, 3)
    val studentInfosDF = sqlContext.read.json(studentInfoJSONsRDD)

    // Query the basic information of the selected students.
    studentInfosDF.registerTempTable("student_infos")
    val nameFilter =
      if (goodStudentNames.isEmpty) {
        // "name in ()" is not valid SQL; use a predicate that matches
        // nothing so the job still completes with an empty result.
        "1 = 0"
      } else {
        // NOTE: names are interpolated into the statement unescaped, so a
        // name containing a single quote would break the query.
        goodStudentNames.map(name => "'" + name + "'").mkString("name in (", ",", ")")
      }
    val sql = "select name,age from student_infos where " + nameFilter
    val goodStudentInfosDF = sqlContext.sql(sql)

    // Join scores with basic information on the student name,
    // giving (name, (score, age)).
    val scoresByName = goodStudentScoresDF.rdd.map { row =>
      (row.getAs[String]("name"), row.getAs[Long]("score"))
    }
    val agesByName = goodStudentInfosDF.rdd.map { row =>
      (row.getAs[String]("name"), row.getAs[Long]("age"))
    }
    val goodStudentsRDD = scoresByName.join(agesByName)

    // Convert the joined RDD to a DataFrame; score and age are read as Long
    // and narrowed to Int to match the IntegerType columns declared below.
    val goodStudentRowsRDD = goodStudentsRDD.map {
      case (name, (score, age)) => Row(name, score.toInt, age.toInt)
    }
    val structType = StructType(Array(
      StructField("name", StringType, true),
      StructField("score", IntegerType, true),
      StructField("age", IntegerType, true)))
    val goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType)

    // Save the DataFrame contents as JSON.
    goodStudentsDF.write.format("json").save("hdfs://spark1:9000/spark-study/good-students-scala")
  }
}
推荐阅读
- MybatisPlus使用queryWrapper如何实现复杂查询
- 世外小学惨案后,和孩子聊聊世界的“复杂”
- 【过马路的时候】(稍微写复杂点他们就看不懂)
- 纯非纯之复杂!
- WebAPI|WebAPI 多对多对象输出json异常()
- 奥特曼变得越来越复杂
- Python|Python 小项目2 使用JSON API并处理数据
- 错综复杂的人际关系,一张图谱一目了然
- MySQL|MySQL 5.7 JSON特性支持
- 解析json用的Gson解析出现异常记录问题