千金一刻莫空度,老大无成空自伤。这篇文章主要讲述基于Spark+Grafana可视化电商项目实战,好文收藏~相关的知识,希望能为你提供帮助。
大家好,我是老兵。
本系列为大数据项目实战系列,每期内容将讲解??项目背景?
??、??技术架构?
??和核心??代码?
?部分,帮助相关小伙伴快速了解大数据项目与技术。
【基于Spark+Grafana可视化电商项目实战,好文收藏~】在上期的基于Spark GraphFrame社交网络实战项目中,介绍了Spark图计算与社交关系图谱,文章反响很好。?
本期将继续介绍基于Spark和Grafana的??电商零售分析?
?项目,在文末附有电商数据集下载地址,欢迎大家自行领取。话不多说,我们开始。
1 项目介绍互联网背景下的大数据、AI领域不断创新,衍生出多样化的电商平台和商品推荐模式。
项目环境:java、IDEA
项目技术:Spark、Grafana
技术难度:中等
作为消费者,当我们打开某款购物APP时,随着你在平台上浏览商品并点击,计算机在后台会记录你的用户行为,并为你生成专有的?
?客群画像?
??,真正做到了??千人千面?
??、??精准推荐?
?的效果。文章图片
关于智能推荐和用户画像怎么实现,我们将在后期系列中讨论。
本项目主要对零售商品进行数据分析,通过技术手段,分析哪几款商品需求量最大(?
?购买排行top5?
??)、??热门商品?
??每日变化趋势、哪些省份是消费大省市(??消费省份分布?
??)、购买群体男女比率(??用户群体分析?
?)。项目基于?
?Spark?
??组件和??Grafana?
?工具,通过Spark数据分析,进行数据清洗、转换、计算并保存,最终Grafana进行可视化大屏展示。2 系统介绍项目程序采用Java语言编写,技术组件采用Spark和Grafana工具。
系统整体分为数据采集、数据分析、数据可视化核心部分。
1)整体技术架构
文章图片
数据采集层
通过?
?网络爬虫?
?或者下载??公开数据集?
?的技术手段(文末提供免费数据集下载)收集电商零售数据,形成结构化文本文件、数据表。- 数据分析层
基于微服务和Spark技术栈构建。微服务组件作为系统基础底座(???非必须?
??),一般公司有专门的微服务团队去做。
Spark作为数据分析组件,提供Spark内存计算、???Spark SQL?
?数据查询统计等功能,完成数据的加工、查询和结果存储。 - 数据存储层
Spark计算后的数据落地到存储介质中,向外提供数据访问能力,项目中使用mysql(redis、Hive也可)。 - 数据展示层
在展示层的技术选型方面,我选择了???Grafana?
?: 一个提供几百种数据源、多种图形样式库的可视化大屏组件,且支持SQL,比较方便。
文章图片
数据从源头的??
?结构化?
??形式(csv/table)转换为Spark的??RDD?
??形式,并最终流转到数据库中的??table表?
?形式存储。- Spark程序读取源数据(csv、table)并转成RDD(?
?csvRDD?
?) - 经过重复值、异常值、时间格式处理,形成中间RDD(?
?transRDD?
?) - Spark SQL进行数据指标计算(?
?top5xxRDD?
??),计算销售排行、用户省份分布等指标并保存到Mysql中(??tb_xxAnalysis?
?) - Grafana中进行SQL查询展示,绘制大屏。
- Spark引擎
大数据生态圈常用???计算引?
??,内存级分布式分析框架,包含Spark Core、Spark SQL、Spark Streaming、Spark MLlib和Spark Graph等模块。具体资料可以看我的相关文章,此处不再赘叙。
文章图片
- Grafana组件
Grafana是一个开源的???可视化?
??和??分析?
??平台。提供??查询?
??、??可视化?
??、??告警?
??和??监控?
??等功能。内部支持多种数据源,提供多种面板、插件来快速将复杂的数据转换为漂亮的图形和可视化的工具,可自定义告警规则。
文章图片
3 程序实现
先看下整体的架构,正如前面所说。我们在这里分成了数据采集、数据分析和数据可视化三个部分。
文章图片
3.1 系统环境
- Maven 3.5、Mysql 5.7
- jdk1.8、scala 2.12
- spark 3.0.2
- grafana 8.5
1)环境jar包依赖
这里使用了引入了spark-core和spark-sql的依赖包,且使用SparkSession方式创建SparkContext上下文。
< dependency>
< groupId> org.apache.spark< /groupId>
< artifactId> spark-core_2.12< /artifactId>
< version> 3.0.3< /version>
< /dependency>
< dependency>
< groupId> org.apache.spark< /groupId>
< artifactId> spark-sql_2.12< /artifactId>
< version> 3.0.3< /version>
< /dependency>
2)Spark脚本
在系统入口这里,为了方便本地和服务器运行?
?灵活性?
??,可支持??批量?
??任务或者??单个?
?任务方式执行,通过参数传入控制。- 包括执行的程序类名、执行标志
- 执行日期范围(不传默认全量)
// 计算kpi列表(kpi.txt)
ProductAnalysis1 2019-11-24 2019-12-22
RegionAnalysis2 2019-11-24 2019-12-22
UserAnalysis1 2019-11-24
// 服务器执行脚本
#!/usr/bin/env bash
SPARK_HOME="/usr/hdp/xxxx/spark"
SPARK_MASTER="yarn"
MAIN_CLASS="com.demo.spark.analysis.launcher.AnalysisLauncher"
SPARK_SUBMIT_OPTS="--master yarn-client --driver-memory 20g --executor-cores 8 --executor-memory 40g --num-executors 5"
...
// 执行脚本命令
sh analysis.sh kpi.txt
3)Spark启动
这里为了方便观察,统一改为Local运行模式;将执行类放入数组,使用?
?Java反射机制?
?动态执行分析子类。// 解析传递的kpi.txt中的执行类参数
// PRODUCT_CLASS_NAME、USER_CLASS_NAME
// String[] classNames = parseArgs(args);
String[] classNames = PRODUCT_CLASS_NAME, USER_CLASS_NAME, REGION_CLASS_NAME;
for (String className: classNames)
Class c = Class.forName(PACKAGE_NAME + className);
BaseHandler handler = (BaseHandler) c.getConstructors()[0].newInstance();
logger.info("数据分析开始...");
handler.execute(spark, sparkContext);
3.3 数据采集模块
使用SparkSession的?
?read()算子?
??读取csv文件,设置编码格式,并进行简单的??重复值?
??、??异常值?
??及??缺失值?
?处理;
结果保存到数据库表中。// 读取csv文件并处理
Dataset productCsvDS = spark.read()
.format("csv")
.option("delimiter", ",")
.option("encoding", "gbk")
.schema(productStructType)
.option("header", "true")
.load(ORDER_FILE_PATH)
.na()// 空值删除
.drop( new String[] "name", "price");
// 写入Mysql订单表 (解决中文乱码:设置Mysql 编码utf8)
productCsvDS.write()
.mode(SaveMode.Overwrite)
.jdbc(JDBC_URL, DB_TABLE_PRODUCT, jdbcProperties);
// 定义StructType
private static StructType getProductStructType()
StructType productStructType = DataTypes.createStructType(new StructField[]
DataTypes.createStructField("id", DataTypes.StringType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("price", DataTypes.DoubleType, false)
...
);
return productStructType;
3.4 数据分析模块
使用?
?Spark SQL?
??和??Spark内置算子?
?进行数据统计,不同场景的分析子类均需要实现execute()方法。// 子类继承抽象父类execute()方法
public abstract class BaseService
public void execute(SparkSession spark, JavaSparkContext sparkContext)
// TODO: 继承子类方法
// 1. 用户行为分析
// 2. 零售产品分析
- 用户月度购买量省份分布分析(示例)
// 注册临时表
prodCsvDS.registerTempTable("product");
orderCsvDS.registerTempTable("orderinfo");
// 列换行;分割prod_ids
String prodSplitSQL推荐阅读
- Shell编程及自动化运维流程控制(if)
- Hive跨集群和版本迁移
- 小案例--python编写设置拼手气红包模块
- 要想用活Redis,Lua脚本是绕不过去的坎 !
- 万字干货(IM 会话列表卡顿优化实践)
- Kubernetes 持久化数据存储 StorageClass
- try_files $uri $uri/ @router
- ansbile
- GitLab Jenkins CI/CD 自动化部署