Data cleaning and loading into MongoDB
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Product dataset (fields separated by "^"):
* 3982                            product ID
* Fuhlen 富勒 M8眩光舞者时尚节能    product name
* 1057,439,736                    product category IDs (not needed)
* B009EJN4T2                      Amazon ID (not needed)
* https://images-cn-4.ssl-image   product image URL
* 外设产品|鼠标|电脑/办公           product categories
* 富勒|鼠标|电子产品|好用|外观漂亮   product UGC tags
*/
case class Product(productId: Int, name: String, imageUrl: String, categories: String, tags: String)
/**
* Rating dataset (fields separated by ","):
* 4867         user ID
* 457976       product ID
* 5.0          rating score
* 1395676800   timestamp (seconds)
*/
case class Rating(userId: Int, productId: Int, score: Double, timestamp: Int)
/**
* MongoDB connection configuration
*
* @param uri MongoDB connection URI
* @param db  database to operate on
*/
case class MongoConfig(uri: String, db: String)
object DataLoader {
// Paths of the raw data files
val PRODUCT_DATA_PATH = "D:\\Projects\\BigData\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\products.csv"
val RATING_DATA_PATH = "D:\\Projects\\BigData\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\ratings.csv"
// Names of the collections in MongoDB
val MONGODB_PRODUCT_COLLECTION = "Product"
val MONGODB_RATING_COLLECTION = "Rating"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// Create a Spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")
// Create a SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// The implicits (toDF, as[...]) need the concrete SparkSession instance in scope, so import them here
import spark.implicits._
// Load the raw data
val productRDD = spark.sparkContext.textFile(PRODUCT_DATA_PATH)
val productDF = productRDD.map(item => {
// Product fields are separated by "^"
val attr = item.split("\\^")
// Build a Product from the needed fields (the category IDs and the Amazon ID are dropped)
Product(attr(0).toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim)
}).toDF()
val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
val ratingDF = ratingRDD.map(item => {
val attr = item.split(",")
Rating(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
}).toDF()
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
storeDataInMongoDB(productDF, ratingDF)
spark.stop()
}
def storeDataInMongoDB(productDF: DataFrame, ratingDF: DataFrame)(implicit mongoConfig: MongoConfig): Unit = {
// Create a MongoDB client connection
val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))
// Handles to the target MongoDB collections (think db.Product / db.Rating)
val productCollection = mongoClient(mongoConfig.db)(MONGODB_PRODUCT_COLLECTION)
val ratingCollection = mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
// Drop the collections if they already exist
productCollection.dropCollection()
ratingCollection.dropCollection()
// Write the DataFrames to the corresponding collections
productDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
ratingDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// Create indexes on the collections
productCollection.createIndex(MongoDBObject("productId" -> 1))
ratingCollection.createIndex(MongoDBObject("productId" -> 1))
ratingCollection.createIndex(MongoDBObject("userId" -> 1))
mongoClient.close()
}
}
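A quick way to sanity-check the load is to read one of the freshly written collections back with the same connector options used for writing. A minimal sketch, assuming it runs inside main with the spark session, mongoConfig, and the collection-name constants above still in scope (the variable name productCheckDF is mine):

// Verification sketch: read the Product collection back and count the documents
val productCheckDF = spark.read
  .option("uri", mongoConfig.uri)
  .option("collection", MONGODB_PRODUCT_COLLECTION)
  .format("com.mongodb.spark.sql")
  .load()
println(s"products loaded: ${productCheckDF.count()}")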
Offline statistics-based recommendation
/**
1. Historically popular products: count of ratings per product -> (productId, count)
2. Recently popular products: convert the timestamp to yyyyMM and count ratings per product per month -> (productId, count, yearmonth)
3. High-quality products: average rating per product -> (productId, avg)
*/
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
case class Rating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
object StatisticsRecommender {
// Names of the collections in MongoDB
val MONGODB_RATING_COLLECTION = "Rating"
val RATE_MORE_PRODUCTS = "RateMoreProducts"
val RATE_MORE_RECENTLY_PRODUCTS = "RateMoreRecentlyProducts"
val AVERAGE_PRODUCTS = "AverageProducts"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[1]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// Create a Spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommender")
// Create a SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// Load the rating data from MongoDB
val ratingDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Rating]
.toDF()
// Register a temporary view named ratings
ratingDF.createOrReplaceTempView("ratings")
// Use Spark SQL to compute the different statistics
// 1. Historically popular products: count ratings per product -> (productId, count)
val rateMoreProductsDF = spark.sql("select productId, count(productId) as count from ratings group by productId order by count desc")
storeDFInMongoDB(rateMoreProductsDF, RATE_MORE_PRODUCTS)
// 2. Recently popular products: convert the timestamp to yyyyMM and count ratings per product per month -> (productId, count, yearmonth)
// Date formatter for the year-month string
val simpleDateFormat = new SimpleDateFormat("yyyyMM")
// Register a UDF that converts a timestamp in seconds to a yyyyMM integer
spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
// Reshape the raw ratings into (productId, score, yearmonth)
val ratingOfYearMonthDF = spark.sql("select productId, score, changeDate(timestamp) as yearmonth from ratings")
ratingOfYearMonthDF.createOrReplaceTempView("ratingOfMonth")
val rateMoreRecentlyProductsDF = spark.sql("select productId, count(productId) as count, yearmonth from ratingOfMonth group by yearmonth, productId order by yearmonth desc, count desc")
// Save the DataFrame to MongoDB
storeDFInMongoDB(rateMoreRecentlyProductsDF, RATE_MORE_RECENTLY_PRODUCTS)
// 3. High-quality products: average rating per product -> (productId, avg)
val averageProductsDF = spark.sql("select productId, avg(score) as avg from ratings group by productId order by avg desc")
storeDFInMongoDB(averageProductsDF, AVERAGE_PRODUCTS)
spark.stop()
}
def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit = {
df.write
.option("uri", mongoConfig.uri)
.option("collection", collection_name)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
}
}
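To make the changeDate UDF concrete: the sample timestamp 1395676800 from the Rating dataset is given in seconds, so it is multiplied by 1000 before being formatted as yyyyMM. A minimal sketch (the result could in principle shift by a day with the JVM's default time zone, but this value falls in March 2014 in common time zones):

import java.text.SimpleDateFormat
import java.util.Date

val simpleDateFormat = new SimpleDateFormat("yyyyMM")
// 1395676800 seconds since the epoch is late March 2014, so the yearmonth key becomes 201403
println(simpleDateFormat.format(new Date(1395676800 * 1000L)).toInt) // 201403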
Collaborative filtering recommendation based on LFM (latent factor model)
// Use ALS as the collaborative filtering algorithm: from the rating data in MongoDB, compute offline user-product recommendation lists and the product similarity matrix
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
// Standard recommendation object
case class Recommendation(productId: Int, score: Double)
// Recommendation list for a user
case class UserRecs(userId: Int, recs: Seq[Recommendation])
// Similarity list for a product
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object OfflineRecommender {
// Names of the collections in MongoDB
val MONGODB_RATING_COLLECTION = "Rating"
val USER_RECS = "UserRecs"
val PRODUCT_RECS = "ProductRecs"
val USER_MAX_RECOMMENDATION = 20
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// Create a Spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
// Create a SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// Load the rating data from MongoDB as an RDD of (userId, productId, score)
val ratingRDD = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(
rating => (rating.userId, rating.productId, rating.score)
).cache()
// Extract the distinct user IDs and product IDs
val userRDD = ratingRDD.map(_._1).distinct()
val productRDD = ratingRDD.map(_._2).distinct()
// Core computation
// 1. Train the latent factor model
val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))
// Training parameters: rank = number of latent features, iterations = number of iterations, lambda = regularization coefficient
val (rank, iterations, lambda) = (5, 10, 0.01)
val model = ALS.train(trainData, rank, iterations, lambda)
// 2. Predict the full rating matrix and derive each user's recommendation list
// The Cartesian product of userRDD and productRDD gives every (user, product) pair to predict
val userProducts = userRDD.cartesian(productRDD)
val preRating = model.predict(userProducts)
// Extract each user's recommendation list from the predicted ratings
val userRecs = preRating.filter(_.rating > 0)
.map(
rating => (rating.user, (rating.product, rating.rating))
)
.groupByKey()
.map {
case (userId, recs) => // sort by predicted score in descending order
UserRecs(userId, recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))
}
.toDF()
userRecs.write
.option("uri", mongoConfig.uri)
.option("collection", USER_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// 3. Use the product feature vectors to compute product similarity lists
val productFeatures = model.productFeatures.map {
case (productId, features) => (productId, new DoubleMatrix(features))
}
// Pair up products and compute their pairwise cosine similarity
val productRecs = productFeatures.cartesian(productFeatures)
.filter {
case (a, b) => a._1 != b._1
}
// compute the cosine similarity
.map {
case (a, b) =>
val simScore = consinSim(a._2, b._2)
(a._1, (b._1, simScore))
}
.filter(_._2._2 > 0.4)
.groupByKey()
.map {
case (productId, recs) =>
ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
}
.toDF()
productRecs.write
.option("uri", mongoConfig.uri)
.option("collection", PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {
product1.dot(product2) / (product1.norm2() * product2.norm2())
}
}
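For reference, the model ALS learns approximates every rating by the inner product of a user latent vector and a product latent vector, and consinSim is the ordinary cosine similarity between two product feature vectors (the notation here is mine; x_u and y_p have length rank, 5 in the settings above):

\hat{r}_{u,p} = x_u^{T} y_p, \qquad \mathrm{sim}(p, q) = \frac{y_p \cdot y_q}{\lVert y_p \rVert_2 \, \lVert y_q \rVert_2}

Only product pairs with similarity above 0.4 are kept when building ProductRecs, which keeps the similarity lists short.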
ALS model evaluation and parameter selection
// Parameters finally used in OfflineRecommender: val (rank, iterations, lambda) = (5, 10, 0.01)
import breeze.numerics.sqrt
import com.atguigu.offline.OfflineRecommender.MONGODB_RATING_COLLECTION
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object ALSTrainer {
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// Create a Spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ALSTrainer")
// Create a SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// Load the rating data from MongoDB
val ratingRDD = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(
rating => Rating(rating.userId, rating.productId, rating.score)
).cache()
// Split the dataset into a training set and a test set
val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
val trainingRDD = splits(0)
val testingRDD = splits(1)
// Core step: search for and print the best parameters
adjustALSParams(trainingRDD, testingRDD)
spark.stop()
}
def adjustALSParams(trainData: RDD[Rating], testData: RDD[Rating]): Unit = {
// Iterate over the candidate parameter values
val result = for (rank <- Array(5, 10, 20, 50); lambda <- Array(1, 0.1, 0.01))
yield {
val model = ALS.train(trainData, rank, 10, lambda)
val rmse = getRMSE(model, testData)
(rank, lambda, rmse)
}
// Pick the combination with the smallest RMSE and print it
println(result.minBy(_._3))
}
def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
// Build the (user, product) pairs and predict their ratings
val userProducts = data.map(item => (item.user, item.product))
val predictRating = model.predict(userProducts)
// Compute the RMSE: join the observed and predicted ratings on (userId, productId)
val observed = data.map(item => ((item.user, item.product), item.rating))
val predict = predictRating.map(item => ((item.user, item.product), item.rating))
sqrt(
observed.join(predict).map {
case ((userId, productId), (actual, pre)) =>
val err = actual - pre
err * err
}.mean()
)
}
}
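getRMSE implements the usual root-mean-square error over the (user, product) pairs present in both the observed and the predicted ratings:

\mathrm{RMSE} = \sqrt{\frac{1}{N} \sum_{(u,p)} \left( r_{u,p} - \hat{r}_{u,p} \right)^2}

where N is the number of joined pairs; adjustALSParams then simply prints the (rank, lambda, rmse) triple with the smallest RMSE on the 20% test split.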
Real-time recommendation module
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
// Connection helper object holding the Redis and MongoDB clients
object ConnHelper extends Serializable {
// Lazy vals: the connections are only created on first use
lazy val jedis = new Jedis("localhost")
lazy val mongoClient = MongoClient(MongoClientURI("mongodb://localhost:27017/recommender"))
}
case class MongoConfig(uri: String, db: String)
// Standard recommendation object
case class Recommendation(productId: Int, score: Double)
// Recommendation list for a user
case class UserRecs(userId: Int, recs: Seq[Recommendation])
// Similarity list for a product
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object OnlineRecommender {
// Constants and collection names
val MONGODB_RATING_COLLECTION = "Rating"
val STREAM_RECS = "StreamRecs"
val PRODUCT_RECS = "ProductRecs"
val MAX_USER_RATING_NUM = 20 // number of the user's most recent ratings to use
val MAX_SIM_PRODUCTS_NUM = 20 // number of candidate similar products to consider
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender",
"kafka.topic" -> "recommender"
)
// Create a Spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OnlineRecommender")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(2))
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// Load the product similarity matrix and broadcast it
val simProductsMatrix = spark.read
.option("uri", mongoConfig.uri)
.option("collection", PRODUCT_RECS)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRecs]
.rdd
// Convert to a Map for fast similarity lookups later
.map { item =>
(item.productId, item.recs.map(x => (x.productId, x.score)).toMap)
}
.collectAsMap()
// Broadcast variable holding the similarity matrix
val simProcutsMatrixBC = sc.broadcast(simProductsMatrix)
// Kafka consumer configuration
val kafkaParam = Map(
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "recommender",
"auto.offset.reset" -> "latest"
)
// Create a direct Kafka DStream
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaParam)
)
// Parse the Kafka messages into a rating stream; message format: userId|productId|score|timestamp
val ratingStream = kafkaStream.map { msg =>
val attr = msg.value().split("\\|")
(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
}
// Core algorithm: processing pipeline for the rating stream
ratingStream.foreachRDD {
rdds =>
rdds.foreach {
case (userId, productId, score, timestamp) =>
println("rating data coming!>>>>>>>>>>>>>>>>>>")
// Core algorithm steps
// 1. Fetch the user's most recent ratings from Redis as Array[(productId, score)]
val userRecentlyRatings = getUserRecentlyRatings(MAX_USER_RATING_NUM, userId, ConnHelper.jedis)
// 2. From the similarity matrix, take the products most similar to the current one as candidates, Array[productId]
val candidateProducts = getTopSimProducts(MAX_SIM_PRODUCTS_NUM, productId, userId, simProcutsMatrixBC.value)
// 3. Compute a recommendation priority for every candidate to get the user's real-time recommendation list, Array[(productId, score)]
val streamRecs = computeProductScore(candidateProducts, userRecentlyRatings, simProcutsMatrixBC.value)
// 4. Save the recommendation list to MongoDB
saveDataToMongoDB(userId, streamRecs)
}
}
// Start the streaming context
ssc.start()
println("streaming started!")
ssc.awaitTermination()
}
// Needed so the java.util.List returned by jedis.lrange can be used as a Scala collection
import scala.collection.JavaConversions._
/**
* Fetch the user's most recent num ratings from Redis
*/
def getUserRecentlyRatings(num: Int, userId: Int, jedis: Jedis): Array[(Int, Double)] = {
// The user's ratings live in a Redis list whose key is "userId:USERID"; each element has the form PRODUCTID:SCORE
jedis.lrange("userId:" + userId.toString, 0, num)
.map { item =>
val attr = item.split("\\:")
(attr(0).trim.toInt, attr(1).trim.toDouble)
}
.toArray
}
// Get the current product's similarity list and drop products the user has already rated; the rest are candidates
def getTopSimProducts(num: Int,
productId: Int,
userId: Int,
simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
(implicit mongoConfig: MongoConfig): Array[Int] = {
// Get the current product's similarity list from the broadcast similarity matrix
val allSimProducts = simProducts(productId).toArray
// Find the products the user has already rated so they can be filtered out
val ratingCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
val ratingExist = ratingCollection.find(MongoDBObject("userId" -> userId))
.toArray
.map { item => // only the productId is needed
item.get("productId").toString.toInt
}
// Filter, sort by similarity in descending order, and keep the top num product IDs
allSimProducts.filter(x => !ratingExist.contains(x._1))
.sortWith(_._2 > _._2)
.take(num)
.map(x => x._1)
}
// Compute the recommendation score for every candidate product
def computeProductScore(candidateProducts: Array[Int],
userRecentlyRatings: Array[(Int, Double)],
simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
: Array[(Int, Double)] = {
// ArrayBuffer holding the base score of every candidate product, (productId, score)
val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
// Two maps counting, per product, how many recent ratings were high and how many were low, productId -> count
val increMap = scala.collection.mutable.HashMap[Int, Int]()
val decreMap = scala.collection.mutable.HashMap[Int, Int]()
// For every (candidate product, recently rated product) pair, compute its contribution
for (candidateProduct <- candidateProducts; userRecentlyRating <- userRecentlyRatings) {
// Similarity between the candidate product and the recently rated product, from the similarity matrix
val simScore = getProductsSimScore(candidateProduct, userRecentlyRating._1, simProducts)
if (simScore > 0.4) {
// Weight the recent rating by the similarity to get a base score
scores += ((candidateProduct, simScore * userRecentlyRating._2))
if (userRecentlyRating._2 > 3) {
increMap(candidateProduct) = increMap.getOrDefault(candidateProduct, 0) + 1
} else {
decreMap(candidateProduct) = decreMap.getOrDefault(candidateProduct, 0) + 1
}
}
}
// Compute the final priority per product: group the base scores by productId
scores.groupBy(_._1).map {
case (productId, scoreList) =>
(productId, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(productId, 1)) - log(decreMap.getOrDefault(productId, 1)))
}
// Return the recommendation list sorted by score in descending order
.toArray
.sortWith(_._2 > _._2)
}
def getProductsSimScore(product1: Int, product2: Int,
simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Double = {
simProducts.get(product1) match {
case Some(sims) => sims.get(product2) match {
case Some(score) => score
case None => 0.0
}
case None => 0.0
}
}
// Custom log function with base N
def log(m: Int): Double = {
val N = 10
math.log(m) / math.log(N)
}
// Write the results to MongoDB
def saveDataToMongoDB(userId: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit = {
val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(STREAM_RECS)
// Remove the user's old recommendations and insert the new ones
streamRecsCollection.findAndRemove(MongoDBObject("userId" -> userId))
streamRecsCollection.insert(MongoDBObject("userId" -> userId,
"recs" -> streamRecs.map(x => MongoDBObject("productId" -> x._1, "score" -> x._2))))
}
}
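Written out, the priority that computeProductScore assigns to a candidate product q is (my notation, reconstructed from the code above):

E_q = \frac{\sum_{r \in R_q} \mathrm{sim}(q, r) \cdot \mathrm{score}_r}{\lvert R_q \rvert} + \lg(\mathrm{incre}_q) - \lg(\mathrm{decre}_q)

where R_q is the set of the user's recent ratings whose similarity to q exceeds 0.4, incre_q counts the recent ratings above 3 among those pairs, decre_q counts the rest, and lg is the base-10 log defined above (a missing counter defaults to 1, so it contributes 0).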
Content-based offline recommendation module
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
case class Product(productId: Int, name: String, imageUrl: String, categories: String, tags: String)
case class MongoConfig(uri: String, db: String)
// Standard recommendation object
case class Recommendation(productId: Int, score: Double)
// Similarity list for a product
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object ContentRecommender {
// Names of the collections in MongoDB
val MONGODB_PRODUCT_COLLECTION = "Product"
val CONTENT_PRODUCT_RECS = "ContentBasedProductRecs"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// Create a Spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")
// Create a SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// Load the product data and preprocess the tags
val productTagsDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Product]
// Replace the "|" separators in the tags with spaces so the whitespace tokenizer can split them
.map(
x => (x.productId, x.name, x.tags.map(c => if (c == '|') ' ' else c))
)
.toDF("productId", "name", "tags")
.cache()
// Extract product feature vectors with TF-IDF
// 1. Tokenizer that splits the tag string on whitespace (the default behavior)
val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words")
// Transform with the tokenizer to get a DataFrame with an extra words column
val wordsDataDF = tokenizer.transform(productTagsDF)
// 2. HashingTF to compute term frequencies over 800 hash buckets
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(800)
val featurizedDataDF = hashingTF.transform(wordsDataDF)
// 3. IDF to rescale the raw term frequencies into TF-IDF weights
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// Fit the IDF model
val idfModel = idf.fit(featurizedDataDF)
// Transform to get a DataFrame with an extra features column
val rescaledDataDF = idfModel.transform(featurizedDataDF)
// Convert to an RDD of (productId, feature vector)
val productFeatures = rescaledDataDF.map {
row => (row.getAs[Int]("productId"), row.getAs[SparseVector]("features").toArray)
}
.rdd
.map {
case (productId, features) => (productId, new DoubleMatrix(features))
}
// Pair up products and compute their pairwise cosine similarity
val productRecs = productFeatures.cartesian(productFeatures)
.filter {
case (a, b) => a._1 != b._1
}
// compute the cosine similarity
.map {
case (a, b) =>
val simScore = consinSim(a._2, b._2)
(a._1, (b._1, simScore))
}
.filter(_._2._2 > 0.4)
.groupByKey()
.map {
case (productId, recs) =>
ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
}
.toDF()
productRecs.write
.option("uri", mongoConfig.uri)
.option("collection", CONTENT_PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {
product1.dot(product2) / (product1.norm2() * product2.norm2())
}
}
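As a reminder of what the features column holds: HashingTF hashes every tag word into one of 800 buckets and counts term frequencies per product, and IDF then rescales each bucket with the inverse document frequency, which Spark ML documents as:

\mathrm{tfidf}(t, d) = \mathrm{tf}(t, d) \cdot \log\frac{N + 1}{\mathrm{df}(t) + 1}

where N is the number of products and df(t) is the number of products whose tags contain term t. The resulting sparse vectors are compared with the same cosine similarity as in the ALS-based module.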
Offline recommendation based on ItemCF (item-based collaborative filtering)
package com.atguigu.itemcf
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
// Standard recommendation object
case class Recommendation(productId: Int, score: Double)
// Similarity list for a product
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object ItemCFRecommender {
// Constants and collection names
val MONGODB_RATING_COLLECTION = "Rating"
val ITEM_CF_PRODUCT_RECS = "ItemCFProductRecs"
val MAX_RECOMMENDATION = 10
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// Create a Spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ItemCFRecommender")
// Create a SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// Load the rating data and convert it to a DataFrame
val ratingDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.map(
x => (x.userId, x.productId, x.score)
)
.toDF("userId", "productId", "score")
.cache()
// Core algorithm: compute co-occurrence similarity to build each product's similarity list
// Count the number of ratings per product (group by productId)
val productRatingCountDF = ratingDF.groupBy("productId").count()
// Join the count column back onto the original rating table
val ratingWithCountDF = ratingDF.join(productRatingCountDF, "productId")
// Pair the ratings by userId so we can count how many users rated both products
val joinedDF = ratingWithCountDF.join(ratingWithCountDF, "userId")
.toDF("userId", "product1", "score1", "count1", "product2", "score2", "count2")
.select("userId", "product1", "count1", "product2", "count2")
// Register a temporary view for the SQL query
joinedDF.createOrReplaceTempView("joined")
// Group by (product1, product2) and count userId: the number of users who rated both products
val cooccurrenceDF = spark.sql(
"""
|select product1
|, product2
|, count(userId) as cocount
|, first(count1) as count1
|, first(count2) as count2
|from joined
|group by product1, product2
""".stripMargin
).cache()
// Extract the needed fields and wrap them as (productId1, (productId2, score))
val simDF = cooccurrenceDF.map {
row =>
val coocSim = cooccurrenceSim(row.getAs[Long]("cocount"), row.getAs[Long]("count1"), row.getAs[Long]("count2"))
(row.getInt(0), (row.getInt(1), coocSim))
}
.rdd
.groupByKey()
.map {
case (productId, recs) =>
ProductRecs(productId, recs.toList
.filter(x => x._1 != productId)
.sortWith(_._2 > _._2)
.take(MAX_RECOMMENDATION)
.map(x => Recommendation(x._1, x._2)))
}
.toDF()
// Save to MongoDB
simDF.write
.option("uri", mongoConfig.uri)
.option("collection", ITEM_CF_PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
// Co-occurrence similarity: cocount / sqrt(count1 * count2)
def cooccurrenceSim(coCount: Long, count1: Long, count2: Long): Double = {
coCount / math.sqrt(count1 * count2)
}
}
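The co-occurrence similarity computed by cooccurrenceSim is the standard ItemCF formula:

\mathrm{sim}(i, j) = \frac{\lvert N(i) \cap N(j) \rvert}{\sqrt{\lvert N(i) \rvert \cdot \lvert N(j) \rvert}}

where N(i) is the set of users who rated product i: cocount is the size of the intersection, and count1 and count2 are the per-product rating counts joined in earlier. As a made-up example, if 10 users rated both products, 100 rated the first, and 400 rated the second, the similarity is 10 / sqrt(100 * 400) = 0.05.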