Spark SQL version
package com.jiangzeyun.sparkSQl;
import java.util.Arrays;
import java.util.Random;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* Word count based on Spark SQL.
* Approach:
* 1. Load the file into an RDD.
* 2. Convert the RDD to a DataFrame.
* 3. Run a SQL query over it.
* @author jiangzeyun
*
*/
public class WordCount {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("Test DataFrame")
//In cluster mode, set this to the Hive metastore URI
// .config("hive.metastore.uris","")
//In cluster mode, replace "local" with the master URL, e.g. spark://xx:7077
.config("spark.master", "local")
.enableHiveSupport()
.getOrCreate();
String path = "/usr/local/spark/examples/src/main/resources/README.md";
//create RDD
JavaRDD<Person> peopleRDD = spark.read()
.textFile(path)
.javaRDD()
//split each line into words with flatMap
.flatMap(line->Arrays.asList(line.split(" ")).iterator())
.map(line -> {
Person person = new Person();
person.setName(line);
person.setAge(new Random().nextInt(10));
return person;
});
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
//word count: show the 100 most frequent words
Dataset<Row> wordCount = spark.sql("SELECT name, count(*) AS cnt FROM people GROUP BY name ORDER BY cnt DESC LIMIT 100");
wordCount.show(100);
}
}
package com.jiangzeyun.sparkSQl;
public class Person {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
Running it produces the following output:
+-----------------+---+
|name|cnt|
+-----------------+---+
|| 68|
|the| 22|
|Spark| 15|
|to| 14|
|for| 11|
|and| 11|
|##|8|
|a|8|
|can|7|
|run|7|
|in|6|
|is|6|
|using|5|
|on|5|
|of|5|
|you|4|
|with|4|
|You|4|
|build|4|
|an|4|
|if|4|
|also|4|
|documentation|3|
|example|3|
|one|3|
|use|3|
|see|3|
|Please|3|
|including|3|
|For|3|
|or|3|
|Hadoop|3|
|command,|2|
|set|2|
|programs|2|
|tests|2|
|particular|2|
|guidance|2|
|shell:|2|
|Interactive|2|
|how|2|
|Hive|2|
|building|2|
|be|2|
|locally|2|
|should|2|
|[project|2|
|`examples`|2|
|general|2|
|1000:|2|
|detailed|2|
|following|2|
|SparkPi|2|
|refer|2|
|./bin/run-example|2|
|It|2|
|To|2|
|return|2|
|cluster|2|
|do|2|
|Scala|2|
|class|2|
|Hadoop,|2|
|SQL|2|
|Python,|2|
|examples|2|
|Python|2|
|at|2|
|that|2|
|This|2|
|Shell|2|
|supports|2|
|which|2|
|online|1|
|graphs|1|
|["Parallel|1|
|["Building|1|
|thread|1|
|abbreviated|1|
|overview|1|
|rich|1|
|-DskipTests|1|
|name|1|
|["Specifying|1|
|stream|1|
|run:|1|
|not|1|
|./dev/run-tests|1|
|will|1|
|[run|1|
|option|1|
|Alternatively,|1|
|by|1|
|must|1|
|MLlib|1|
|DataFrames,|1|
|variable|1|
|Note|1|
|core|1|
|more|1|
+-----------------+---+
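For comparison, the same top-100 word count can also be written with the DataFrame API directly, without registering a temporary view or defining a Person bean. This is only a sketch under the same assumptions as the example above (the existing spark session and path variable); split, explode and col are static imports from org.apache.spark.sql.functions:

// Sketch: word count with the DataFrame API instead of a temp view.
// Assumes the `spark` session and `path` from the example above.
// (file-level imports:)
// import static org.apache.spark.sql.functions.col;
// import static org.apache.spark.sql.functions.explode;
// import static org.apache.spark.sql.functions.split;
Dataset<Row> words = spark.read().textFile(path)
        .select(explode(split(col("value"), " ")).alias("word"));
words.groupBy("word")
        .count()
        .orderBy(col("count").desc())
        .show(100);

The result matches the SQL version, only with columns named word and count instead of name and cnt.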
Spark Core (RDD) version
package com.jiangzeyun.test;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
/**
* WordCount test.
* Can be run in local mode for testing (see the setMaster calls below).
* Uses Java 8 lambda expressions.
* @author jiangzeyun
*
*/
public class WordCount {
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("Usage: WordCount <file>");
System.exit(1);
}
//Local mode
//SparkConf conf = new SparkConf().setMaster("local").setAppName("word count");
//Cluster mode (Spark standalone master URL, default port 7077)
SparkConf conf = new SparkConf().setMaster("spark://master:7077").setAppName("word count");
JavaSparkContext sc = new JavaSparkContext(conf);
//Local file path (for running with a hard-coded input)
//JavaRDD<String> lines = sc.textFile("/usr/local/spark/README.md");
JavaRDD<String> lines = sc.textFile(args[0]);
JavaPairRDD<String, Integer> counts = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.mapToPair(w -> new Tuple2<>(w, 1)).reduceByKey((x, y) -> x + y);
counts.mapToPair(s -> new Tuple2<>(s._2, s._1)).sortByKey(false)
.mapToPair(s -> new Tuple2<>(s._2, s._1))
.collect()
.forEach(tuple -> System.out.println(tuple._1() +":" +tuple._2()));
}
}
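One note on the last step: collect() ships every distinct word back to the driver before printing. If, as in the Spark SQL version, only the most frequent words are of interest, take(100) on the sorted RDD is cheaper. A small sketch reusing the counts pair RDD from above:

// Sketch: print only the 100 most frequent words instead of collecting all of them.
counts.mapToPair(s -> new Tuple2<>(s._2, s._1))  // swap to (count, word)
        .sortByKey(false)                        // sort by count, descending
        .take(100)                               // only 100 rows are brought to the driver
        .forEach(t -> System.out.println(t._2() + ":" + t._1()));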