#|spark 推荐系统实战


文章目录

  • 问题背景
  • 推荐算法:交替最小二乘推荐算法
    • 矩阵分解模型
    • 交替最小二乘法
  • scala

问题背景 音乐推荐系统 Audioscrobbler 提供了一个公开的数据集,该数据集记录了听众播放过哪些艺术家的歌曲。last.fm 公司使用这些音乐播放记录构建了一个强大的音乐推荐引擎。这个推荐引擎系统覆盖了数百万的用户。
Audioscrobbler 数据集只记录了播放数据,如“Bob 播放了一首 Prince 的歌曲”,但没有歌曲的评分信息。这种类型的数据通常被称为隐式反馈数据,因为用户和艺术家的关系是通过其他行动隐含体现出来的,而不是通过显式的评分或点赞得到的。
数据集的下载地址为:http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May2005.tar.gz
主要的数据集在文件 user_artist_data.txt 中,它包含 141 000 个用户和 160 万个艺术家,记录了约 2420 万条用户播放艺术家歌曲的信息,其中包括播放次数信息。
推荐算法:交替最小二乘推荐算法 数据集提供的隐式反馈数据,只记录了用户和歌曲之间的交互情况。我们的学习算法不需要任何用户和艺术家的属性信息。举个例子,根据两个用户播放过许多相同歌曲来判断他们可能都喜欢某首歌,这类算法通常称为协同过滤算法。
Audioscrobbler 数据集包含了数千万条某个用户播放了某个艺术家歌曲次数的信息,看起来是很大。但从另一方面来看数据集又很小而且不充足,因为数据集是稀疏的。虽然数据集覆盖 160 万个艺术家,但平均来算,每个用户只播放了大约 171 个艺术家的歌曲。有的用户只播放过一个艺术家的歌曲。对这类用户,我们也希望算法能给出像样的推荐。最后,我们希望算法的扩展性好,不但能用于构建大型模型,而且推荐速度快,通常都要求推荐是接近实时的,也就是在一秒内给出推荐。
矩阵分解模型 本实例用的是一种矩阵分解模型。数学上,把用户和产品数据当成一个大矩阵 A,其具有如下性质:
  1. 矩阵第 i 行和第 j 列上的元素有值,代表用户 i 播放过艺术家 j 的音乐。
  2. 矩阵 A 是稀疏的:A 中大多数元素都是 0,因为相对于所有可能的用户 - 艺术家组合,只有很少一部分组合会出现在数据中。
算法将A A A 分解为两个小矩阵X X X 和Y Y Y 的乘积。矩阵X X X 和矩阵Y Y Y 非常细长,因为A A A 有很多行和列。 X X X 和Y ? Y^\top Y? 的行很多而列很少,列数用 k 表示,这 k 个列用于解释数据中的交互关系。
#|spark 推荐系统实战
文章图片

