图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据

图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

作者:韩信子@ShowMeAI
教程地址:http://www.showmeai.tech/tutorials/84
本文地址:http://www.showmeai.tech/article-detail/176
声明:版权所有,转载请联系平台与作者并注明出处
引言 2020以来新冠疫情改变了全世界,影响着大家的生活,本案例结合大数据分析技术,使用pyspark对2020年美国新冠肺炎疫情进行数据分析,并结合可视化方法进行结果呈现。
1.实验环境

  • (1)Linux: Ubuntu 16.04
  • (2)Hadoop3.1.3
  • (3)Python: 3.8
  • (4)Spark: 2.4.0
  • (5)Jupyter Notebook
2.数据集 1)数据集下载 本案例使用的数据集来自Kaggle平台的美国新冠肺炎疫情数据集,数据名称us-counties.csv,为csv文件,它包含了美国发现首例新冠肺炎确诊病例至2020-05-19的相关数据。
数据集下载(百度网盘)
链接:https://pan.baidu.com/s/1YNY2UREm5lXsNkHM3DZFmA
提取码:show
数据一览如下:
图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

2)格式转换 原始数据为csv格式文件,我们首先做一点数据格式转换,方便spark读取数据生成RDD或者DataFrame,具体数据转换代码如下:
import pandas as pd #.csv->.txt data = https://www.it610.com/article/pd.read_csv('/home/hadoop/us-counties.csv') with open('/home/hadoop/us-counties.txt','a+',encoding='utf-8') as f: for line in data.values: f.write((str(line[0])+'\t'+str(line[1])+'\t' +str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))

3)数据上传至HDFS 然后上传“/home/hadoop/us-counties.txt”至HDFS文件系统中,具体路径为“/user/hadoop/us-counties.txt”。操作命令如下:
./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop

3.使用Spark对数据进行分析 这里采用Python作为编程语言,结合pyspark进行数据分析。
1)数据读取与DataFrame构建 首先我们读取数据文件,生成Spark DataFrame。
本案例中使用的数据为结构化数据,因此可以使用spark读取源文件生成DataFrame以方便进行后续分析实现。
from pyspark import SparkConf,SparkContext from pyspark.sql import Row from pyspark.sql.types import * from pyspark.sql import SparkSession from datetime import datetime import pyspark.sql.functions as funcdef toDate(inputStr): newStr = "" if len(inputStr) == 8: s1 = inputStr[0:4] s2 = inputStr[5:6] s3 = inputStr[7] newStr = s1+"-"+"0"+s2+"-"+"0"+s3 else: s1 = inputStr[0:4] s2 = inputStr[5:6] s3 = inputStr[7:] newStr = s1+"-"+"0"+s2+"-"+s3 date = datetime.strptime(newStr, "%Y-%m-%d") return date#主程序: spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False), StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),] schema = StructType(fields)rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt") rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))shemaUsInfo = spark.createDataFrame(rdd1,schema)shemaUsInfo.createOrReplaceTempView("usInfo")

2)数据分析 本案例主要进行了以下统计分析,分析的目标和方法如下:
  • 获取数据集与代码 → ShowMeAI的官方GitHub https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets
  • 运行代码段与学习 → 在线编程环境 http://blog.showmeai.tech/python3-compiler
(1)统计美国截止每日的累计确诊人数和累计死亡人数。
  • 以date作为分组字段,对cases和deaths字段进行汇总统计。
(2)统计美国每日的新增确诊人数。
  • 因为「新增数=今日数-昨日数」,这里使用自连接,连接条件是t1.date = t2.date + 1,然后使用t1.totalCases – t2.totalCases计算该日新增。
(3)统计美国每日的新增确诊人数新增死亡人数。
  • 因为「新增数=今日数-昨日数」,这里使用自连接,连接条件是t1.date = t2.date + 1,然后使用t1.totalCases – t2.totalCases计算该日新增。
(4)统计截止5.19日,美国各州的累计确诊人数和死亡人数。
  • 首先筛选出5.19日的数据,然后以state作为分组字段,对cases和deaths字段进行汇总统计。
(5)统计截止5.19日,美国确诊人数最多的十个州。
  • 对3)的结果DataFrame注册临时表,然后按确诊人数降序排列,并取前10个州。
(6)统计截止5.19日,美国死亡人数最多的十个州。
  • 对3)的结果DataFrame注册临时表,然后按死亡人数降序排列,并取前10个州。
