#|spark 推荐系统实战
文章目录
- 问题背景
- 推荐算法:交替最小二乘推荐算法
- 矩阵分解模型
- 交替最小二乘法
- scala
问题背景 音乐推荐系统 Audioscrobbler 提供了一个公开的数据集,该数据集记录了听众播放过哪些艺术家的歌曲。last.fm 公司使用这些音乐播放记录构建了一个强大的音乐推荐引擎。这个推荐引擎系统覆盖了数百万的用户。
Audioscrobbler 数据集只记录了播放数据,如“Bob 播放了一首 Prince 的歌曲”,但没有歌曲的评分信息。这种类型的数据通常被称为隐式反馈数据,因为用户和艺术家的关系是通过其他行动隐含体现出来的,而不是通过显式的评分或点赞得到的。
数据集的下载地址为:http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May2005.tar.gz
主要的数据集在文件 user_artist_data.txt 中,它包含 141 000 个用户和 160 万个艺术家,记录了约 2420 万条用户播放艺术家歌曲的信息,其中包括播放次数信息。
推荐算法:交替最小二乘推荐算法 数据集提供的隐式反馈数据,只记录了用户和歌曲之间的交互情况。我们的学习算法不需要任何用户和艺术家的属性信息。举个例子,根据两个用户播放过许多相同歌曲来判断他们可能都喜欢某首歌,这类算法通常称为协同过滤算法。
Audioscrobbler 数据集包含了数千万条某个用户播放了某个艺术家歌曲次数的信息,看起来是很大。但从另一方面来看数据集又很小而且不充足,因为数据集是稀疏的。虽然数据集覆盖 160 万个艺术家,但平均来算,每个用户只播放了大约 171 个艺术家的歌曲。有的用户只播放过一个艺术家的歌曲。对这类用户,我们也希望算法能给出像样的推荐。最后,我们希望算法的扩展性好,不但能用于构建大型模型,而且推荐速度快,通常都要求推荐是接近实时的,也就是在一秒内给出推荐。
矩阵分解模型 本实例用的是一种矩阵分解模型。数学上,把用户和产品数据当成一个大矩阵 A,其具有如下性质:
- 矩阵第 i 行和第 j 列上的元素有值,代表用户 i 播放过艺术家 j 的音乐。
- 矩阵 A 是稀疏的:A 中大多数元素都是 0,因为相对于所有可能的用户 - 艺术家组合,只有很少一部分组合会出现在数据中。
文章图片
由于k k k 的值小,矩阵分解算法只能是某种近似,如图所示 。
矩阵分解算法有时称为补全( 矩阵分解算法有时称为补全算法,因为原始矩阵A A A 可能 非常稀疏,但乘积X Y ? XY^\top XY?是稠密的,即使该矩阵存在非零元素数量也常少。因此模型只是对 因此模型只是对 A 的一种近似。
原始A A A 中大量元素是缺失的(元素值为 0),算法为这些缺失元素生成(补全)了一个值 。
两个矩阵分别有 一行对应每用户和艺术家。每行的值很少,只 k k k个 。每个值代表了对应模型的一个隐含特征。因此矩阵表示了用户和艺术家怎样关联到这些隐含特征,而隐含特征可能就对应偏好或类别。于是问题简化为用户特征矩阵 X 和艺术家矩阵 Y 的乘积,该结果是对整个稠密用户-艺术家相互关系矩阵的完整估计。
X Y ? XY^\top XY? 应该尽可能逼近 A,毕竟这是所有后续工作的基础,但它不能也应该完全复制A A A。然而想直接时得到 X X X和 Y Y Y的最优解是不可能的。如果Y Y Y 已知,求X X X 的最优解是非常容易,反之亦然。但X X X 和Y Y Y 事先都是未知的。
接下来介绍如何求解 X X X和 Y Y Y,就要用到交替最小二乘算法。
交替最小二乘法 虽然Y Y Y 是未知的,但我们可以把它初始化为随机行向量矩阵。
接着运用简单线性代数知识, 就能在给定A A A 和Y Y Y 的条件下求出 X X X的最优解。
实际上,X i X_i Xi? 是A i A_i Ai? 和Y Y Y 的函数:
A i = X i Y ? A_i = X_i Y^\top Ai?=Xi?Y?
因此可以很容易分开计算X X X 的每一行。所以我们可将其并行化:
A i Y ( Y ? Y ) ? 1 = X i A_iY(Y^\top Y)^{-1} = X_i Ai?Y(Y?Y)?1=Xi?
这里给出方程式只是为了说明行向量计算方法,实践中从来不会对矩阵求逆,我们会借助于
QR分解
之类的方法。同理,我们可以由X X X 计算每个计算每个Y j Y_j Yj?。然后又可以由Y Y Y 计算X X X,这样反复下去,就是算法名 称中“交替”的来由。
【#|spark 推荐系统实战】只要这个过程一直继续, X X X 和Y Y Y最终会收敛得到一个合适的结果。
scala
package com.cloudera.datascience.recommenderimport scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._var base = "file:///home/bigdata/"
val rawUserArtistData = https://www.it610.com/article/spark.read.textFile(base +"user_artist_data.txt")
val rawArtistData = https://www.it610.com/article/spark.read.textFile(base +"artist_data.txt")
val rawArtistAlias = spark.read.textFile(base + "artist_alias.txt")import spark.implicits._def preparation(
rawUserArtistData: Dataset[String],
rawArtistData: Dataset[String],
rawArtistAlias: Dataset[String]): Unit = {rawUserArtistData.take(5).foreach(println)val userArtistDF = rawUserArtistData.map { line =>
val Array(user, artist, _*) = line.split(' ')
(user.toInt, artist.toInt)
}.toDF("user", "artist")userArtistDF.agg(min("user"), max("user"), min("artist"), max("artist")).show()val artistByID = buildArtistByID(rawArtistData)
val artistAlias = buildArtistAlias(rawArtistAlias)val (badID, goodID) = artistAlias.head
artistByID.filter($"id" isin (badID, goodID)).show()
}def model(
rawUserArtistData: Dataset[String],
rawArtistData: Dataset[String],
rawArtistAlias: Dataset[String]): Unit = {val bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))val trainData = https://www.it610.com/article/buildCounts(rawUserArtistData, bArtistAlias).cache()val model = new ALS().
setSeed(Random.nextLong()).
setImplicitPrefs(true).
setRank(10).
setRegParam(0.01).
setAlpha(1.0).
setMaxIter(5).
setUserCol("user").
setItemCol("artist").
setRatingCol("count").
setPredictionCol("prediction").
fit(trainData)trainData.unpersist()model.userFactors.select("features").show(truncate = false)val userID = 2093760val existingArtistIDs = trainData.
filter($"user" === userID).
select("artist").as[Int].collect()val artistByID = buildArtistByID(rawArtistData)artistByID.filter($"id" isin (existingArtistIDs:_*)).show()val topRecommendations = makeRecommendations(model, userID, 5)
topRecommendations.show()val recommendedArtistIDs = topRecommendations.select("artist").as[Int].collect()artistByID.filter($"id" isin (recommendedArtistIDs:_*)).show()model.userFactors.unpersist()
model.itemFactors.unpersist()
}def evaluate(
rawUserArtistData: Dataset[String],
rawArtistAlias: Dataset[String]): Unit = {val bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))val allData = https://www.it610.com/article/buildCounts(rawUserArtistData, bArtistAlias)
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()val allArtistIDs = allData.select("artist").as[Int].distinct().collect()
val bAllArtistIDs = spark.sparkContext.broadcast(allArtistIDs)val mostListenedAUC = areaUnderCurve(cvData, bAllArtistIDs, predictMostListened(trainData))
println(mostListenedAUC)val evaluations =
for (rank<- Seq(5,30);
regParam <- Seq(1.0, 0.0001);
alpha<- Seq(1.0, 40.0))
yield {
val model = new ALS().
setSeed(Random.nextLong()).
setImplicitPrefs(true).
setRank(rank).setRegParam(regParam).
setAlpha(alpha).setMaxIter(20).
setUserCol("user").setItemCol("artist").
setRatingCol("count").setPredictionCol("prediction").
fit(trainData)val auc = areaUnderCurve(cvData, bAllArtistIDs, model.transform)model.userFactors.unpersist()
model.itemFactors.unpersist()(auc, (rank, regParam, alpha))
}evaluations.sorted.reverse.foreach(println)trainData.unpersist()
cvData.unpersist()
}def recommend(
rawUserArtistData: Dataset[String],
rawArtistData: Dataset[String],
rawArtistAlias: Dataset[String]): Unit = {val bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))
val allData = https://www.it610.com/article/buildCounts(rawUserArtistData, bArtistAlias).cache()
val model = new ALS().
setSeed(Random.nextLong()).
setImplicitPrefs(true).
setRank(10).setRegParam(1.0).setAlpha(40.0).setMaxIter(20).
setUserCol("user").setItemCol("artist").
setRatingCol("count").setPredictionCol("prediction").
fit(allData)
allData.unpersist()val userID = 2093760
val topRecommendations = makeRecommendations(model, userID, 5)val recommendedArtistIDs = topRecommendations.select("artist").as[Int].collect()
val artistByID = buildArtistByID(rawArtistData)
artistByID.join(spark.createDataset(recommendedArtistIDs).toDF("id"), "id").
select("name").show()model.userFactors.unpersist()
model.itemFactors.unpersist()
}def buildArtistByID(rawArtistData: Dataset[String]): DataFrame = {
rawArtistData.flatMap { line =>
val (id, name) = line.span(_ != '\t')
if (name.isEmpty) {
None
} else {
try {
Some((id.toInt, name.trim))
} catch {
case _: NumberFormatException => None
}
}
}.toDF("id", "name")
}def buildArtistAlias(rawArtistAlias: Dataset[String]): Map[Int,Int] = {
rawArtistAlias.flatMap { line =>
val Array(artist, alias) = line.split('\t')
if (artist.isEmpty) {
None
} else {
Some((artist.toInt, alias.toInt))
}
}.collect().toMap
}def buildCounts(
rawUserArtistData: Dataset[String],
bArtistAlias: Broadcast[Map[Int,Int]]): DataFrame = {
rawUserArtistData.map { line =>
val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
(userID, finalArtistID, count)
}.toDF("user", "artist", "count")
}def makeRecommendations(model: ALSModel, userID: Int, howMany: Int): DataFrame = {
val toRecommend = model.itemFactors.
select($"id".as("artist")).
withColumn("user", lit(userID))
model.transform(toRecommend).
select("artist", "prediction").
orderBy($"prediction".desc).
limit(howMany)
}def areaUnderCurve(
positiveData: DataFrame,
bAllArtistIDs: Broadcast[Array[Int]],
predictFunction: (DataFrame => DataFrame)): Double = {// What this actually computes is AUC, per user. The result is actually something
// that might be called "mean AUC".// Take held-out data as the "positive".
// Make predictions for each of them, including a numeric score
val positivePredictions = predictFunction(positiveData.select("user", "artist")).
withColumnRenamed("prediction", "positivePrediction")// BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
// small AUC problems, and it would be inefficient, when a direct computation is available.// Create a set of "negative" products for each user. These are randomly chosen
// from among all of the other artists, excluding those that are "positive" for the user.
val negativeData = https://www.it610.com/article/positiveData.select("user", "artist").as[(Int,Int)].
groupByKey { case (user, _) => user }.
flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
val random = new Random()
val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
val negative = new ArrayBuffer[Int]()
val allArtistIDs = bAllArtistIDs.value
var i = 0
// Make at most one pass over all artists to avoid an infinite loop.
// Also stop when number of negative equals positive set size
while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
// Only add new distinct IDs
if (!posItemIDSet.contains(artistID)) {
negative += artistID
}
i += 1
}
// Return the set with user ID added back
negative.map(artistID => (userID, artistID))
}.toDF("user", "artist")// Make predictions on the rest:
val negativePredictions = predictFunction(negativeData).
withColumnRenamed("prediction", "negativePrediction")// Join positive predictions to negative predictions by user, only.
// This will result in a row for every possible pairing of positive and negative
// predictions within each user.
val joinedPredictions = positivePredictions.join(negativePredictions, "user").
select("user", "positivePrediction", "negativePrediction").cache()// Count the number of pairs per user
val allCounts = joinedPredictions.
groupBy("user").agg(count(lit("1")).as("total")).
select("user", "total")
// Count the number of correctly ordered pairs per user
val correctCounts = joinedPredictions.
filter($"positivePrediction" > $"negativePrediction").
groupBy("user").agg(count("user").as("correct")).
select("user", "correct")// Combine these, compute their ratio, and average over all users
val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
agg(mean("auc")).
as[Double].first()joinedPredictions.unpersist()meanAUC
}def predictMostListened(train: DataFrame)(allData: DataFrame): DataFrame = {
val listenCounts = train.groupBy("artist").
agg(sum("count").as("prediction")).
select("artist", "prediction")
allData.
join(listenCounts, Seq("artist"), "left_outer").
select("user", "artist", "prediction")
}preparation(rawUserArtistData, rawArtistData, rawArtistAlias)
model(rawUserArtistData, rawArtistData, rawArtistAlias)
evaluate(rawUserArtistData, rawArtistAlias)
recommend(rawUserArtistData, rawArtistData
推荐阅读
- 成交的种子咖啡冥想
- 历史教学书籍
- 如何在Mac中的文件选择框中打开系统隐藏文件夹
- 单点登陆
- 操作系统|[译]从内部了解现代浏览器(1)
- 游乐园系统,助力游乐园管理
- 【亲测好用】高逼格配色网站推荐
- 中国MES系统软件随工业化成长
- 推荐系统论文进阶|CTR预估 论文精读(十一)--Deep Interest Evolution Network(DIEN)
- Spark|Spark 数据倾斜及其解决方案