Spark Streaming Word-Frequency Count Example

The goal of this example: use Spark Streaming to process, as a continuous stream, text data that a data service receives over a TCP socket.
1. Create a Maven project and add the required dependency jars

<properties>
  <scala.version>2.11.8</scala.version>
</properties>

<repositories>
  <repository>
    <id>repos</id>
    <name>Repository</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public</url>
  </repository>
  <repository>
    <id>scala-tools.org</id>
    <name>Scala-Tools Maven2 Repository</name>
    <url>http://scala-tools.org/repo-releases</url>
  </repository>
</repositories>

<pluginRepositories>
  <pluginRepository>
    <id>repos</id>
    <name>Repository</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public</url>
  </pluginRepository>
  <pluginRepository>
    <id>scala-tools.org</id>
    <name>Scala-Tools Maven2 Repository</name>
    <url>http://scala-tools.org/repo-releases</url>
  </pluginRepository>
</pluginRepositories>

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
</dependencies>

<build>
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <version>2.15.2</version>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <goals>
            <goal>compile</goal>
          </goals>
          <configuration>
            <includes>
              <include>**/*.scala</include>
            </includes>
          </configuration>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <appendAssemblyId>false</appendAssemblyId>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>org.jy.data.yh.bigdata.drools.scala.sparkstreaming.SparkStreamingWordsFrep</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <version>2.4</version>
      <configuration>
        <archive>
          <manifest>
            <addClasspath>true</addClasspath>
            <mainClass>org.jy.data.yh.bigdata.drools.scala.sparkstreaming.SparkStreamingWordsFrep</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
  </plugins>
</build>
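With the assembly plugin bound to the package phase, a standard Maven build produces the runnable fat jar used in step 4. The jar name below assumes the project's artifactId and version are SSO-Scala-SparkStreaming and 1.0-SNAPSHOT, matching the path used in the spark-submit command later on:

mvn clean package
# Because appendAssemblyId is false, the assembly plugin writes the fat jar to
# target/SSO-Scala-SparkStreaming-1.0-SNAPSHOT.jar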

2. The Scala code is as follows:
package org.jy.data.yh.bigdata.drools.scala.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming word-frequency count over a data stream.
 */
object SparkStreamingWordsFrep {
  def main(args: Array[String]): Unit = {
    // Spark configuration
    val sparkConf = new SparkConf()
      .setAppName("SparkStreamingWordsFrep")
      .setMaster("spark://centoshadoop1:7077,centoshadoop2:7077")
    // Create the streaming context with a 2-second batch interval
    val sparkStreamContext = new StreamingContext(sparkConf, Seconds(2))
    // Create a DStream that connects to hostname:port, e.g. localhost:9999
    val lines = sparkStreamContext.socketTextStream("centoshadoop1", 9999)
    // Split each received line into words
    val words = lines.flatMap(line => line.split(" "))
    // Count the words in each batch
    val pairs = words.map(word => (word, 1))
    // Sum the counts: accumulate the values of tuples that share the same key
    val wordCounts = pairs.reduceByKey(_ + _)
    // Print up to the first 10,000 elements of each RDD in this DStream to the console
    wordCounts.print(10000)
    sparkStreamContext.start()            // Start the computation
    sparkStreamContext.awaitTermination() // Wait for the computation to terminate
  }
}
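Before submitting to the cluster, it can be handy to smoke-test the same pipeline locally. The following variant is a sketch, not part of the original post: it swaps the cluster master for local[2] and reads from localhost. At least two threads are needed because the socket receiver permanently occupies one of them.

package org.jy.data.yh.bigdata.drools.scala.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical local-mode variant for quick testing (assumes nc -lk 9999 runs on localhost).
object SparkStreamingWordsFrepLocal {
  def main(args: Array[String]): Unit = {
    // "local[2]" matters: the socket receiver takes one thread,
    // so at least one more is needed to process the batches.
    val conf = new SparkConf()
      .setAppName("SparkStreamingWordsFrepLocal")
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))          // split lines into words
      .map(word => (word, 1))         // pair each word with a count of 1
      .reduceByKey(_ + _)             // sum the counts per word
      .print()                        // print the first 10 elements of each batch
    ssc.start()
    ssc.awaitTermination()
  }
}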

3. Install the nmap-ncat package on Linux
yum install nc
4. Package the jar and run it on the Spark cluster with the following command:
bin/spark-submit \
--class org.jy.data.yh.bigdata.drools.scala.sparkstreaming.SparkStreamingWordsFrep \
--num-executors 4 \
--driver-memory 2G \
--executor-memory 1g \
--executor-cores 1 \
--conf spark.default.parallelism=1000 \
/home/hadoop/tools/SSO-Scala-SparkStreaming-1.0-SNAPSHOT.jar
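One note on this command: it passes no --master option, so the job relies on the master URL hard-coded via setMaster in the code. Settings made on SparkConf in code take precedence over spark-submit options, so to choose the cluster at submit time instead, remove the setMaster call and pass the URL on the command line, for example (resource options as above apply unchanged):

bin/spark-submit \
--master spark://centoshadoop1:7077 \
--class org.jy.data.yh.bigdata.drools.scala.sparkstreaming.SparkStreamingWordsFrep \
/home/hadoop/tools/SSO-Scala-SparkStreaming-1.0-SNAPSHOT.jar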
5. Open two Linux terminals. In one of them, start the Netcat listener and type the text to be counted (-l listens on the port, -k keeps the listener alive across client connections):

[hadoop@centoshadoop1 ~]$ nc -lk 9999
Spark streaming is an extension of the core Spark API
The word-count output then appears in the driver console:
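As a sketch of what print() produces, one block is written per 2-second batch; the timestamp below is illustrative, and the counts correspond to the input line typed above:

-------------------------------------------
Time: 1577808000000 ms
-------------------------------------------
(Spark,2)
(streaming,1)
(is,1)
(an,1)
(extension,1)
(of,1)
(the,1)
(core,1)
(API,1)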
