本文概述
- 为Twitter API创建自己的凭证
- 构建Twitter HTTP客户端
- 设置我们的Apache Spark流应用程序
- 创建一个简单的实时仪表板来表示数据
- 一起运行应用程序
- Apache流式现实生活用例
当今的主要数据来源之一是社交网络。请允许我演示一个真实的示例:使用其中最重要的大数据回声解决方案之一(Apache Spark和Python)实时处理, 分析和从社交网络数据中提取见解。
![Apache Spark流教程(识别流行的Twitter Hashtags)](http://www.srcmini.com/wp-content/uploads/2020/04/image-1582297323430-bd887d1172177bee9caf74bbd1009b0b.png)
文章图片
在本文中, 我将教你如何构建一个简单的应用程序, 该应用程序使用Python从Twitter读取在线流, 然后使用Apache Spark Streaming处理推文以标识主题标签, 最后返回最热门的主题标签并以真实的形式表示此数据。时间仪表板。
为Twitter API创建自己的凭证 为了从Twitter获得推文, 你需要在TwitterApps上注册, 方法是单击” 创建新应用程序” , 然后填写以下表单, 然后单击” 创建你的Twitter应用程序” 。
![Apache Spark流教程(识别流行的Twitter Hashtags)](http://www.srcmini.com/wp-content/uploads/2020/04/image-1582222522755-2f6687cf78dabaeddc5844f0057162e0.png)
文章图片
其次, 转到你新创建的应用程序, 然后打开” 密钥和访问令牌” 标签。然后点击” 生成我的访问令牌” 。
![Apache Spark流教程(识别流行的Twitter Hashtags)](http://www.srcmini.com/wp-content/uploads/2020/04/image-1582222584289-a61cdf883f5e4a7fd5a0f1531b7d975d.png)
文章图片
你的新访问令牌将显示如下。
![Apache Spark流教程(识别流行的Twitter Hashtags)](http://www.srcmini.com/wp-content/uploads/2020/04/image-1582222640513-d293f45956dc402a116d951309262f95.png)
文章图片
现在, 你可以进行下一步了。
构建Twitter HTTP客户端 在这一步中, 我将向你展示如何构建一个简单的客户端, 该客户端将使用Python从Twitter API中获取推文, 并将其传递给Spark Streaming实例。对于任何专业的Python开发人员而言, 应该都易于遵循。
首先, 我们创建一个名为twitter_app.py的文件, 然后将代码添加到其中, 如下所示。
导入我们将要使用的库, 如下所示:
import socket
import sys
import requests
import requests_oauthlib
import json
并添加将在OAuth中用于连接到Twitter的变量, 如下所示:
# Replace the values below with yours
ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN'
ACCESS_SECRET = 'YOUR_ACCESS_SECRET'
CONSUMER_KEY = 'YOUR_CONSUMER_KEY'
CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET'
my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET)
现在, 我们将创建一个名为get_tweets的新函数, 该函数将调用Twitter API URL并返回对tweet流的响应。
def get_tweets():
url = 'https://stream.twitter.com/1.1/statuses/filter.json'
query_data = http://www.srcmini.com/[('language', 'en'), ('locations', '-130, -20, 100, 50'), ('track', '#')]
query_url = url + '?' + '&
'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
response = requests.get(query_url, auth=my_auth, stream=True)
print(query_url, response)
return response
然后, 创建一个函数, 以接收上述响应, 并从整个tweets的JSON对象中提取tweets的文本。之后, 它通过TCP连接将每条推文发送到Spark Streaming实例(稍后将讨论)。
def send_tweets_to_spark(http_resp, tcp_connection):
for line in http_resp.iter_lines():
try:
full_tweet = json.loads(line)
tweet_text = full_tweet['text']
print("Tweet Text: " + tweet_text)
print ("------------------------------------------")
tcp_connection.send(tweet_text + '\n')
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
现在, 我们将成为主要部分, 它将与Spark进行连接的应用程序主机套接字连接。我们将在这里将IP配置为localhost, 因为所有IP都将在同一台计算机和端口9009上运行。然后, 我们将调用上面所获得的get_tweets方法, 以从Twitter获取推文, 并将其响应与与send_tweets_to_spark的套接字连接, 用于将推文发送到Spark。
TCP_IP = "localhost"
TCP_PORT = 9009
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
resp = get_tweets()
send_tweets_to_spark(resp, conn)
设置我们的Apache Spark流应用程序 让我们构建一个Spark流媒体应用程序, 该应用程序将对传入的推文进行实时处理, 从它们中提取主题标签, 并计算已提及的主题标签数。
![Apache Spark流教程(识别流行的Twitter Hashtags)](http://www.srcmini.com/wp-content/uploads/2020/04/image-1582222692844-bfd251400319962c71d58f464e086281.png)
文章图片
首先, 我们必须创建一个Spark Context sc实例, 然后以2秒的批处理间隔从sc创建一个Streaming Context ssc, 它将对每两秒钟接收到的所有流进行转换。请注意, 我们已将日志级别设置为ERROR, 以禁用Spark写入的大多数日志。
我们在此处定义了一个检查点, 以便允许定期的RDD检查点;必须在我们的应用程序中使用它, 因为我们将使用有状态转换(将在同一部分稍后讨论)。
然后, 我们定义主DStream数据流, 该数据流将连接到之前在端口9009上创建的套接字服务器, 并从该端口读取推文。 DStream中的每个记录将是一条推文。
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
import sys
import requests
# create spark configuration
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
# create spark context with the above configuration
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# create the Streaming Context from the above spark context with interval size 2 seconds
ssc = StreamingContext(sc, 2)
# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_TwitterApp")
# read data from port 9009
dataStream = ssc.socketTextStream("localhost", 9009)
现在, 我们将定义转换逻辑。首先, 我们将所有tweet分解为单词, 然后将它们放入RDD中。然后, 我们将仅过滤所有单词中的主题标签, 并将它们映射到(主题标签, 1)对, 并将其放入主题标签RDD中。
然后, 我们需要计算#次提到主题标签的次数。我们可以通过使用reduceByKey函数来实现。此函数将计算每个批次中提及标签#的次数, 即它将重置每个批次中的计数。
在我们的情况下, 我们需要计算所有批次的计数, 因此我们将使用另一个名为updateStateByKey的函数, 因为该函数使你可以在用新数据更新RDD时保持其状态。这种方式称为状态转换。
请注意, 要使用updateStateByKey, 你必须配置一个检查点, 并且我们在上一步中已完成此操作。
# split each tweet into words
words = dataStream.flatMap(lambda line: line.split(" "))
# filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag, 1)
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
# adding the count of each hashtag to its last count
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# do processing for each RDD generated in each interval
tags_totals.foreachRDD(process_rdd)
# start the streaming computation
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()
updateStateByKey将函数作为参数称为更新函数。它在RDD中的每个项目上运行, 并执行所需的逻辑。
在我们的案例中, 我们创建了一个称为aggregate_tags_count的更新函数, 该函数将对每个主题标签的所有new_values求和, 并将它们添加到total_sum(即所有批次的总和)中, 并将数据保存到tags_totals RDD中。
def aggregate_tags_count(new_values, total_sum):
return sum(new_values) + (total_sum or 0)
然后, 我们对每个批次中的tags_totals RDD进行处理, 以便使用Spark SQL Context将其转换为temp表, 然后执行select语句, 以检索前十个具有其计数的hashtag, 并将其放入hashtag_counts_df数据帧中。
def get_sql_context_instance(spark_context):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
return globals()['sqlContextSingletonInstance']
def process_rdd(time, rdd):
print("----------- %s -----------" % str(time))
try:
# Get spark sql singleton context from the current context
sql_context = get_sql_context_instance(rdd.context)
# convert the RDD to Row RDD
row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
# create a DF from the Row RDD
hashtags_df = sql_context.createDataFrame(row_rdd)
# Register the dataframe as table
hashtags_df.registerTempTable("hashtags")
# get the top 10 hashtags from the table using SQL and print them
hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
hashtag_counts_df.show()
# call this method to prepare top 10 hashtags DF and send them
send_df_to_dashboard(hashtag_counts_df)
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
我们的Spark应用程序的最后一步是将hashtag_counts_df数据帧发送到仪表板应用程序。因此, 我们将数据框转换为两个数组, 一个数组用于主题标签, 另一个数组用于其计数。然后, 我们将它们通过REST API发送到仪表板应用程序。
def send_df_to_dashboard(df):
# extract the hashtags from dataframe and convert them into array
top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()]
# extract the counts from dataframe and convert them into array
tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()]
# initialize and send the data through REST API
url = 'http://localhost:5001/updateData'
request_data = http://www.srcmini.com/{'label': str(top_tags), 'data': str(tags_count)}
response = requests.post(url, data=http://www.srcmini.com/request_data)
最后, 这是运行和打印hashtag_counts_df时Spark Streaming的示例输出, 你会注意到, 按照批处理间隔每两秒精确打印一次输出。
![Apache Spark流教程(识别流行的Twitter Hashtags)](http://www.srcmini.com/wp-content/uploads/2020/04/image-1582222757769-88892bd0778853f805dc0477909d65f6.png)
文章图片
创建一个简单的实时仪表板来表示数据 现在, 我们将创建一个简单的仪表板应用程序, 该应用程序将由Spark实时更新。我们将使用Python, Flask和Charts.js进行构建。
首先, 让我们创建一个具有以下结构的Python项目, 然后下载Chart.js文件并将其添加到静态目录中。
![Apache Spark流教程(识别流行的Twitter Hashtags)](http://www.srcmini.com/wp-content/uploads/2020/04/image-1582222810998-ba4ca40762f311dde1c81080914ec3bd.png)
文章图片
然后, 在app.py文件中, 我们将创建一个名为update_data的函数, Spark将通过URL http:// localhost:5001 / updateData调用该函数, 以更新全局标签和值数组。
另外, 创建了refresh_graph_data函数以供AJAX请求调用, 以将新的更新标签和值数组作为JSON返回。函数get_chart_page在调用时将呈现chart.html页面。
from flask import Flask, jsonify, request
from flask import render_template
import ast
app = Flask(__name__)
labels = []
values = []
@app.route("/")
def get_chart_page():
global labels, values
labels = []
values = []
return render_template('chart.html', values=values, labels=labels)
@app.route('/refreshData')
def refresh_graph_data():
global labels, values
print("labels now: " + str(labels))
print("data now: " + str(values))
return jsonify(sLabel=labels, sData=http://www.srcmini.com/values)
@app.route('/updateData', methods=['POST'])
def update_data():
global labels, values
if not request.form or 'data' not in request.form:
return "error", 400
labels = ast.literal_eval(request.form['label'])
values = ast.literal_eval(request.form['data'])
print("labels received: " + str(labels))
print("data received: " + str(values))
return "success", 201
if __name__ == "__main__":
app.run(host='localhost', port=5001)
现在, 让我们在chart.html文件中创建一个简单的图表, 以显示主题标签数据并实时更新。如下定义, 我们需要导入Chart.js和jquery.min.js JavaScript库。
在body标记中, 我们必须创建一个画布并为其指定ID, 以便在下一步使用JavaScript显示图表时引用它。
<
!DOCTYPE html>
<
html>
<
head>
<
meta charset="utf-8"/>
<
title>
Top Trending Twitter Hashtags<
/title>
<
script src='http://www.srcmini.com/static/Chart.js'>
<
/script>
<
script src="http://www.srcmini.com//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js">
<
/script>
<
/head>
<
body>
<
h2>
Top Trending Twitter Hashtags<
/h2>
<
div style="width:700px;
height=500px">
<
canvas id="chart">
<
/canvas>
<
/div>
<
/body>
<
/html>
现在, 让我们使用下面的JavaScript代码来构建图表。首先, 我们获取canvas元素, 然后创建一个新的图表对象, 并将canvas元素传递给它, 然后定义其数据对象, 如下所示。
请注意, 数据的标签和数据受标签和值变量限制, 这些标签和值变量在调用app.py文件中的get_chart_page函数时呈现页面时返回。
最后剩下的部分是配置为每秒执行一次Ajax请求并调用URL / refreshData的函数, 该函数将在app.py中执行refresh_graph_data并返回新的更新数据, 然后更新呈现新数据的char。
<
script>
var ctx = document.getElementById("chart");
var myChart = new Chart(ctx, {
type: 'horizontalBar', data: {
labels: [{% for item in labels %}
"{{item}}", {% endfor %}], datasets: [{
label: '# of Mentions', data: [{% for item in values %}
{{item}}, {% endfor %}], backgroundColor: [
'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)'
], borderColor: [
'rgba(255, 99, 132, 1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255, 99, 132, 1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)'
], borderWidth: 1
}]
}, options: {
scales: {
yAxes: [{
ticks: {
beginAtZero:true
}
}]
}
}
});
var src_Labels = [];
var src_Data = http://www.srcmini.com/[];
setInterval(function(){
$.getJSON('/refreshData', {
}, function(data) {
src_Labels = data.sLabel;
src_Data = http://www.srcmini.com/data.sData;
});
myChart.data.labels = src_Labels;
myChart.data.datasets[0].data = src_Data;
myChart.update();
}, 1000);
<
/script>
一起运行应用程序 让我们按以下顺序运行这三个应用程序:1. Twitter App Client。 2. Spark应用程序。 3.仪表板Web应用程序。
然后, 你可以使用URL < http:// localhost:5001 /> 访问实时仪表板
现在, 你可以看到图表正在更新, 如下所示:
![Apache Spark流教程(识别流行的Twitter Hashtags)](http://www.srcmini.com/wp-content/uploads/2020/04/toptal-blog-image-1495624828402-c5fe26ecadd12b79ac2ed7b82fdbe287.gif)
文章图片
Apache流式现实生活用例 我们已经学习了如何使用Spark Streaming实时对数据进行简单的数据分析, 以及如何使用RESTful网络服务将其直接与简单的仪表板集成。从这个示例中, 我们可以看到Spark的强大功能, 因为它捕获了大量的数据流, 对其进行转换, 并提取出有价值的见解, 这些见解可轻松用于立即做出决策。可以实施许多有用的用例, 这些用例可以服务于不同行业, 例如新闻或营销。
![Apache Spark流教程(识别流行的Twitter Hashtags)](http://www.srcmini.com/wp-content/uploads/2020/04/image-1582222907886-364c43993f885d2c443f22b82e9c3f21.png)
文章图片
新闻行业的例子
我们可以跟踪最常提及的主题标签, 以了解人们在社交媒体上谈论最多的话题。此外, 我们可以跟踪特定的主题标签及其推文, 以了解人们对世界上特定主题或事件的评价。营销实例
我们可以收集推文流, 并通过情感分析将其分类并确定人们的兴趣, 以便针对他们的兴趣提供相关的要约。此外, 有很多用例可以专门用于大数据分析, 并且可以服务于许多行业。一般而言, 有关更多Apache Spark用例, 建议你查看我们之前的文章之一。
【Apache Spark流教程(识别流行的Twitter Hashtags)】我鼓励你从此处阅读有关Spark Streaming的更多信息, 以便更多地了解它的功能, 并对数据进行更高级的转换, 以便实时使用它获得更多见解。
推荐阅读
- SQL Server 2016始终加密(易于实现,难以破解)
- SRVB密码系统入门
- 微服务入门(Dropwizard教程)
- 如何修复Windows更新错误0x8007000d(解决办法介绍)
- 如何修复网络连接错误0x00028002(解决办法分步教程)
- 26款最佳3D建模软件下载推荐合集(哪个最好用())
- 15个最佳免费Windows 10主题推荐下载合集(哪个最好看())
- 19款Windows最佳免费文件管理器下载推荐合集(哪个最好用())
- 如何修复Windows 10上参数不正确的错误(解决办法介绍)