JSON综合性复杂案例

查询成绩为80分以上的学生的基本信息与成绩信息
students.json
{"name":"Leo", "score":85}
{"name":"Marry", "score":99}
{"name":"Jack", "score":74}

/** * JSON数据源 * @author Administrator * */ public class JSONDataSource {?public static void main(String[] args) { ??SparkConf conf = new SparkConf()???.setAppName("JSONDataSource"); ??JavaSparkContext sc = new JavaSparkContext(conf); ??SQLContext sqlContext = new SQLContext(sc); ??// 针对json文件,创建DataFrame(针对json文件创建DataFrame) ??DataFrame studentScoresDF = sqlContext.read().json????"hdfs://spark1:9000/spark-study/students.json"); // 针对学生成绩信息的DataFrame,注册临时表,查询分数大于80分的学生的姓名 ??// (注册临时表,针对临时表执行sql语句) ??studentScoresDF.registerTempTable("student_scores"); ??DataFrame goodStudentScoresDF = sqlContext.sql(????"select name,score from student_scores where score>=80"); // (将DataFrame转换为rdd,执行transformation操作) ??List goodStudentNames = goodStudentScoresDF.javaRDD().map(new Function() {?????private static final long serialVersionUID = 1L; ?????@Override ?????public String call(Row row) throws Exception { ??????return row.getString(0); ?????} ????}).collect(); ??// 然后针对JavaRDD,创建DataFrame ??// (针对包含json串的JavaRDD,创建DataFrame) ??List studentInfoJSONs = new ArrayList(); ??studentInfoJSONs.add("{\"name\":\"Leo\", \"age\":18}"); ??studentInfoJSONs.add("{\"name\":\"Marry\", \"age\":17}"); ??studentInfoJSONs.add("{\"name\":\"Jack\", \"age\":19}"); ??JavaRDD studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs); ??DataFrame studentInfosDF = sqlContext.read().json(studentInfoJSONsRDD); ??// 针对学生基本信息DataFrame,注册临时表,然后查询分数大于80分的学生的基本信息 ??studentInfosDF.registerTempTable("student_infos"); ??String sql = "select name,age from student_infos where name in ("; for(int i = 0; i < goodStudentNames.size(); i++) { ???sql += "'" + goodStudentNames.get(i) + "'"; ???if(i < goodStudentNames.size() - 1) { ????sql += ","; ???} ??} ??sql += ")"; ??DataFrame goodStudentInfosDF = sqlContext.sql(sql); ??// 然后将两份数据的DataFrame,转换为JavaPairRDD,执行join transformation ??// (将DataFrame转换为JavaRDD,再map为JavaPairRDD,然后进行join) ??JavaPairRDD> goodStudentsRDD = ????goodStudentScoresDF.javaRDD().mapToPair(new 
PairFunction() {private static final long serialVersionUID = 1L; ?????@Override ?????public Tuple2 call(Row row) throws Exception { ??????return new Tuple2(row.getString(0), ????????Integer.valueOf(String.valueOf(row.getLong(1)))); ?????} ????}).join(goodStudentInfosDF.javaRDD().mapToPair(new PairFunction() {private static final long serialVersionUID = 1L; ?????@Override ?????public Tuple2 call(Row row) throws Exception { ??????return new Tuple2(row.getString(0), ????????Integer.valueOf(String.valueOf(row.getLong(1)))); ?????} ????})); // 然后将封装在RDD中的好学生的全部信息,转换为一个JavaRDD的格式 ??// (将JavaRDD,转换为DataFrame) ??JavaRDD goodStudentRowsRDD = goodStudentsRDD.map(????new Function, Row>() {?????private static final long serialVersionUID = 1L; ?????@Override ?????public Row call( ???????Tuple2> tuple) ???????throws Exception { ??????return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2); ?????} ????}); ??// 创建一份元数据,将JavaRDD转换为DataFrame ??List structFields = new ArrayList(); ??structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); ??structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true)); ??structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); ??StructType structType = DataTypes.createStructType(structFields); ??DataFrame goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType); // 将好学生的全部信息保存到一个json文件中去 // (将DataFrame中的数据保存到外部的json文件中去) goodStudentsDF.write().format("json").save("hdfs://spark1:9000/spark-study/good-students"); ?} }

【JSON综合性复杂案例】查看结果:
hadoop fs -text /spark-study/good-students/part-r*
Scala版本
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.LongType

/**
 * JSON data source example.
 *
 * Reads student scores from a JSON file, selects the students whose score is
 * at least 80, looks up their basic information (age) from a second JSON data
 * set built in memory, joins both by name and writes the combined records
 * (name, score, age) out as JSON.
 *
 * @author Administrator
 */
object JSONDataSource {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("JSONDataSource")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Create the DataFrame holding the student scores.
    val studentScoresDF = sqlContext.read.json("hdfs://spark1:9000/spark-study/students.json")

    // Query the name and score of every student whose score is at least 80,
    // then collect the names to the driver.
    studentScoresDF.registerTempTable("student_scores")
    val goodStudentScoresDF = sqlContext.sql("select name,score from student_scores where score>=80")
    val goodStudentNames = goodStudentScoresDF.rdd.map(_.getAs[String]("name")).collect()

    // Create the DataFrame holding the students' basic information from an
    // RDD whose elements are JSON strings.
    val studentInfoJSONs = Array("{\"name\":\"Leo\", \"age\":18}",
      "{\"name\":\"Marry\", \"age\":17}",
      "{\"name\":\"Jack\", \"age\":19}")
    val studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs, 3)
    val studentInfosDF = sqlContext.read.json(studentInfoJSONsRDD)

    // Query the basic information of the students selected above.
    studentInfosDF.registerTempTable("student_infos")
    val goodStudentInfosDF =
      if (goodStudentNames.isEmpty) {
        // "name in ()" is not valid SQL, so produce an empty result with the
        // same two columns instead of issuing a broken query.
        studentInfosDF.select("name", "age").limit(0)
      } else {
        // NOTE(review): names are spliced into the SQL text unescaped; a name
        // containing a single quote would break the statement.
        val quotedNames = goodStudentNames.map(name => s"'$name'").mkString(",")
        sqlContext.sql(s"select name,age from student_infos where name in ($quotedNames)")
      }

    // Join score information and basic information by name; both numeric
    // columns are read as Long.
    val scoresByName = goodStudentScoresDF.rdd.map { row =>
      (row.getAs[String]("name"), row.getAs[Long]("score"))
    }
    val agesByName = goodStudentInfosDF.rdd.map { row =>
      (row.getAs[String]("name"), row.getAs[Long]("age"))
    }
    val goodStudentsRDD = scoresByName.join(agesByName)

    // Convert the joined RDD into a DataFrame with an explicit schema; the
    // Long values are narrowed to Int to match the IntegerType columns.
    val goodStudentRowsRDD = goodStudentsRDD.map {
      case (name, (score, age)) => Row(name, score.toInt, age.toInt)
    }

    val structType = StructType(Array(
      StructField("name", StringType, true),
      StructField("score", IntegerType, true),
      StructField("age", IntegerType, true)))

    val goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType)

    // Save the data of the DataFrame as JSON.
    goodStudentsDF.write.format("json").save("hdfs://spark1:9000/spark-study/good-students-scala")

    // Release the Spark context once the job has finished.
    sc.stop()
  }

}

    推荐阅读