使用R构建数据管道

本文概述

  • 步骤1:创建数据库和表以存储Twitter数据
  • 第2步:串流有关你喜欢的主题的推文!
  • 步骤3进行分析
  • 总结
  • 参考文献
有人可能会争辩说, 正确的ETL管道是数据科学的重要组成部分。如果没有整洁有序的数据, 就很难产生可增强业务决策的高质量见解。
因此, 在本教程中, 我们将探讨构建简单的ETL管道以使用R将实时Tweets直接流式传输到SQLite数据库中的必要条件。例如, 这是社交网络分析中相当常见的任务。
重点将放在涵盖数据收集和存储的思考过程, 以及如何使用R中的rtweet包操作Twitter API。
首先, 让我们从拥有所有合适的工具开始。你需要做的第一件事是设置对Twitter API的访问权限。总体而言, 你需要执行以下步骤:
  • 如果你没有Twitter帐户, 请创建一个。
  • 单击此链接并申请开发者帐户(请注意, 此过程现在需要Twitter批准的应用程序)。
  • 在以下网页$ ^ 1 $上创建一个新应用。
  • 填写有关你的应用程序的所有详细信息, 并创建你的访问令牌$ ^ 1 $。
  • 收集你需要连接到API的使用者密钥, 使用者密钥, 访问令牌和访问令牌密钥。
设置完Twitter API后, 如果没有, 则需要获取SQLite。有关如何在计算机上获取SQLite的完整过程, 请遵循srcmini上的SQLite入门指南教程。由于操作简单, 因此选择使用SQLite作为本教程。
步骤1:创建数据库和表以存储Twitter数据 通过安装Twitter API访问和SQLite, 我们最终可以开始构建管道以存储Tweet, 因为我们会随着时间流化它们。首先, 我们将使用R创建新的SQLite数据库, 如下所示:
# Import necessary libraries and functions library(RSQLite) library(rtweet) library(tm) library(dplyr) library(knitr) library(wordcloud) library(lubridate) library(ggplot2) source("transform_and_clean_tweets.R")

# Create our SQLite database conn < - dbConnect(RSQLite::SQLite(), "Tweet_DB.db")

之后, 我们可以在数据库内部创建一个表来保存推文。在我们的特定情况下, 我们将存储以下变量:
  • Tweet_ID作为INTEGER主键
  • 使用者为TEXT
  • Tweet_Content为TEXT
  • Date_Created为INTEGER
你可能想知道, 为什么我将日期设置为整数?这是因为SQLite没有保留日期和时间的数据类型。因此, 日期将被存储为自1970-01-01开始的秒数。
现在让我们继续写表:
dbExecute(conn, "CREATE TABLE Tweet_Data( Tweet_ID INTEGER PRIMARY KEY, User TEXT, Tweet_Content TEXT, Date_Created INTEGER)")

创建表后, 可以转到sqlite3.exe并检查是否确实已创建。你可以在下面看到此屏幕截图:
使用R构建数据管道

文章图片
第2步:串流有关你喜欢的主题的推文! 信不信由你, 你已经建立了简单, 功能正常的Twitter流传输管道所需的要求和基础结构已经完成。我们现在需要做的是使用API??流推文。值得注意的是, 出于本教程的目的, 我将使用标准的免费API。例如, 如果你进行研究, 有一些付费版本可能更适合你的Tweet流媒体需求。
不要再拖延了, 让我们开始设置Twitter侦听器的过程。你需要做的第一件事是导入rtweet程序包, 并按照开头所述输入应用程序的访问令牌和机密:
token < - create_token(app = 'Your_App_Name', consumer_key = 'Your_Consumer_Key', consumer_secret = 'Your_Consumer_Secret', access_token = 'Your_Access_Token', access_secret = 'Your_Access_Secret')

放置好令牌后, 下一步是确定你要流式传输(即收听)哪些推文。 rtweet包中的stream_tweets函数为我们提供了查询Twitter API的各种选项。例如, 你可以流式传输包含一组给定的标签或关键字(最多400个)中的一个或多个, 所有公开可用推文的一小部分随机子集的推文, 跟踪一组用户ID或屏幕(用户)的推文。名称(最多5000个), 或按地理位置收集推文。
对于本教程, 我决定继续进行流式处理, 其中包含与数据科学相关的标签的推文(请参见下面的列表)。你可能会注意到, 我定义要推送的主题标签的格式有些奇怪。但是, 当你要收听主题标签或关键字时, 这是stream_tweets函数所需的格式。如果你打算听一组给定的用户或基于坐标, 则此格式会有所不同。有关更多详细信息, 请参阅文档。
keys < - "#nlp, #machinelearning, #datascience, #chatbots, #naturallanguageprocessing, #deeplearning"

