本实例旨在:通过Spark Streaming流式地处理一个数据服务从TCP套接字中接收到的数据。
一创建maven工程,引入相应依赖jar包
2.11.8repos Repository http://maven.aliyun.com/nexus/content/groups/public scala-tools.org Scala-Tools Maven2 Repository http://scala-tools.org/repo-releases repos Repository http://maven.aliyun.com/nexus/content/groups/public scala-tools.org Scala-Tools Maven2 Repository http://scala-tools.org/repo-releases org.apache.spark spark-core_2.112.4.0 providedorg.apache.spark spark-sql_2.112.4.0 providedorg.apache.spark spark-streaming_2.112.4.0 providedorg.scala-lang scala-library${scala.version} src/main/scala src/test/scala org.scala-tools maven-scala-plugin2.15.2 scala-compile-first compile **/*.scala scala-test-compile testCompile org.apache.maven.plugins maven-compiler-plugin3.1 1.8 maven-assembly-plugin1.8 false jar-with-dependencies org.jy.data.yh.bigdata.drools.scala.sparkstreaming.SparkStreamingWordsFrep make-assembly packagesingle org.apache.maven.plugins maven-jar-plugin2.4 true org.jy.data.yh.bigdata.drools.scala.sparkstreaming.SparkStreamingWordsFrep
二:Scala代码如下:
package org.jy.data.yh.bigdata.drools.scala.sparkstreamingimport org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext}/** * Spark Streaming 数据流统计词频 */ object SparkStreamingWordsFrep { def main(args: Array[String]): Unit = { // Spark 配置项 val sparkConf = new SparkConf() .setAppName("SparkStreamingWordsFrep") .setMaster("spark://centoshadoop1:7077,centoshadoop2:7077") // 创建流式上下文 val sparkStreamContext = new StreamingContext(sparkConf,Seconds(2)) // 创建一个DStream,连接指定的hostname:port,比如localhost:9999 val lines = sparkStreamContext.socketTextStream("centoshadoop1",9999) // 将接收到的每条信息分割成次词语 val words = lines.flatMap(line =>{line.split(" ")}) // 统计每个batch的词频 val pairs = words.map(word =>(word,1)) // 汇总词频 val wordCounts = pairs.reduceByKey(_+_) // 将key相同的元组的value累积 // 打印从DStream中生成的RDD的前10个元素到控制台 wordCounts.print(10000) sparkStreamContext.start() // 开始计算 sparkStreamContext.awaitTermination() //等待计算结束 }}
三,linux系统安装nmap-ncat软件
yum install nc
四,打包到spark集群上运行,命令如下
bin/spark-submit \
--class org.jy.data.yh.bigdata.drools.scala.sparkstreaming.SparkStreamingWordsFrep \
--num-executors 4 \
--driver-memory 2G \
--executor-memory 1g \
--executor-cores 1 \
--conf spark.default.parallelism=1000 \
/home/hadoop/tools/SSO-Scala-SparkStreaming-1.0-SNAPSHOT.jar
五,打开两个linux界面,在其中一个界面输入要统计的文本内容
如下图:
文章图片
[hadoop@centoshadoop1 ~]$ nc -lk 9999
Spark streaming is an extension of the core Spark API
输出效果如下:
【Spark Streaming词频统计实例】
文章图片
推荐阅读
- 技术-大数据|转(Spark案例:Scala版统计单词个数)
- 大数据|Spark 之 RDD转换算子
- Spark基础学习笔记|Spark RDD案例(词频统计)
- spark|spark案例-词频统计(存储数据库)
- Spark,一个奇迹的诞生
- Spark入门简介
- 批处理框架|Spark学习笔记(3) - 关于Spark常用的transform算子的一些总结??????
- 算法|使用Spark完成基于TF-IDF特征的新闻热点聚类
- #|Spark性能调优实战(基础知识)-极客时间-吴磊