Spark Streaming之流式词频统计(Socket数据源)

一、环境
Spark、Hadoop环境搭建可参看之前文章。

开发环境: 系统:Win10 开发工具:scala-eclipse-IDE 项目管理工具:Maven 3.6.0 JDK 1.8 Scala 2.11.11 Spark 2.4.3Spark运行环境: 系统:Linux CentOS7(两台机:主从节点) master : 192.168.190.200 slave1 : 192.168.190.201 JDK 1.8 Hadoop 2.9.2 Scala 2.11.11 Spark 2.4.3

二、案例简介
1. 以Socket连接作为SparkStreaming作业数据源,进行词频统计(统计间隔:1s)。
2. Socket文本数据流是通过TCP套接字连接接收文本数据产生DStream。
三、代码(Maven项目:wordFreqSocket)
Spark Streaming之流式词频统计(Socket数据源)
文章图片

1. 配置 pom.xml:
4.0.0 com wordFreqSocket 0.1 org.apache.spark spark-core_2.11 2.4.3 provided org.apache.spark spark-streaming_2.11 2.4.3 provided log4j log4j 1.2.17 org.slf4j slf4j-log4j12 1.7.12 org.scala-tools maven-scala-plugin compile compile 【Spark Streaming之流式词频统计(Socket数据源)】compile test-compile testCompile test-compile process-resources compile maven-compiler-plugin 1.8 1.8 org.apache.maven.plugins maven-assembly-plugin 2.4 jar-with-dependencies assemble-allpackage single org.apache.maven.plugins maven-jar-plugin truesparkstreaming_action.socket.main.Socket alimaven aliyun maven http://maven.aliyun.com/nexus/content/groups/public/ true false

2. 主程序:
package sparkstreaming_action.socket.mainimport org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds//创建一个集群模式的StreamingContext,两个工作线程,1s的批处理时间 //Master要求2个核,以防出现饥饿情况 object Socket { def main(args: Array[String]){ //Spark配置项 val conf = new SparkConf() .setAppName("SocketWordFreq") .setMaster("spark://master:7077") //创建流式上下文,1s的批处理间隔 val ssc = new StreamingContext(conf, Seconds(1)) //创建一个DStream,连接指定的hostname:port,比如master:9999 val lines = ssc.socketTextStream("master", 9999) //将接收到的每条信息分割成单个词汇 val words = lines.flatMap(_.split(" ")) //统计每个batch的词频 val pairs = words.map(word => (word, 1)) /* * 汇总词汇 * 注:reducebyKey(reduceFunc: (V, V) => V)中的函数 * 当V为单值结构时,第一个V为当前行value,第二个V为下一行Value,故"_ + _"代表前后两行Value相加(同一Key) * 当V为多值结构时,如:(1,1),可用"(x,y) => (x._1 + y._1, x._2 + y._2)"表示 */ val wordCounts = pairs.reduceByKey(_ + _) //打印从DStream中生成的RDD的前10个元素到控制台中 wordCounts.print()//print() 是输出操作,默认10条数据 ssc.start()//开始计算 ssc.awaitTermination()//等待计算结束 } }

四、安装Netcat
1.Netcat是网络工具中的瑞士军刀,它能通过TCP和UDP在网络中读写数据。 2.Netcat所做的就是在两台电脑之间建立链接并返回两个数据流。 3.能建立一个服务器,传输文件,与朋友聊天,传输流媒体或者用它作为其它协议的独立客户端。安装Netcat: $ yum install -y nc测试两个节点间聊天: master节点上执行命令:(建立服务器,端口号任意,只要不冲突) $ nc -lk 9999 注:表示在 9999 端口启动了一个tcp服务器,所有的标准输出和输入会输出到该端口。slave1节点上执行命令:(建立客户端) $ nc master 9999之后,任何一方在终端输入数据(回车)后,都会在另一方终端显示。注:如果只有一台机(如:master),可以打开另一个终端窗口通过ssh连接master节点; 之后,两个终端也能实现聊天。


五、打包运行
1.在项目的根目录下运行命令行窗口(在目录下 "shift+右键",选择命令行窗口 Power Shell) 执行如下命令:(编译代码) > mvn clean install 编译成功后,会在当前目录的 ".\target\" 下产生两个jar包; 其中的 wordFreqSocket-0.1-jar-with-dependencies.jar 用来提交给Spaek集群2.master节点建立Socket服务器(9999端口) $ nc -lk 9999**用另一终端(如:Windows的PowerShell)通过ssh登陆master节点,向下执行 3.将Jar包提交至主节点上,执行Spark作业: 提交Spark作业:(需先配置Spark_HOME环境变量) $ spark-submit \ --class sparkstreaming_action.socket.main.Socket \ /opt/wordFreqSocket-0.1-jar-with-dependencies.jar 注1:其中每行的末尾 "\" 代表不换行,命令需在一行上输入,此处只为方便观看 注2:提交的Jar包放在 /opt/ 目录下Spark流式作业运行过程的输出(以1s间隔,不停打印时间信息) ----------------------------- Time: 1559735412000 ms ---------------------------------------------------------- Time: 1559735413000 ms ---------------------------------------------------------- Time: 1559735414000 ms -----------------------------

当在master节点的 $ nc -lk 9999 命令下方输入数据,如下截图:
流式作业每隔1s会收集Socket数据,将词频统计结果输出前10条。(统计速度非常快,ms级别)
Spark Streaming之流式词频统计(Socket数据源)
文章图片

Spark Streaming之流式词频统计(Socket数据源)
文章图片

当断开Socket服务器时,流式作业会报如下错误;重新建立服务器后,又能恢复正常。
Spark Streaming之流式词频统计(Socket数据源)
文章图片

六、参考文章
1.《Spark Streaming 实时流式大数据处理实战》
2. groupByKey和reduceByKey
3. linux netcat命令使用技巧
4. Linux nc命令

    推荐阅读