Word count with Spark operators (Java and Scala versions)
// Spark: read a local file and aggregate an RDD with reduce
package day04

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test04 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Test04")
    val sc: SparkContext = new SparkContext(conf)
    val pRdd: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5))
    val textFile: RDD[String] = sc.textFile("D:\\1.txt") // read a local file (not used further in this example)
    val reduceRdd: Int = pRdd.reduce(_ + _) // aggregate: sum all elements
    println(reduceRdd.toString)
    sc.stop()
  }
}
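The function passed to reduce must be associative and commutative, because Spark first combines elements within each partition and then merges the partial results in no fixed order; here reduce(_ + _) sums the five elements to 15. For comparison, a minimal sketch of two equivalent aggregations, assuming the same sc as above (the nums name is illustrative):

val nums = sc.parallelize(Array(1, 2, 3, 4, 5))
val viaFold = nums.fold(0)(_ + _) // like reduce, but takes a neutral "zero" element
val viaSum = nums.sum()           // built-in numeric aggregation; returns a Double
println(s"fold: $viaFold, sum: $viaSum") // both yield 15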
// Spark word count (Scala version)
package day03

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount_Scala {
  def main(args: Array[String]): Unit = {
    // local[*] uses all CPU cores of the current machine as the parallelism;
    // local runs with a single thread
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount_Scala")
    val sc: SparkContext = new SparkContext(conf)
    val file: RDD[String] = sc.textFile("D:\\1.txt")
    val flatMap: RDD[String] = file.flatMap(_.split("\\s"))    // split each line into words
    val map: RDD[(String, Int)] = flatMap.map((_, 1))          // pair each word with a count of 1
    val key: RDD[(String, Int)] = map.reduceByKey(_ + _)       // sum the counts per word
    val map1: RDD[(Int, String)] = key.map(x => (x._2, x._1))  // swap to (count, word) so we can sort by count
    val map2: RDD[(Int, String)] = map1.sortByKey(false)       // omitted or true --> ascending; false --> descending
    val map3: RDD[(String, Int)] = map2.map(x => (x._2, x._1)) // swap back to (word, count)
    map3.foreach(x => {
      println("Word: " + x._1)
      println("Occurrences: " + x._2)
    })
    sc.stop()
  }
}
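The swap -> sortByKey -> swap sequence is needed only because sortByKey can sort by the key alone. The RDD API also provides sortBy, which takes an arbitrary key-extraction function; a condensed sketch of the same pipeline using it (same input path and sc as above, the counts name is illustrative):

val counts = sc.textFile("D:\\1.txt")
  .flatMap(_.split("\\s"))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false) // sort by the count directly; no swapping
counts.foreach { case (word, count) =>
  println(s"Word: $word, occurrences: $count")
}

Internally sortBy still keys the records by the extracted value and calls sortByKey, so this is a readability win rather than a performance one.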
// Spark word count (Java version)
package day03;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCount_Java {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount_Java");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> file = jsc.textFile("D:\\1.txt");

        // Split each line into words
        JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        // Pair each word with a count of 1
        JavaPairRDD<String, Integer> wordMap = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts per word
        JavaPairRDD<String, Integer> reduceWord = wordMap.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Swap to (count, word) so the pairs can be sorted by count
        JavaPairRDD<Integer, String> pairRDD = reduceWord.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<Integer, String>(t2._2, t2._1);
            }
        });

        // Sort by count in descending order
        JavaPairRDD<Integer, String> sortPairRDD = pairRDD.sortByKey(false);

        // Swap back to (word, count)
        JavaPairRDD<String, Integer> resultRdd = sortPairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t2) throws Exception {
                return new Tuple2<String, Integer>(t2._2, t2._1);
            }
        });

        resultRdd.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> t2) throws Exception {
                System.out.println("Word: " + t2._1);
                System.out.println("Occurrences: " + t2._2);
            }
        });

        jsc.stop();
    }
}
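A caveat that applies to both versions: foreach runs on the executors, so under a cluster master the println output lands in the executor logs rather than the driver console (with local everything runs in one JVM, which is why the output is visible here). To print on the driver instead, collect the result first; a minimal Scala sketch, reusing the counts RDD from the sketch above:

counts.collect().foreach { case (word, count) => println(s"$word\t$count") } // ships the whole result to the driver; fine for word counts, risky for large RDDs
counts.take(10).foreach(println) // or cap how much data is transferred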