定义了关键字之后, 就该定义推文流循环了。有几种方法可以做到这一点, 但是这种格式过去对我来说效果很好:
# Initialize the streaming hour tally hour_counter < - 0# Initialize a while loop that stops when the number of hours you want to stream tweets for is exceeded while(hour_counter < = 12){ # Set the stream time to 2 hours each iteration (7200 seconds) streamtime < - 7200 # Create the file name where the 2 hour stream will be stored. Note that the Twitter API outputs a .json file. filename < - paste0("nlp_stream_", format(Sys.time(), '%d_%m_%Y__%H_%M_%S'), ".json") # Stream Tweets containing the desired keys for the specified amount of time stream_tweets(q = keys, timeout = streamtime, file_name = filename) # Clean the streamed tweets and select the desired fields clean_stream < - transform_and_clean_tweets(filename, remove_rts = TRUE) # Append the streamed tweets to the Tweet_Data table in the SQLite database dbWriteTable(conn, "Tweet_Data", clean_stream, append = T) # Delete the .json file from this 2-hour stream file.remove(filename) # Add the hours to the tally hour_counter < - hour_counter + 2 }

本质上, 该循环以2小时的间隔流式传输尽可能多的tweet, 以提及键字符串中的任何主题标签, 总时间为12小时。每2个小时, Twitter侦听器就会在你当前的工作目录中创建一个.json文件, 其名称在变量filename中指定。
然后, 它将这个文件名传递给transform_and_clean_tweets函数, 该函数会在需要时删除转推, 从Twitter API提供的所有列中选择要保留的列, 并对Tweets中包含的文本进行规范化。
然后, 它将结果数据帧附加到我们之前在SQLite数据库中创建的Tweet_Data表。最后, 它将2加到小时计数表中(因为流持续2个小时), 并删除了创建的.json文件。这样做是因为所有感兴趣的数据现在都在我们的数据库中, 并且保留.json文件可能会成为存储负担。
让我们知道详细了解一下transform_and_clean_tweets函数:
transform_and_clean_tweets < - function(filename, remove_rts = TRUE){# Import the normalize_text function source("normalize_text.R")# Parse the .json file given by the Twitter API into an R data frame df < - parse_stream(filename) # If remove_rst = TRUE, filter out all the retweets from the stream if(remove_rts == TRUE){ df < - filter(df, df$is_retweet == FALSE) } # Keep only the tweets that are in English df < - filter(df, df$lang == "en") # Select the features that you want to keep from the Twitter stream and rename them # so the names match those of the columns in the Tweet_Data table in our database small_df < - df[, c("screen_name", "text", "created_at")] names(small_df) < - c("User", "Tweet_Content", "Date_Created") # Finally normalize the tweet text small_df$Tweet_Content < - sapply(small_df$Tweet_Content, normalize_text) # Return the processed data frame return(small_df) }

如前所述, 此功能旨在过滤转发消息(如果需要), 保留所需功能并规范化推文文本。从本质上讲, 这可以视为ETL缩写词(转换)的” T” 部分。此功能的关键组成部分是清除推文的过程。
通常, 文本数据需要一些预处理步骤, 才能准备好进行任何类型的分析。对于推文, 这些步骤可以包括删除URL, 停用词和提及, 将文本变为小写字母, 词干等。但是, 并非所有这些步骤始终都是必需的。不过, 暂时, 我将在下面显示用于预处理这些推文的normalize_text函数:
normalize_text < - function(text){ # Keep only ASCII characters text = iconv(text, "latin1", "ASCII", sub="") # Convert to lower case characters text = tolower(text) # Remove any HTML tags text = gsub("< .*?> ", " ", text) # Remove URLs text = gsub("\\s?(f|ht)(tp)(s?)(://)([^\\.]*)[\\.|/](\\S*)", "", text) # Keep letters and numbers only text = gsub("[^[:alnum:]]", " ", text) # Remove stop words text = removeWords(text, c("rt", "gt", stopwords("en"))) # Remove any extra white space text = stripWhitespace(text) text = gsub("^\\s+|\\s+$", "", text)return(text) }

根据你的用例, 这些步骤可能就足够了。如前所述, 你可以选择添加更多或不同的步骤, 例如词干提取或词形修饰, 或仅保留字母而不是字母和数字。我鼓励你尝试并尝试不同的组合。这可能是练习正则表达式技能的好地方。
【使用R构建数据管道】完成所有这些步骤之后, 最终状态是一个SQLite数据库, 其中填充了所有流式推文。你可以通过运行几个简单的查询来验证一切正常, 例如:
data_test < - dbGetQuery(conn, "SELECT * FROM Tweet_Data LIMIT 20") unique_rows < - dbGetQuery(conn, "SELECT COUNT() AS Total FROM Tweet_Data") kable(data_test)

使用R构建数据管道

文章图片
print(as.numeric(unique_rows))

## [1] 1863

步骤3进行分析 当确定ETL流程可以按预期运行时, 最后一步是收集一些见解并分析你收集的数据。例如, 对于我们收集的推文, 我们可以尝试一些不同的事情。该推文内容中提到的术语词云, 以及一个时间表, 以可视化方式显示我们在12小时的流媒体播放期间何时获得最多的推文。显然, 这份清单并不是对可以使用推文进行的所有研究的全面概述, 其范围可以从情感分析到心理研究等等。
话虽如此, 让我们直接建立一个不错的wordcloud:
# Gather all tweets from the database all_tweets < - dbGetQuery(conn, "SELECT Tweet_ID, Tweet_Content FROM Tweet_Data")# Create a term-document matrix and sort the words by frequency dtm < - TermDocumentMatrix(VCorpus(VectorSource(all_tweets$Tweet_Content))) dtm_mat < - as.matrix(dtm) sorted < - sort(rowSums(dtm_mat), decreasing = TRUE) freq_df < - data.frame(words = names(sorted), freq = sorted)# Plot the wordcloud set.seed(42) wordcloud(words = freq_df$words, freq = freq_df$freq, min.freq = 10, max.words=50, random.order=FALSE, rot.per=0.15, colors=brewer.pal(8, "RdYlGn"))

使用R构建数据管道

文章图片
好吧, 这真是一个巨大的惊喜!机器学习和数据科学是我们所有推文中最常被提及的词-它们恰好是我们要为其流式传输的两个标签。因此, 可以预期该结果。不过, 换句话来说, 看起来有些有趣。例如, 大数据和人工智能并不是我们密钥中的主题标签, 但是它们出现的频率很高, 因此人们可以说它们经常与其他两个主题一起被谈论。我们还选择了其他有趣的词, 例如python或tensorflow, 这些词使我们在收集的tweet内容中有了更多的上下文, 而不仅仅是井号。
现在让我们转向另一个简单的分析。在我们的12小时流中, 我们什么时候收集了最多的推文?为此, 我们将收集整数日期, 将其转换为适当的格式, 然后绘制一段时间内的推文数量:
# Select the dates in which the tweets were created and convert them into UTC date-time format all_tweets < - dbGetQuery(conn, "SELECT Tweet_ID, Date_Created FROM Tweet_Data") all_tweets$Date_Created < - as.POSIXct(all_tweets$Date_Created, origin = "1970-01-01", tz = "UTC")# Group by the day and hour and count the number of tweets that occurred in each bucket all_tweets_2 < - all_tweets %> % mutate(day = day(Date_Created), month = month(Date_Created, label = TRUE), hour = hour(Date_Created)) %> % mutate(day_hour = paste(month, "-", day, "-", hour, sep = "")) %> % group_by(day_hour) %> % tally()# Simple line ggplot ggplot(all_tweets_2, aes(x = day_hour, y = n)) + geom_line(aes(group = 1)) + theme_minimal() + ggtitle("Tweet Freqeuncy During the 12-h Streming Period")+ ylab("Tweet Count")+ xlab("Month-Day-Hour")

使用R构建数据管道

文章图片
甜!现在我们可以看到, 我们获得的最大数量的独特推文来自UTC时间9月2日(UTC晚上8:00至8:59 PM)(在军事时间显示为20)。
总结 恭喜你!现在你知道了如何在R中构建简单的ETL管道。我们进行的两个分析代表了使用Twitter数据进行的非常基本的分析。但是, 如前所述, 只要构建一个健壮的管道来引入数据, 还有很多事情要做。这方面是本教程的主要重点。
话虽如此, 本教程仅显示了一个非常小的案例研究, 以逐步了解为Twitter数据构建ETL管道的过程。为整个企业建立健壮且可扩展的ETL管道是一项复杂的工作, 需要大量的计算资源和知识, 尤其是在涉及大数据时。
我鼓励你进行进一步的研究, 并尝试构建自己的小规模管道, 这可能涉及在Python $ ^ 1 $中构建一个管道。也许, 甚至继续尝试一些大数据项目。例如, srcmini已经通过PySpark课程开设了诸如” 大数据基础知识” 之类的课程, 在这些课程中, 他们使用诸如PySpark之类的工具来学习大数据, 从而可以进一步了解该领域。
如果你想了解数据工程, 请参加srcmini的数据工程入门课程。
参考文献
  1. Foley, D.(2019年5月11日)。将Twitter数据流式传输到MySQL数据库中。取自https://towardsdatascience.com/streaming-twitter-data-into-a-mysql-database-d62a02b050d6

    推荐阅读