User-based SparkALS recommendation system

① Data sources for the user-based SparkALS system
The data come from the data-warehouse platform: the shopcar table records logged-in users' shopping carts and the products in them; the collect table records logged-in users' favorites, with a favorite type (1 = product, 2 = brand); the browse table records logged-in users' browsing history, typed the same way (1 = product, 2 = brand); order state comes from the main order table and the product dimension table; browse and deep-browse events come from the growingio_custom_event table in HBase.
The user scoring rules are defined as follows:
Browse: 1 point
Deep browse: 2 points (stay time over 30 s)
Browse count: 1 point (2 points for more than 2 visits)
Collect: 2 points
Add to cart: 3 points
Place order: 4 points
Pay: 5 points

Browse, deep browse, and browse count carry a weight of 0.7; collect, add-to-cart, order, and payment carry a weight of 0.3. Note that the scores in the SQL below accumulate across behaviors: a stay time of 30 s or more maps to 3 points (1 for the browse plus 2 for the deep browse), and a fully paid order maps to 9 points (4 for ordering plus 5 for paying).
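To make the weighting concrete, here is a small worked example in Java (hypothetical scores, not code from the project), matching the expression used in the user_rating query below:

// Hypothetical per-behavior scores for one user/product pair
double rating1 = 2.0; // browse-count score (browsed more than twice)
double rating2 = 3.0; // stay-time score (deep browse, stay >= 30 s)
double rating3 = 2.0; // collect score
double rating4 = 3.0; // add-to-cart score
double rating5 = 9.0; // order-state score (fully paid: 4 + 5)
// Browse-side average weighted 0.7, action-side average weighted 0.3
double rating = (rating1 + rating2) / 2 * 0.7 + (rating3 + rating4 + rating5) / 3 * 0.3;
// = 2.5 * 0.7 + 4.667 * 0.3 ≈ 3.15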
The core code is as follows:

-- Stay-time score table for the product detail page; the source data live in HBase and are read here through a mapped table
INSERT OVERWRITE TABLE bi_dw.growingio_custom_event_staytime
SELECT t.product_id,
       t.loginuserid,
       t.stay_time,
       CASE WHEN t.stay_time is NULL THEN 0
            WHEN t.stay_time=0 THEN 0
            WHEN t.stay_time>0 AND t.stay_time<30 THEN 1
            ELSE 3 END AS rating
FROM (SELECT t.product_id,
             t.loginuserid,
             CASE WHEN stay_time1 is NULL OR stay_time1='' THEN stay_time2 ELSE stay_time1 END stay_time
      FROM (SELECT regexp_extract(regexp_extract(t.c11,'(\\"+[0-9]+\\")',0),'([0-9]+)',0) AS product_id,
                   regexp_extract(t.c12,'([0-9]+)',0) AS loginuserid,
                   regexp_extract(regexp_extract(t.c11,'\\"+stay_time+\\"+(\\:+\\"+[0-9]+\\")',0),'([0-9]+)',0) AS stay_time1,
                   regexp_extract(regexp_extract(t.c11,'\\"+stay_time+\\"+(\\:+[0-9]+\\.+[0-9]+)',0),'([0-9]+\\.+[0-9])',0) AS stay_time2
            FROM bi_dw.growingio_custom_event t
            WHERE t.c9='ProductDetailUserStayTime') t
     ) t;

-- User rating table
set hive.exec.parallel=true;
INSERT OVERWRITE TABLE bi_ads.user_rating
SELECT t.userid AS user_id,      -- user id
       t.objectid AS product_id, -- product id
       ((t.rating1 + t.rating2)/2*0.7 + (t.rating3 + t.rating4 + t.rating5)/3*0.3) AS rating, -- total score; 0.7 and 0.3 are the weights
       t.time                    -- time
FROM (SELECT t1.userid,
             t1.objectid,
             t1.addtime AS time,
             t1.rating AS rating1, -- browse-count score
             t1.view_num,
             t2.stay_time,
             t3.product_id AS collection,
             t6.add_shopcar_num,
             CASE WHEN t2.rating is NULL THEN 0 ELSE t2.rating END AS rating2,   -- stay-time (browse-time) score for the product detail page
             CASE WHEN t3.rating is NULL THEN 0 ELSE t3.rating END AS rating3,   -- collect score
             CASE WHEN t6.rating4 is NULL THEN 0 ELSE t6.rating4 END AS rating4, -- add-to-cart score
             CASE WHEN t8.rating is NULL THEN 0 ELSE t8.rating END AS rating5    -- order-state score
      FROM bi_dw.dw_browse t1
      -- Product detail page stay time; the data come from growingio_custom_event in HBase, mapped in the DW layer as the external table growingio_custom_event_staytime
      LEFT JOIN (SELECT t.loginuserid AS user_id,
                        t.product_id,
                        t.stay_time,
                        t.rating
                 FROM (SELECT loginuserid, product_id, stay_time, rating,
                              ROW_NUMBER() OVER (PARTITION BY loginuserid, product_id ORDER BY cast(stay_time AS int) DESC) AS R1
                       FROM bi_dw.growingio_custom_event_staytime) t
                 WHERE t.R1=1 AND t.loginuserid is NOT NULL) t2
          ON t1.userid=t2.user_id AND t1.objectid=t2.product_id
      -- collect score
      LEFT JOIN (SELECT t3.userid AS user_id,
                        t3.objectid AS product_id,
                        t3.rating
                 FROM bi_dw.dw_collect t3) t3
          ON t1.userid=t3.user_id AND t1.objectid=t3.product_id
      -- add-to-cart score
      LEFT JOIN (SELECT t5.user_id,
                        t5.product_id,
                        SUM(num) AS add_shopcar_num,
                        SUM(rating) AS rating4
                 FROM (SELECT t4.userid AS user_id,
                              t5.product_id AS product_id,
                              t4.sku_id,
                              t4.num,
                              t4.rating
                       FROM bi_dw.dw_shopcar t4
                       JOIN (SELECT product_id, id AS sku_id FROM bi_dw.dw_product_sku) t5
                           ON t4.sku_id=t5.sku_id) t5
                 GROUP BY t5.user_id, t5.product_id) t6
          ON t1.userid=t6.user_id AND t1.objectid=t6.product_id
      -- total order-state score
      LEFT JOIN (SELECT t8.user_id,
                        t8.product_id,
                        SUM(t8.rating) AS rating
                 FROM (SELECT t7.encoding,
                              t7.user_id,
                              t8.product_id,
                              t8.sku_id,
                              t7.payment_state,
                              CASE WHEN t7.payment_state='unPaid' THEN 4
                                   WHEN t7.payment_state='partialPaid' THEN 4
                                   WHEN t7.payment_state='fullPaid' THEN 9
                                   ELSE 0 END rating
                       FROM bi_dws.fact_main_order t7
                       JOIN (SELECT encoding, user_id, product_id, sku_id
                             FROM bi_dws.fact_order_product
                             WHERE product_id is not NULL AND sku_id is not NULL) t8
                           ON t7.encoding=t8.encoding AND t7.user_id=t8.user_id
                       WHERE t7.user_id is not NULL) t8
                 GROUP BY t8.user_id, t8.product_id) t8
          ON t1.userid=t8.user_id AND t1.objectid=t8.product_id
      GROUP BY t1.userid, t1.objectid, t1.addtime, t1.rating, t1.view_num, t2.stay_time, t3.product_id, t6.add_shopcar_num,
               CASE WHEN t2.rating is NULL THEN 0 ELSE t2.rating END,
               CASE WHEN t3.rating is NULL THEN 0 ELSE t3.rating END,
               CASE WHEN t6.rating4 is NULL THEN 0 ELSE t6.rating4 END,
               CASE WHEN t8.rating is NULL THEN 0 ELSE t8.rating END) t;
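The rows written to bi_ads.user_rating are presumably exported to HDFS as text and consumed by the trainer in section ② below. A minimal parsing sketch (hypothetical values, assuming the "; "-separated user, product, rating, timestamp layout that SparkALSTrain splits on):

String line = "10001; 20002; 3.15; 1543276800"; // hypothetical exported row
String[] f = line.split("; ");
Rating r = new Rating(Integer.parseInt(f[0]), Integer.parseInt(f[1]), Double.parseDouble(f[2]));
long bucket = Long.parseLong(f[3]) % 10; // timestamp % 10 drives the 6:2:2 split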

② The user-based SparkALS recommendation algorithm
(1) RMSE (root-mean-square error) evaluation code:
package com.wozaijia.sparkALS.UserCF;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import scala.Tuple2;

public class SparkALSRMSE {
    /**
     * RMSE: root-mean-square error.
     * Computes the RMSE of data under the given model.
     */
    public static double computeRMSEAverage(MatrixFactorizationModel model, JavaRDD<Rating> data, long n) {
        // Convert the data to (user, product) pairs and let the model predict ratings for them
        JavaRDD<Rating> jddRat = model.predict(JavaPairRDD.fromJavaRDD(data.map(d -> new Tuple2<>(d.user(), d.product()))));
        // Predicted ratings keyed by "user_product"
        JavaPairRDD<String, Double> pre = JavaPairRDD.fromJavaRDD(jddRat.map(f -> new Tuple2<>(f.user() + "_" + f.product(), f.rating())));
        // Actual ratings keyed the same way
        JavaPairRDD<String, Double> rea = JavaPairRDD.fromJavaRDD(data.map(f -> new Tuple2<>(f.user() + "_" + f.product(), f.rating())));
        // Join predictions with actuals, like an SQL inner join, keeping the (predicted, actual) value pairs
        JavaRDD<Tuple2<Double, Double>> d = pre.join(rea).values();
        // Mean squared error over n samples, then the square root gives the RMSE
        return Math.sqrt(d.map(f -> Math.pow(f._1 - f._2, 2)).reduce((a, b) -> a + b) / n);
    }
}
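A usage sketch (assuming training and validation are JavaRDD<Rating> datasets like the ones built in the training class below):

// Train one candidate model and measure its error on the validation set
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(training), 10, 15, 0.1);
double rmse = SparkALSRMSE.computeRMSEAverage(model, validation, validation.count());
System.out.println("validation RMSE = " + rmse);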

(2) Training code (excerpt); for the complete code, download the SparkALS project: https://download.csdn.net/download/qq_37116817/10808543
package com.wozaijia.sparkALS.UserCF;

import java.util.Arrays;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;

import scala.Tuple2;

/**
 * User-based SparkALS recommendation system: training.
 */
public class SparkALSTrain {

    // Model state shared across the run
    public static MatrixFactorizationModel bestModel = null;
    public static JavaRDD<Rating> candidates = null;

    public static void main(String[] args) {
        // Set log levels and silence the Jetty container logs
        Logger logger = Logger.getLogger(SparkALSTrain.class);
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF);

        // Cluster mode (the default)
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkALS_User"));
        //JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkALS_User").setMaster("local[4]")); // local mode

        /**
         * Load the sample data.
         * ratings: the RDD backing the rating matrix (for implicit feedback it would back the implicit-feedback matrix instead).
         * The data is used in three parts, split 6:2:2 -- 60% for training, 20% for validation, and the last 20% for testing.
         * Taking the timestamp % 10 yields roughly ten equal buckets used to make the three-way split.
         */
        String productDir = "hdfs address"; // placeholder for the HDFS path of the rating data
        JavaRDD<Tuple2<Long, Rating>> ratings = sc.textFile(productDir).map(
                // "->" is a JDK 8 lambda expression, i.e. an anonymous function
                line -> {
                    // Split on "; "
                    String[] fields = line.split("; ");
                    if (fields.length != 4) {
                        throw new IllegalArgumentException("Each line must have exactly 4 fields");
                    }
                    // Key by timestamp % 10 to form the split buckets
                    return new Tuple2<>(Long.parseLong(fields[3]) % 10,
                            new Rating(
                                    Integer.parseInt(fields[0]),     // user id
                                    Integer.parseInt(fields[1]),     // product id
                                    Double.parseDouble(fields[2]))); // rating
                }
        );
        logger.info("First record: " + ratings.first());
        logger.info("SparkALS model run starting ...");

        // Summarize the sample data
        JavaRDD<Integer> user = ratings.map(x -> x._2.user()).distinct();
        JavaRDD<Integer> product = ratings.map(y -> y._2.product()).distinct();
        logger.info(ratings.count() + " sample records in total; " + user.count() + " users; " + product.count() + " products");

        // Load the ratings produced by the rater; format: user id, product id, rating
        String path = "hdfs address"; // placeholder for the HDFS path
        JavaRDD<String> data = sc.textFile(path);
        JavaRDD<Rating> myRatingsRDD = data.map(s -> {
            String[] sarray = s.split("; ");
            return new Rating(
                    Integer.parseInt(sarray[0]),     // user id
                    Integer.parseInt(sarray[1]),     // product id
                    Double.parseDouble(sarray[2])); // rating
        });

        /**
         * Training.
         * filter: filtering; union: a transformation that joins the elements of two RDDs into a new RDD, like SQL's UNION.
         */
        int numPartition = 4; // number of partitions
        // Training uses the buckets with key in [0, 5], merged with the user's own ratings
        JavaRDD<Rating> training = ratings.filter(x -> x._1 < 6).map(tupe2 -> tupe2._2)
                .union(myRatingsRDD).repartition(numPartition).cache();
        //JavaRDD<Rating> training = ratings.filter(x -> x._1 < 6).map(tupe2 -> tupe2._2).union(myRatingsRDD).repartition(numPartition).persist(StorageLevel.MEMORY_ONLY());
        // Validation uses the buckets with key in [6, 7]
        JavaRDD<Rating> validation = ratings.filter(x -> x._1 >= 6 && x._1 < 8).map(tupe2 -> tupe2._2)
                .repartition(numPartition).cache();
        // Testing uses the buckets with key in [8, 9]
        JavaRDD<Rating> test = ratings.filter(x -> x._1 >= 8).map(tupe2 -> tupe2._2)
                .repartition(numPartition).cache();
        logger.info("training: " + training.count() + "; validation: " + validation.count() + "; test: " + test.count());

        /**
         * Try different parameter combinations, compute the RMSE for each, and keep the model with the smallest
         * RMSE, i.e. the best model.
         * rank: the low dimension k of the factorization P(m x k) * Q(k x n), i.e. the latent-feature dimension.
         *       Too small underfits and the error is large; too large hurts generalization and costs more running
         *       time and memory. It usually needs tuning; values between 10 and 100 are common.
         * iterations: the maximum number of iterations of the alternating-least-squares solver (larger is more
         *       precise but also slower). It depends on the size and sparsity of the rating matrix; 5-20 is
         *       generally enough, and the default is 5.
         * lambda: the regularization coefficient of the FunkSVD-style factorization. It controls the degree of
         *       fitting and improves generalization; the larger the value, the heavier the regularization penalty.
         *       It is usually tuned in the range 0-100.
         */
        List<Integer> ranks = Arrays.asList(8, 10, 12);      // latent-feature dimensions
        List<Double> lambdas = Arrays.asList(2.5, 0.1, 5.1); // regularization coefficients
        List<Integer> numIters = Arrays.asList(10, 15, 20);  // ALS iteration counts

        double bestValidationRmse = Double.MAX_VALUE;
        int bestRank = 0;
        double bestLambda = -1.0;
        int bestNumIter = -1;

        // Three nested loops: compute the RMSE for every combination and keep the best model
        for (int i = 0; i < ranks.size(); i++) {
            for (int j = 0; j < lambdas.size(); j++) {
                for (int k = 0; k < numIters.size(); k++) {
                    // Train a model with this parameter combination
                    MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(training), ranks.get(i), numIters.get(k), lambdas.get(j));
                    // Score the model on the validation set; the method is defined in SparkALSRMSE above
                    double validationRmse = SparkALSRMSE.computeRMSEAverage(model, validation, validation.count());
                    //logger.info("validation RMSE =====> " + validationRmse);
                    if (validationRmse < bestValidationRmse) {
                        bestModel = model;
                        bestValidationRmse = validationRmse;
                        bestRank = ranks.get(i);
                        bestLambda = lambdas.get(j);
                        bestNumIter = numIters.get(k);
                    }
                }
            }
        }

        // Evaluate the best model on the test set
        double testRmse = SparkALSRMSE.computeRMSEAverage(bestModel, test, test.count());
        logger.info("user feature vectors = " + bestModel.userFeatures().count()
                + "; product feature vectors = " + bestModel.productFeatures().count());
        logger.info("best model: rank = " + bestRank + "; lambda = " + bestLambda
                + "; iterations = " + bestNumIter + "; test RMSE = " + testRmse + ".");
    }
}
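The excerpt stops at model selection. As a minimal sketch (not part of the project), the selected bestModel could serve top-N recommendations through MLlib's MatrixFactorizationModel.recommendProducts:

int userId = 10001; // hypothetical user id
Rating[] top10 = bestModel.recommendProducts(userId, 10);
for (Rating r : top10) {
    System.out.println("product " + r.product() + ", predicted rating " + r.rating());
}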


③ For the complete code, download the SparkALS project: https://download.csdn.net/download/qq_37116817/10808543
The code in the project is commented in detail and easy to follow, and the project ships with one million rows of test data.