由于k k k 的值小,矩阵分解算法只能是某种近似,如图所示 。
矩阵分解算法有时称为补全( 矩阵分解算法有时称为补全算法,因为原始矩阵A A A 可能 非常稀疏,但乘积X Y ? XY^\top XY?是稠密的,即使该矩阵存在非零元素数量也常少。因此模型只是对 因此模型只是对 A 的一种近似。
原始A A A 中大量元素是缺失的(元素值为 0),算法为这些缺失元素生成(补全)了一个值 。
两个矩阵分别有 一行对应每用户和艺术家。每行的值很少,只 k k k个 。每个值代表了对应模型的一个隐含特征。因此矩阵表示了用户和艺术家怎样关联到这些隐含特征,而隐含特征可能就对应偏好或类别。于是问题简化为用户特征矩阵 X 和艺术家矩阵 Y 的乘积,该结果是对整个稠密用户-艺术家相互关系矩阵的完整估计。
X Y ? XY^\top XY? 应该尽可能逼近 A,毕竟这是所有后续工作的基础,但它不能也应该完全复制A A A。然而想直接时得到 X X X和 Y Y Y的最优解是不可能的。如果Y Y Y 已知,求X X X 的最优解是非常容易,反之亦然。但X X X 和Y Y Y 事先都是未知的。
接下来介绍如何求解 X X X和 Y Y Y,就要用到交替最小二乘算法。
交替最小二乘法 虽然Y Y Y 是未知的,但我们可以把它初始化为随机行向量矩阵。
接着运用简单线性代数知识, 就能在给定A A A 和Y Y Y 的条件下求出 X X X的最优解。
实际上,X i X_i Xi? 是A i A_i Ai? 和Y Y Y 的函数:
A i = X i Y ? A_i = X_i Y^\top Ai?=Xi?Y?
因此可以很容易分开计算X X X 的每一行。所以我们可将其并行化:
A i Y ( Y ? Y ) ? 1 = X i A_iY(Y^\top Y)^{-1} = X_i Ai?Y(Y?Y)?1=Xi?
这里给出方程式只是为了说明行向量计算方法,实践中从来不会对矩阵求逆,我们会借助于 QR分解 之类的方法。
同理,我们可以由X X X 计算每个计算每个Y j Y_j Yj?。然后又可以由Y Y Y 计算X X X,这样反复下去,就是算法名 称中“交替”的来由。
【#|spark 推荐系统实战】只要这个过程一直继续, X X X 和Y Y Y最终会收敛得到一个合适的结果。
scala
package com.cloudera.datascience.recommenderimport scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.recommendation.{ALS, ALSModel} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.functions._var base = "file:///home/bigdata/" val rawUserArtistData = https://www.it610.com/article/spark.read.textFile(base +"user_artist_data.txt") val rawArtistData = https://www.it610.com/article/spark.read.textFile(base +"artist_data.txt") val rawArtistAlias = spark.read.textFile(base + "artist_alias.txt")import spark.implicits._def preparation( rawUserArtistData: Dataset[String], rawArtistData: Dataset[String], rawArtistAlias: Dataset[String]): Unit = {rawUserArtistData.take(5).foreach(println)val userArtistDF = rawUserArtistData.map { line => val Array(user, artist, _*) = line.split(' ') (user.toInt, artist.toInt) }.toDF("user", "artist")userArtistDF.agg(min("user"), max("user"), min("artist"), max("artist")).show()val artistByID = buildArtistByID(rawArtistData) val artistAlias = buildArtistAlias(rawArtistAlias)val (badID, goodID) = artistAlias.head artistByID.filter($"id" isin (badID, goodID)).show() }def model( rawUserArtistData: Dataset[String], rawArtistData: Dataset[String], rawArtistAlias: Dataset[String]): Unit = {val bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))val trainData = https://www.it610.com/article/buildCounts(rawUserArtistData, bArtistAlias).cache()val model = new ALS(). setSeed(Random.nextLong()). setImplicitPrefs(true). setRank(10). setRegParam(0.01). setAlpha(1.0). setMaxIter(5). setUserCol("user"). setItemCol("artist"). setRatingCol("count"). setPredictionCol("prediction"). fit(trainData)trainData.unpersist()model.userFactors.select("features").show(truncate = false)val userID = 2093760val existingArtistIDs = trainData. filter($"user" === userID). select("artist").as[Int].collect()val artistByID = buildArtistByID(rawArtistData)artistByID.filter($"id" isin (existingArtistIDs:_*)).show()val topRecommendations = makeRecommendations(model, userID, 5) topRecommendations.show()val recommendedArtistIDs = topRecommendations.select("artist").as[Int].collect()artistByID.filter($"id" isin (recommendedArtistIDs:_*)).show()model.userFactors.unpersist() model.itemFactors.unpersist() }def evaluate( rawUserArtistData: Dataset[String], rawArtistAlias: Dataset[String]): Unit = {val bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))val allData = https://www.it610.com/article/buildCounts(rawUserArtistData, bArtistAlias) val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1)) trainData.cache() cvData.cache()val allArtistIDs = allData.select("artist").as[Int].distinct().collect() val bAllArtistIDs = spark.sparkContext.broadcast(allArtistIDs)val mostListenedAUC = areaUnderCurve(cvData, bAllArtistIDs, predictMostListened(trainData)) println(mostListenedAUC)val evaluations = for (rank<- Seq(5,30); regParam <- Seq(1.0, 0.0001); alpha<- Seq(1.0, 40.0)) yield { val model = new ALS(). setSeed(Random.nextLong()). setImplicitPrefs(true). setRank(rank).setRegParam(regParam). setAlpha(alpha).setMaxIter(20). setUserCol("user").setItemCol("artist"). setRatingCol("count").setPredictionCol("prediction"). fit(trainData)val auc = areaUnderCurve(cvData, bAllArtistIDs, model.transform)model.userFactors.unpersist() model.itemFactors.unpersist()(auc, (rank, regParam, alpha)) }evaluations.sorted.reverse.foreach(println)trainData.unpersist() cvData.unpersist() }def recommend( rawUserArtistData: Dataset[String], rawArtistData: Dataset[String], rawArtistAlias: Dataset[String]): Unit = {val bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias)) val allData = https://www.it610.com/article/buildCounts(rawUserArtistData, bArtistAlias).cache() val model = new ALS(). setSeed(Random.nextLong()). setImplicitPrefs(true). setRank(10).setRegParam(1.0).setAlpha(40.0).setMaxIter(20). setUserCol("user").setItemCol("artist"). setRatingCol("count").setPredictionCol("prediction"). fit(allData) allData.unpersist()val userID = 2093760 val topRecommendations = makeRecommendations(model, userID, 5)val recommendedArtistIDs = topRecommendations.select("artist").as[Int].collect() val artistByID = buildArtistByID(rawArtistData) artistByID.join(spark.createDataset(recommendedArtistIDs).toDF("id"), "id"). select("name").show()model.userFactors.unpersist() model.itemFactors.unpersist() }def buildArtistByID(rawArtistData: Dataset[String]): DataFrame = { rawArtistData.flatMap { line => val (id, name) = line.span(_ != '\t') if (name.isEmpty) { None } else { try { Some((id.toInt, name.trim)) } catch { case _: NumberFormatException => None } } }.toDF("id", "name") }def buildArtistAlias(rawArtistAlias: Dataset[String]): Map[Int,Int] = { rawArtistAlias.flatMap { line => val Array(artist, alias) = line.split('\t') if (artist.isEmpty) { None } else { Some((artist.toInt, alias.toInt)) } }.collect().toMap }def buildCounts( rawUserArtistData: Dataset[String], bArtistAlias: Broadcast[Map[Int,Int]]): DataFrame = { rawUserArtistData.map { line => val Array(userID, artistID, count) = line.split(' ').map(_.toInt) val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID) (userID, finalArtistID, count) }.toDF("user", "artist", "count") }def makeRecommendations(model: ALSModel, userID: Int, howMany: Int): DataFrame = { val toRecommend = model.itemFactors. select($"id".as("artist")). withColumn("user", lit(userID)) model.transform(toRecommend). select("artist", "prediction"). orderBy($"prediction".desc). limit(howMany) }def areaUnderCurve( positiveData: DataFrame, bAllArtistIDs: Broadcast[Array[Int]], predictFunction: (DataFrame => DataFrame)): Double = {// What this actually computes is AUC, per user. The result is actually something // that might be called "mean AUC".// Take held-out data as the "positive". // Make predictions for each of them, including a numeric score val positivePredictions = predictFunction(positiveData.select("user", "artist")). withColumnRenamed("prediction", "positivePrediction")// BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of // small AUC problems, and it would be inefficient, when a direct computation is available.// Create a set of "negative" products for each user. These are randomly chosen // from among all of the other artists, excluding those that are "positive" for the user. val negativeData = https://www.it610.com/article/positiveData.select("user", "artist").as[(Int,Int)]. groupByKey { case (user, _) => user }. flatMapGroups { case (userID, userIDAndPosArtistIDs) => val random = new Random() val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet val negative = new ArrayBuffer[Int]() val allArtistIDs = bAllArtistIDs.value var i = 0 // Make at most one pass over all artists to avoid an infinite loop. // Also stop when number of negative equals positive set size while (i < allArtistIDs.length && negative.size < posItemIDSet.size) { val artistID = allArtistIDs(random.nextInt(allArtistIDs.length)) // Only add new distinct IDs if (!posItemIDSet.contains(artistID)) { negative += artistID } i += 1 } // Return the set with user ID added back negative.map(artistID => (userID, artistID)) }.toDF("user", "artist")// Make predictions on the rest: val negativePredictions = predictFunction(negativeData). withColumnRenamed("prediction", "negativePrediction")// Join positive predictions to negative predictions by user, only. // This will result in a row for every possible pairing of positive and negative // predictions within each user. val joinedPredictions = positivePredictions.join(negativePredictions, "user"). select("user", "positivePrediction", "negativePrediction").cache()// Count the number of pairs per user val allCounts = joinedPredictions. groupBy("user").agg(count(lit("1")).as("total")). select("user", "total") // Count the number of correctly ordered pairs per user val correctCounts = joinedPredictions. filter($"positivePrediction" > $"negativePrediction"). groupBy("user").agg(count("user").as("correct")). select("user", "correct")// Combine these, compute their ratio, and average over all users val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer"). select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")). agg(mean("auc")). as[Double].first()joinedPredictions.unpersist()meanAUC }def predictMostListened(train: DataFrame)(allData: DataFrame): DataFrame = { val listenCounts = train.groupBy("artist"). agg(sum("count").as("prediction")). select("artist", "prediction") allData. join(listenCounts, Seq("artist"), "left_outer"). select("user", "artist", "prediction") }preparation(rawUserArtistData, rawArtistData, rawArtistAlias) model(rawUserArtistData, rawArtistData, rawArtistAlias) evaluate(rawUserArtistData, rawArtistAlias) recommend(rawUserArtistData, rawArtistData

    推荐阅读