(7)统计截止5.19日,美国确诊人数最少的十个州。
  • 对3)的结果DataFrame注册临时表,然后按确诊人数升序排列,并取前10个州。
(8)统计截止5.19日,美国死亡人数最少的十个州。
  • 对3)的结果DataFrame注册临时表,然后按死亡人数升序排列,并取前10个州。
(9)统计截止5.19日,全美和各州的病死率。
  • 病死率 = 死亡数/确诊数,对3)的结果DataFrame注册临时表,然后按公式计算。
我们下面基于Spark DataFrame和Spark sql进行统计分析。
# 1.计算每日的累计确诊病例数和死亡数 df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())# 列重命名 df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths") df1.repartition(1).write.json("result1.json")#写入hdfs# 注册为临时表供下一步使用 df1.createOrReplaceTempView("ustotal")# 2.计算每日较昨日的新增确诊病例数和死亡病例数 df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json")#写入hdfs# 3.统计截止5.19日 美国各州的累计确诊人数和死亡人数 df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfowhere date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") #写入hdfsdf3.createOrReplaceTempView("eachStateInfo")# 4.找出美国确诊最多的10个州 df4 = spark.sql("select date,state,totalCases from eachStateInfoorder by totalCases desc limit 10") df4.repartition(1).write.json("result4.json")# 5.找出美国死亡最多的10个州 df5 = spark.sql("select date,state,totalDeaths from eachStateInfoorder by totalDeaths desc limit 10") df5.repartition(1).write.json("result5.json")# 6.找出美国确诊最少的10个州 df6 = spark.sql("select date,state,totalCases from eachStateInfoorder by totalCases asc limit 10") df6.repartition(1).write.json("result6.json")# 7.找出美国死亡最少的10个州 df7 = spark.sql("select date,state,totalDeaths from eachStateInfoorder by totalDeaths asc limit 10") df7.repartition(1).write.json("result7.json")# 8.统计截止5.19全美和各州的病死率 df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache() df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")

