使用来自 ML 的 ALS 的推荐系统

Recommender System using ALS from ML

我做了一些研究,据我所知,有两种可能的方法可以使用 Apache Spark 创建 推荐系统 ,一种方法是使用 MLLib 带有来自 ML 的相当不错的 example, which I have tried and is very easy, on the other hand you can use ALS。我觉得使用 RDD 很舒服,不过我正在尝试更频繁地使用 DataFrames 以获得更多经验。

为了练习,我开始使用一些疯狂的数据,其中 ratings 被标准化,我有超过 4000 条记录,只有 5 种可能的产品(如下所示)。所以我的第一个挑战是如何将这个 DataFrame 转换为所需的结构;我在阅读源代码几个小时后猜到的结构。

val df = sqlContext.createDataFrame(sc.parallelize(List(Row(0.0, 0.12, 0.1, 0.0, 0.16),
                                                        Row(0.1, 0.0, 0.3, 0.52, 0.67))),
                                    StructType(StructField("product1", DoubleType, true) ::
                                               StructField("product2", DoubleType, true) ::
                                               StructField("product3", DoubleType, true) ::
                                               StructField("product4", DoubleType, true) ::
                                               StructField("product5", DoubleType, true) :: Nil))

df.show

+--------+--------+--------+--------+--------+
|product1|product2|product3|product4|product5|
+--------+--------+--------+--------+--------+
|     0.0|    0.12|     0.1|     0.0|    0.16|
|     0.1|     0.0|     0.3|    0.52|    0.67|
+--------+--------+--------+--------+--------+

我做了几个复杂的转换,我想看看是否有更好的方法来获得所需的结构。

val rdd = df.rdd.zipWithIndex.map {
    case (row, index) => row.toSeq.zipWithIndex.map(x => Row(index.toInt, x._2.toInt, x._1)) 
}.flatMap{x => x}

val (train, testing) = rdd.partitionBy(_.get(2) != 0.0)
val rdds = List(train, testing)

然后我将那些 RDD 转换为 DataFrame

val dfs = rdds.map(sqlContext.createDataFrame(_, StructType(StructField("user", IntegerType, true) ::
                                                            StructField("product", IntegerType, true) ::
                                                            StructField("rating", DoubleType, true) :: Nil)))

经过所有这些步骤,我终于可以使用 ALS 算法了,当事情变得如此冗长时,可能是因为你做错了什么。

val rec = (new ALS().setUserCol("user")
                    .setItemCol("product")
                    .setRatingCol("rating")
                    .setPredictionCol("value")
                    .setSeed(17)
                    .setMaxIter(20))

val model = rec.fit(dfs(0))

model.transform(dfs(1)).collect
Array([0,0,0.0,0.022231804], [1,1,0.0,0.102589644], [0,3,0.0,0.11560536])

一些备注:

  • userratinguserColratingCol 的默认参数。如果您将 product 重命名为 item 您也可以省略这一项。
  • 您可以将 Row 替换为 Rating 并稍后省略架构:

    case (row, u) => 
       row.toSeq.zipWithIndex.map{ case (r: Double, i: Int) => Rating(u, i, r) }
    
    ...
    .toDF
    
  • 因为 id 似乎无关紧要,您可以使用 zipWithUniqueId
  • 如果 uniqueId 可以接受,您可以将 monotonically_increasing_idDataFrame
  • 一起使用
  • 可以通过数组爆炸来避免将数据传递给 RDD:

    val exprs = explode(array(df.columns.map(c => 
      struct(lit(c).alias("item"), col(c).alias("rating"))): _*
    ))
    
    df
      .withColumn("user", monotonically_increasing_id)
      .withColumn("tmp", exprs)
      .select($"user", $"tmp.item", $"tmp.rating")
    

    并用 ID 替换名称。

尽管如此,我认为在这里使用 DataFrames 并没有太大的好处。一种或另一种数据将被传递回 MLlib 模型,该模型需要 RDD[Rating].