使用来自 ML 的 ALS 的推荐系统
Recommender System using ALS from ML
我做了一些研究,据我所知,有两种可能的方法可以使用 Apache Spark
创建 推荐系统 ,一种方法是使用 MLLib
带有来自 ML
的相当不错的 example, which I have tried and is very easy, on the other hand you can use ALS。我觉得使用 RDD
很舒服,不过我正在尝试更频繁地使用 DataFrames
以获得更多经验。
为了练习,我开始使用一些疯狂的数据,其中 ratings
被标准化,我有超过 4000 条记录,只有 5 种可能的产品(如下所示)。所以我的第一个挑战是如何将这个 DataFrame
转换为所需的结构;我在阅读源代码几个小时后猜到的结构。
val df = sqlContext.createDataFrame(sc.parallelize(List(Row(0.0, 0.12, 0.1, 0.0, 0.16),
Row(0.1, 0.0, 0.3, 0.52, 0.67))),
StructType(StructField("product1", DoubleType, true) ::
StructField("product2", DoubleType, true) ::
StructField("product3", DoubleType, true) ::
StructField("product4", DoubleType, true) ::
StructField("product5", DoubleType, true) :: Nil))
df.show
+--------+--------+--------+--------+--------+
|product1|product2|product3|product4|product5|
+--------+--------+--------+--------+--------+
| 0.0| 0.12| 0.1| 0.0| 0.16|
| 0.1| 0.0| 0.3| 0.52| 0.67|
+--------+--------+--------+--------+--------+
我做了几个复杂的转换,我想看看是否有更好的方法来获得所需的结构。
val rdd = df.rdd.zipWithIndex.map {
case (row, index) => row.toSeq.zipWithIndex.map(x => Row(index.toInt, x._2.toInt, x._1))
}.flatMap{x => x}
val (train, testing) = rdd.partitionBy(_.get(2) != 0.0)
val rdds = List(train, testing)
然后我将那些 RDD
转换为 DataFrame
。
val dfs = rdds.map(sqlContext.createDataFrame(_, StructType(StructField("user", IntegerType, true) ::
StructField("product", IntegerType, true) ::
StructField("rating", DoubleType, true) :: Nil)))
经过所有这些步骤,我终于可以使用 ALS
算法了,当事情变得如此冗长时,可能是因为你做错了什么。
val rec = (new ALS().setUserCol("user")
.setItemCol("product")
.setRatingCol("rating")
.setPredictionCol("value")
.setSeed(17)
.setMaxIter(20))
val model = rec.fit(dfs(0))
model.transform(dfs(1)).collect
Array([0,0,0.0,0.022231804], [1,1,0.0,0.102589644], [0,3,0.0,0.11560536])
一些备注:
user
和 rating
是 userCol
和 ratingCol
的默认参数。如果您将 product
重命名为 item
您也可以省略这一项。
您可以将 Row 替换为 Rating 并稍后省略架构:
case (row, u) =>
row.toSeq.zipWithIndex.map{ case (r: Double, i: Int) => Rating(u, i, r) }
...
.toDF
- 因为
id
似乎无关紧要,您可以使用 zipWithUniqueId
- 如果
uniqueId
可以接受,您可以将 monotonically_increasing_id
与 DataFrame
一起使用
可以通过数组爆炸来避免将数据传递给 RDD:
val exprs = explode(array(df.columns.map(c =>
struct(lit(c).alias("item"), col(c).alias("rating"))): _*
))
df
.withColumn("user", monotonically_increasing_id)
.withColumn("tmp", exprs)
.select($"user", $"tmp.item", $"tmp.rating")
并用 ID 替换名称。
尽管如此,我认为在这里使用 DataFrames
并没有太大的好处。一种或另一种数据将被传递回 MLlib
模型,该模型需要 RDD[Rating]
.
我做了一些研究,据我所知,有两种可能的方法可以使用 Apache Spark
创建 推荐系统 ,一种方法是使用 MLLib
带有来自 ML
的相当不错的 example, which I have tried and is very easy, on the other hand you can use ALS。我觉得使用 RDD
很舒服,不过我正在尝试更频繁地使用 DataFrames
以获得更多经验。
为了练习,我开始使用一些疯狂的数据,其中 ratings
被标准化,我有超过 4000 条记录,只有 5 种可能的产品(如下所示)。所以我的第一个挑战是如何将这个 DataFrame
转换为所需的结构;我在阅读源代码几个小时后猜到的结构。
val df = sqlContext.createDataFrame(sc.parallelize(List(Row(0.0, 0.12, 0.1, 0.0, 0.16),
Row(0.1, 0.0, 0.3, 0.52, 0.67))),
StructType(StructField("product1", DoubleType, true) ::
StructField("product2", DoubleType, true) ::
StructField("product3", DoubleType, true) ::
StructField("product4", DoubleType, true) ::
StructField("product5", DoubleType, true) :: Nil))
df.show
+--------+--------+--------+--------+--------+
|product1|product2|product3|product4|product5|
+--------+--------+--------+--------+--------+
| 0.0| 0.12| 0.1| 0.0| 0.16|
| 0.1| 0.0| 0.3| 0.52| 0.67|
+--------+--------+--------+--------+--------+
我做了几个复杂的转换,我想看看是否有更好的方法来获得所需的结构。
val rdd = df.rdd.zipWithIndex.map {
case (row, index) => row.toSeq.zipWithIndex.map(x => Row(index.toInt, x._2.toInt, x._1))
}.flatMap{x => x}
val (train, testing) = rdd.partitionBy(_.get(2) != 0.0)
val rdds = List(train, testing)
然后我将那些 RDD
转换为 DataFrame
。
val dfs = rdds.map(sqlContext.createDataFrame(_, StructType(StructField("user", IntegerType, true) ::
StructField("product", IntegerType, true) ::
StructField("rating", DoubleType, true) :: Nil)))
经过所有这些步骤,我终于可以使用 ALS
算法了,当事情变得如此冗长时,可能是因为你做错了什么。
val rec = (new ALS().setUserCol("user")
.setItemCol("product")
.setRatingCol("rating")
.setPredictionCol("value")
.setSeed(17)
.setMaxIter(20))
val model = rec.fit(dfs(0))
model.transform(dfs(1)).collect
Array([0,0,0.0,0.022231804], [1,1,0.0,0.102589644], [0,3,0.0,0.11560536])
一些备注:
user
和rating
是userCol
和ratingCol
的默认参数。如果您将product
重命名为item
您也可以省略这一项。您可以将 Row 替换为 Rating 并稍后省略架构:
case (row, u) => row.toSeq.zipWithIndex.map{ case (r: Double, i: Int) => Rating(u, i, r) } ... .toDF
- 因为
id
似乎无关紧要,您可以使用zipWithUniqueId
- 如果
uniqueId
可以接受,您可以将monotonically_increasing_id
与DataFrame
一起使用
可以通过数组爆炸来避免将数据传递给 RDD:
val exprs = explode(array(df.columns.map(c => struct(lit(c).alias("item"), col(c).alias("rating"))): _* )) df .withColumn("user", monotonically_increasing_id) .withColumn("tmp", exprs) .select($"user", $"tmp.item", $"tmp.rating")
并用 ID 替换名称。
尽管如此,我认为在这里使用 DataFrames
并没有太大的好处。一种或另一种数据将被传递回 MLlib
模型,该模型需要 RDD[Rating]
.