Spark & Spark SQL WordCount in Java

Spark SQL version

package com.jiangzeyun.sparkSQl;

import java.util.Arrays;
import java.util.Random;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * WordCount based on Spark SQL.
 * Approach:
 *   1. Load the file into an RDD.
 *   2. Convert the RDD to a DataFrame.
 *   3. Query it with SQL.
 * @author jiangzeyun
 */
public class WordCount {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("Test DataFrame")
                // In cluster mode, set this to the Hive metastore URI
                // .config("hive.metastore.uris", "")
                // In cluster mode, set spark.master to the master URL, e.g. spark://xx:7077
                .config("spark.master", "local")
                .enableHiveSupport()
                .getOrCreate();

        String path = "/usr/local/spark/examples/src/main/resources/README.md";

        // Create the RDD
        JavaRDD<Person> peopleRDD = spark.read()
                .textFile(path)
                .javaRDD()
                // Split each line into words with flatMap
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .map(word -> {
                    Person person = new Person();
                    person.setName(word);
                    person.setAge(new Random().nextInt(10));
                    return person;
                });

        // Apply a schema to an RDD of JavaBeans to get a DataFrame
        Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);

        // Register the DataFrame as a temporary view
        peopleDF.createOrReplaceTempView("people");

        // WordCount: show the 100 most frequent words
        Dataset<Row> wordCount = spark.sql(
                "SELECT name, count(*) AS cnt FROM people GROUP BY name ORDER BY cnt DESC LIMIT 100");
        wordCount.show(100);
    }
}

package com.jiangzeyun.sparkSQl;

public class Person {
    private String name;
    private int age;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}
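As an aside, the Person bean (with an age field the query never reads) exists only so that createDataFrame can infer a schema by JavaBean reflection. The same count can also be written against the Dataset API directly, skipping the bean; a minimal sketch under the same SparkSession settings and input path as above (the class name WordCountDataset is made up for illustration):

package com.jiangzeyun.sparkSQl;

import static org.apache.spark.sql.functions.col;

import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class WordCountDataset {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("WordCount Dataset API")
                .config("spark.master", "local")
                .getOrCreate();

        String path = "/usr/local/spark/examples/src/main/resources/README.md";

        // textFile yields a Dataset<String> with a single column named "value";
        // the cast resolves the flatMap overload ambiguity in the Java API
        Dataset<String> words = spark.read().textFile(path)
                .flatMap((FlatMapFunction<String, String>) line ->
                        Arrays.asList(line.split(" ")).iterator(), Encoders.STRING());

        // groupBy + count is the DataFrame-API equivalent of the GROUP BY query
        Dataset<Row> wordCount = words.groupBy("value")
                .count()
                .orderBy(col("count").desc())
                .limit(100);

        wordCount.show(100);
    }
}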

Running the Spark SQL version produces the following output:
+-----------------+---+
|             name|cnt|
+-----------------+---+
|                 | 68|
|              the| 22|
|            Spark| 15|
|               to| 14|
|              for| 11|
|              and| 11|
|               ##|  8|
|                a|  8|
|              can|  7|
|              run|  7|
|               in|  6|
|               is|  6|
|            using|  5|
|               on|  5|
|               of|  5|
|              you|  4|
|             with|  4|
|              You|  4|
|            build|  4|
|               an|  4|
|               if|  4|
|             also|  4|
|    documentation|  3|
|          example|  3|
|              one|  3|
|              use|  3|
|              see|  3|
|           Please|  3|
|        including|  3|
|              For|  3|
|               or|  3|
|           Hadoop|  3|
|         command,|  2|
|              set|  2|
|         programs|  2|
|            tests|  2|
|       particular|  2|
|         guidance|  2|
|           shell:|  2|
|      Interactive|  2|
|              how|  2|
|             Hive|  2|
|         building|  2|
|               be|  2|
|          locally|  2|
|           should|  2|
|         [project|  2|
|       `examples`|  2|
|          general|  2|
|            1000:|  2|
|         detailed|  2|
|        following|  2|
|          SparkPi|  2|
|            refer|  2|
|./bin/run-example|  2|
|               It|  2|
|               To|  2|
|           return|  2|
|          cluster|  2|
|               do|  2|
|            Scala|  2|
|            class|  2|
|          Hadoop,|  2|
|              SQL|  2|
|          Python,|  2|
|         examples|  2|
|           Python|  2|
|               at|  2|
|             that|  2|
|             This|  2|
|            Shell|  2|
|         supports|  2|
|            which|  2|
|           online|  1|
|           graphs|  1|
|       ["Parallel|  1|
|       ["Building|  1|
|           thread|  1|
|      abbreviated|  1|
|         overview|  1|
|             rich|  1|
|      -DskipTests|  1|
|             name|  1|
|     ["Specifying|  1|
|           stream|  1|
|             run:|  1|
|              not|  1|
|  ./dev/run-tests|  1|
|             will|  1|
|             [run|  1|
|           option|  1|
|   Alternatively,|  1|
|               by|  1|
|             must|  1|
|            MLlib|  1|
|      DataFrames,|  1|
|         variable|  1|
|             Note|  1|
|             core|  1|
|             more|  1|
+-----------------+---+
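One detail worth noticing: the most frequent "word" is the empty string, with 68 occurrences. That is an artifact of splitting on a single space: every blank line and every run of consecutive spaces in the README produces empty tokens. A tiny standalone demo of the effect (plain Java, no Spark needed):

import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        String line = "Spark  is   fast";  // note the repeated spaces

        // Splitting on a single space keeps empty tokens...
        System.out.println(Arrays.asList(line.split(" ")));
        // -> [Spark, , is, , , fast]

        // ...while splitting on a whitespace run does not
        System.out.println(Arrays.asList(line.split("\\s+")));
        // -> [Spark, is, fast]
    }
}

Switching the flatMap stage to split on \s+ (or filtering out empty strings) would remove that row from the top of the table.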

Spark (RDD API) version

package com.jiangzeyun.test;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**
 * WordCount on the plain RDD API, written with Java 8 lambda expressions.
 * Can be run in local mode for testing or submitted to a cluster.
 * @author jiangzeyun
 */
public class WordCount {
    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("Usage: JavaWordCount <file>");
            System.exit(1);
        }

        // Local mode
        //SparkConf conf = new SparkConf().setMaster("local").setAppName("word count");
        // Cluster mode (standalone master URL; the host name is a placeholder)
        SparkConf conf = new SparkConf().setMaster("spark://master:7077").setAppName("word count");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Local file for testing
        //JavaRDD<String> lines = sc.textFile("/usr/local/spark/README.md");
        JavaRDD<String> lines = sc.textFile(args[0]);

        // Split lines into words, pair each word with 1, and sum per word
        JavaPairRDD<String, Integer> counts = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((x, y) -> x + y);

        // Swap to (count, word), sort descending, swap back, then print
        counts.mapToPair(s -> new Tuple2<>(s._2, s._1))
                .sortByKey(false)
                .mapToPair(s -> new Tuple2<>(s._2, s._1))
                .collect()
                .forEach(tuple -> System.out.println(tuple._1() + ":" + tuple._2()));

        sc.stop();
    }
}
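To run the RDD version on a cluster, package it into a jar and launch it with spark-submit, passing the input file as the first argument; a typical invocation (jar name and input path are placeholders) looks like:

spark-submit --class com.jiangzeyun.test.WordCount --master spark://master:7077 wordcount.jar hdfs:///path/to/README.md

In practice the master URL is usually left out of the code entirely and supplied via --master, so the same jar can run both locally and on the cluster without a rebuild.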
