Word frequency counting with Spark operators (Java and Scala versions)

// Spark: read a local file and aggregate with reduce

package day04

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test04 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Test04")
    val sc: SparkContext = new SparkContext(conf)
    // Parallelize a small array into an RDD
    val pRdd: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5))
    // Read a local text file (only demonstrates textFile; not used below)
    val textFile: RDD[String] = sc.textFile("D:\\1.txt")
    // Aggregate the elements with reduce: 1 + 2 + 3 + 4 + 5 = 15
    val reduceRdd: Int = pRdd.reduce(_ + _)
    println(reduceRdd)
    sc.stop()
  }
}
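For comparison, here is a minimal sketch of equivalent aggregations (assuming the same sc and pRdd as above): reduce, fold, and sum all total the five elements.

// Equivalent sums over pRdd (sketch; reuses sc/pRdd from the example above)
println(pRdd.reduce(_ + _))  // 15
println(pRdd.fold(0)(_ + _)) // 15; fold additionally requires a zero value
println(pRdd.sum())          // 15.0; sum works on numeric RDDs and returns a Double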


// Spark word count (Scala version)
package day03

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount_Scala {
  def main(args: Array[String]): Unit = {
    // local[*] uses all available CPU cores as parallel threads;
    // local runs with a single thread
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount_Scala")
    val sc: SparkContext = new SparkContext(conf)
    val file: RDD[String] = sc.textFile("D:\\1.txt")
    // Split each line on whitespace into individual words
    val flatMap: RDD[String] = file.flatMap(_.split("\\s"))
    // Pair each word with an initial count of 1
    val map: RDD[(String, Int)] = flatMap.map((_, 1))
    // Sum the counts per word
    val key: RDD[(String, Int)] = map.reduceByKey(_ + _)
    // Swap to (count, word) so we can sort by the count
    val map1: RDD[(Int, String)] = key.map(x => (x._2, x._1))
    // omitted or true -> ascending; false -> descending
    val map2: RDD[(Int, String)] = map1.sortByKey(false)
    // Swap back to (word, count)
    val map3: RDD[(String, Int)] = map2.map(x => (x._2, x._1))
    map3.foreach(x => {
      println("Word: " + x._1)
      println("Count: " + x._2)
    })
    sc.stop()
  }
}
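The two extra map steps above exist only to swap key and value around sortByKey. A more concise sketch of the same pipeline uses sortBy instead (assuming the same input path and local master as above):

// Word count sorted by count descending via sortBy (sketch; reuses sc from above)
val counts: RDD[(String, Int)] = sc.textFile("D:\\1.txt")
  .flatMap(_.split("\\s"))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false) // sort directly on the count, no key swapping
counts.foreach { case (word, n) => println(s"$word: $n") }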

// Spark word count (Java version)

package day03;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCount_Java {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount_Java");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> file = jsc.textFile("D:\\1.txt");
        // Split each line on spaces into individual words
        JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Pair each word with an initial count of 1
        JavaPairRDD<String, Integer> wordMap = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        // Sum the counts per word
        JavaPairRDD<String, Integer> reduceWord = wordMap.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // Swap to (count, word) so we can sort by the count
        JavaPairRDD<Integer, String> pairRDD = reduceWord.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<Integer, String>(t2._2, t2._1);
            }
        });
        // false -> descending order by count
        JavaPairRDD<Integer, String> sortPairRDD = pairRDD.sortByKey(false);
        // Swap back to (word, count)
        JavaPairRDD<String, Integer> resultRdd = sortPairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t2) throws Exception {
                return new Tuple2<String, Integer>(t2._2, t2._1);
            }
        });
        resultRdd.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> t2) throws Exception {
                System.out.println("Word: " + t2._1);
                System.out.println("Count: " + t2._2);
            }
        });
        jsc.stop();
    }
}
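Since Spark's Java function interfaces each declare a single abstract method, the anonymous classes above can be replaced with Java 8 lambdas. A minimal sketch under that assumption (Spark 2.x, where flatMap expects an Iterator; same input path and imports as above):

// Word count with Java 8 lambdas (sketch; reuses the imports above)
SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount_Java8");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaPairRDD<String, Integer> counts = jsc.textFile("D:\\1.txt")
        .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((v1, v2) -> v1 + v2);
counts.mapToPair(t -> new Tuple2<>(t._2, t._1))   // swap to (count, word)
        .sortByKey(false)                         // descending by count
        .mapToPair(t -> new Tuple2<>(t._2, t._1)) // swap back to (word, count)
        .foreach(t -> System.out.println(t._1 + ": " + t._2));
jsc.stop();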

