从 Scala WrappedArray 中提取值

Extracting values from Scala WrappedArray

我正在使用 Apache Spark 的 ALS 模型,recommendForAllUsers 方法return是一个具有架构

的数据框
root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

实际上,建议是一个 WrappedArray,例如:

WrappedArray([636958,0.32910484], [995322,0.31974298], [1102140,0.30444127], [1160820,0.27908015], [1208899,0.26943958])

我正在尝试提取 只是 item_ids 和 return 它们作为一维数组。所以上面的例子是 [636958,995322,1102140,1160820,1208899]

这就是给我带来麻烦的原因。到目前为止我有:

    val numberOfRecs = 20
    val userRecs = model.recommendForAllUsers(numberOfRecs).cache()

    val strippedScores = userRecs.rdd.map(row => {
      val user_id = row.getInt(0)
      val recs = row.getAs[Seq[Row]](1)

      val item_ids = new Array[Int](numberOfRecs)

      recs.toArray.foreach(x => {
        item_ids :+ x.get(0)
      })

      item_ids
    })

但这只是 returns [I@2f318251,如果我通过 mkString(",") 获取它的字符串值,它 returns 0,0,0,0,0,0

关于如何将 item_ids 和 return 提取为单独的一维数组有什么想法吗?

您可以使用完全限定名称访问数组中的结构元素:

    scala> case class Recommendation(item_id: Int, rating: Float)
defined class Recommendation

scala> val userReqs = Seq(Array(Recommendation(636958,0.32910484f), Recommendation(995322,0.31974298f), Recommendation(1102140,0.30444127f), Recommendation(1160820,0.27908015f), Recommendation(1208899,0.26943958f))).toDF
userReqs: org.apache.spark.sql.DataFrame = [value: array<struct<item_id:int,rating:float>>]

scala> userReqs.printSchema
root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: integer (nullable = false)
 |    |    |-- rating: float (nullable = false)


scala> userReqs.select("value.item_id").show(false)
+-------------------------------------------+
|item_id                                    |
+-------------------------------------------+
|[636958, 995322, 1102140, 1160820, 1208899]|
+-------------------------------------------+

scala> val ids = userReqs.select("value.item_id").collect().flatMap(_.getAs[Seq[Int]](0))
ids: Array[Int] = Array(636958, 995322, 1102140, 1160820, 1208899)

在 Spark ALSModel 文档中发现 recommendForAllUsers returns

"a DataFrame of (userCol: Int, recommendations), where recommendations are stored as an array of (itemCol: Int, rating: Float) Rows" (https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.ml.recommendation.ALSModel)

通过数组,它表示 WrappedArray,因此我没有尝试将其转换为 Seq[Row],而是将其转换为 mutable.WrappedArray[Row]。然后我能够得到每个 item_id 喜欢:

    val userRecItems = userRecs.rdd.map(row => {
      val user_id = row.getInt(0)
      val recs = row.getAs[mutable.WrappedArray[Row]](1)

      for (rec <- recs) {
        val item_id = rec.getInt(0)
        userRecommendatinos += game_id
      }
    })

其中 userRecommendations 是一个可变的 ArrayBuffer