3)结果文件 上述Spark计算结果保存.json文件,方便后续可视化处理。由于使用Python读取HDFS文件系统不太方便,故将HDFS上结果文件转储到本地文件系统中,使用以下命:
./bin/hdfs dfs -get /user/hadoop/result1.json/*.json /home/hadoop/result/result1 ...

对于result2等结果文件,使用相同命令,只需要改一下路径即可。下载过程如下图所示:
图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

4.数据可视化 1)可视化工具选择与代码 选择使用python第三方库pyecharts作为可视化工具。
  • 获取数据集与代码 → ShowMeAI的官方GitHub https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets
  • 运行代码段与学习 → 在线编程环境 http://blog.showmeai.tech/python3-compiler
在使用前,需要安装pyecharts,安装代码如下:
pip install pyecharts

具体可视化实现代码如下:
from pyecharts import options as opts from pyecharts.charts import Bar from pyecharts.charts import Line from pyecharts.components import Table from pyecharts.charts import WordCloud from pyecharts.charts import Pie from pyecharts.charts import Funnel from pyecharts.charts import Scatter from pyecharts.charts import PictorialBar from pyecharts.options import ComponentTitleOpts from pyecharts.globals import SymbolType import json

每日的累计确诊病例数和死亡数 → 双柱状图
#1.画出每日的累计确诊病例数和死亡数 → 双柱状图def drawChart_1(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.json" date = [] cases = [] deaths = [] with open(root, 'r') as f: while True: line = f.readline() if not line:# 到 EOF,返回空字符串,则终止循环 break js = json.loads(line) date.append(str(js['date'])) cases.append(int(js['cases'])) deaths.append(int(js['deaths']))d = ( Bar() .add_xaxis(date) .add_yaxis("累计确诊人数", cases, stack="stack1") .add_yaxis("累计死亡人数", deaths, stack="stack1") .set_series_opts(label_opts=opts.LabelOpts(is_show=False)) .set_global_opts(title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数")) .render("/home/hadoop/result/result1/result1.html") )

每日的新增确诊病例数和死亡数 → 折线图
#2.画出每日的新增确诊病例数和死亡数 → 折线图 def drawChart_2(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.json" date = [] cases = [] deaths = [] with open(root, 'r') as f: while True: line = f.readline() if not line:# 到 EOF,返回空字符串,则终止循环 break js = json.loads(line) date.append(str(js['date'])) cases.append(int(js['caseIncrease'])) deaths.append(int(js['deathIncrease']))( Line(init_opts=opts.InitOpts(width="1600px", height="800px")) .add_xaxis(xaxis_data=https://www.it610.com/article/date) .add_yaxis( series_name="新增确诊", y_axis=cases, markpoint_opts=opts.MarkPointOpts( data=https://www.it610.com/article/[ opts.MarkPointItem(type_="max", name="最大值")] ), markline_opts=opts.MarkLineOpts( data=https://www.it610.com/article/[opts.MarkLineItem(type_="average", name="平均值")] ), ) .set_global_opts( title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""), tooltip_opts=opts.TooltipOpts(trigger="axis"), toolbox_opts=opts.ToolboxOpts(is_show=True), xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False), ) .render("/home/hadoop/result/result2/result1.html") ) ( Line(init_opts=opts.InitOpts(width="1600px", height="800px")) .add_xaxis(xaxis_data=https://www.it610.com/article/date) .add_yaxis( series_name="新增死亡", y_axis=deaths, markpoint_opts=opts.MarkPointOpts( data=https://www.it610.com/article/[opts.MarkPointItem(type_="max", name="最大值")] ), markline_opts=opts.MarkLineOpts( data=https://www.it610.com/article/[ opts.MarkLineItem(type_="average", name="平均值"), opts.MarkLineItem(symbol="none", x="90%", y="max"), opts.MarkLineItem(symbol="circle", type_="max", name="最高点"), ] ), ) .set_global_opts( title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""), tooltip_opts=opts.TooltipOpts(trigger="axis"), toolbox_opts=opts.ToolboxOpts(is_show=True), xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False), ) .render("/home/hadoop/result/result2/result2.html") )

截止5.19,美国各州累计确诊、死亡人数和病死率—->表格
#3.画出截止5.19,美国各州累计确诊、死亡人数和病死率--->表格 def drawChart_3(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.json" allState = [] with open(root, 'r') as f: while True: line = f.readline() if not line:# 到 EOF,返回空字符串,则终止循环 break js = json.loads(line) row = [] row.append(str(js['state'])) row.append(int(js['totalCases'])) row.append(int(js['totalDeaths'])) row.append(float(js['deathRate'])) allState.append(row)table = Table()headers = ["State name", "Total cases", "Total deaths", "Death rate"] rows = allState table.add(headers, rows) table.set_global_opts( title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle="") ) table.render("/home/hadoop/result/result3/result1.html")

美国确诊最多的10个州 → 词云图
#4.画出美国确诊最多的10个州 → 词云图 def drawChart_4(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.json" data = https://www.it610.com/article/[] with open(root,'r') as f: while True: line = f.readline() if not line:# 到 EOF,返回空字符串,则终止循环 break js = json.loads(line) row=(str(js['state']),int(js['totalCases'])) data.append(row)c = ( WordCloud() .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND) .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊Top10")) .render("/home/hadoop/result/result4/result1.html") )

美国死亡最多的10个州 → 柱状图
#5.画出美国死亡最多的10个州 → 柱状图 def drawChart_5(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.json" state = [] totalDeath = [] with open(root, 'r') as f: while True: line = f.readline() if not line:# 到 EOF,返回空字符串,则终止循环 break js = json.loads(line) state.insert(0,str(js['state'])) totalDeath.insert(0,int(js['totalDeaths']))c = ( PictorialBar() .add_xaxis(state) .add_yaxis( "", totalDeath, label_opts=opts.LabelOpts(is_show=False), symbol_size=18, symbol_repeat="fixed", symbol_offset=[0, 0], is_symbol_clip=True, symbol=SymbolType.ROUND_RECT, ) .reversal_axis() .set_global_opts( title_opts=opts.TitleOpts(title="PictorialBar-美国各州死亡人数Top10"), xaxis_opts=opts.AxisOpts(is_show=False), yaxis_opts=opts.AxisOpts( axistick_opts=opts.AxisTickOpts(is_show=False), axisline_opts=opts.AxisLineOpts( linestyle_opts=opts.LineStyleOpts(opacity=0) ), ), ) .render("/home/hadoop/result/result5/result1.html") )

找出美国确诊最少的10个州 → 词云图
#6.找出美国确诊最少的10个州 → 词云图 def drawChart_6(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.json" data = https://www.it610.com/article/[] with open(root,'r') as f: while True: line = f.readline() if not line:# 到 EOF,返回空字符串,则终止循环 break js = json.loads(line) row=(str(js['state']),int(js['totalCases'])) data.append(row)c = ( WordCloud() .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND) .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊最少的10个州")) .render("/home/hadoop/result/result6/result1.html") )

美国死亡最少的10个州 → 漏斗图
#7.找出美国死亡最少的10个州 → 漏斗图 def drawChart_7(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.json" data = https://www.it610.com/article/[] with open(root,'r') as f: while True: line = f.readline() if not line:# 到 EOF,返回空字符串,则终止循环 break js = json.loads(line) data.insert(0,[str(js['state']),int(js['totalDeaths'])])c = ( Funnel() .add( "State", data, sort_="ascending", label_opts=opts.LabelOpts(position="inside"), ) .set_global_opts(title_opts=opts.TitleOpts(title="")) .render("/home/hadoop/result/result7/result1.html") )

美国的病死率—->饼状图
#8.美国的病死率--->饼状图 def drawChart_8(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.json" values = [] with open(root, 'r') as f: while True: line = f.readline() if not line:# 到 EOF,返回空字符串,则终止循环 break js = json.loads(line) if str(js['state'])=="USA": values.append(["Death(%)",round(float(js['deathRate'])*100,2)]) values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)]) c = ( Pie() .add("", values) .set_colors(["blcak","orange"]) .set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率")) .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}")) .render("/home/hadoop/result/result8/result1.html") )#可视化 index = 1 while index<9: funcStr = "drawChart_" + str(index) eval(funcStr)(index) index+=1

2)结果图标展示 可视化结果是.html格式的,reslut1的结果展示图保存路径为“/home/hadoop/result/result1/result1.html”,reslut2的结果展示图保存路径为“/home/hadoop/result/result2/result1.html”,其余类似递推。具体截图如下:
(1)美国每日的累计确诊病例数和死亡数 → 双柱状图
图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

(2)美国每日的新增确诊病例数 → 折线图
图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

(3)美国每日的新增死亡病例数 → 折线图
图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

(4)截止5.19,美国各州累计确诊、死亡人数和病死率 → 表格
图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

(5)截止5.19,美国累计确诊人数前10的州 → 词云图
图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

(6)截止5.19,美国累计死亡人数前10的州 → 柱状图
图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

(7)截止5.19,美国累计确诊人数最少的10个州 → 词云图
图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

(8)截止5.19,美国累计死亡人数最少的10个州 → 漏斗图
图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

(9)截止5.19,美国的病死率 → 饼状图
图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

5.参考资料
  • 数据科学工具速查 | Spark使用指南(RDD版) http://www.showmeai.tech/article-detail/106
  • 数据科学工具速查 | Spark使用指南(SQL版) http://www.showmeai.tech/article-detail/107
ShowMeAI相关文章推荐
  • 图解大数据 | 导论:大数据生态与应用
  • 图解大数据 | 分布式平台:Hadoop与Map-reduce详解
  • 图解大数据 | 实操案例:Hadoop系统搭建与环境配置
  • 图解大数据 | 实操案例:应用map-reduce进行大数据统计
  • 图解大数据 | 实操案例:Hive搭建与应用案例
  • 图解大数据 | 海量数据库与查询:Hive与HBase详解
  • 图解大数据 | 大数据分析挖掘框架:Spark初步
  • 图解大数据 | Spark操作:基于RDD的大数据处理分析
  • 图解大数据 | Spark操作:基于Dataframe与SQL的大数据处理分析
  • 图解大数据 | 综合案例:使用spark分析美国新冠肺炎疫情数据
  • 图解大数据 | 综合案例:使用Spark分析挖掘零售交易数据
  • 图解大数据 | 综合案例:使用Spark分析挖掘音乐专辑数据
  • 图解大数据 | 流式数据处理:Spark Streaming
  • 图解大数据 | Spark机器学习(上)-工作流与特征工程
  • 图解大数据 | Spark机器学习(下)-建模与超参调优
  • 图解大数据 | Spark GraphFrames:基于图的数据分析挖掘
ShowMeAI系列教程推荐
  • 图解Python编程:从入门到精通系列教程
  • 图解数据分析:从入门到精通系列教程
  • 图解AI数学基础:从入门到精通系列教程
  • 图解大数据技术:从入门到精通系列教程
【图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据】图解大数据|图解大数据 | 综合案例-使用spark分析新冠肺炎疫情数据
文章图片

    推荐阅读