Scala spark 在带有 yield 的平面图之后无法访问元组

Scala spark can't access tuple after flatmap with yield

    val test_rated = pearson.flatMap(x => {
        val bid1 = x._1._1.toInt
        val bid2 = x._1._2.toInt
        val sim = x._2.toDouble

        val pairs = for (tu <- test_users) yield {
            val tid = tu.toInt
            if (test_map.contains((tid, bid2)) && train_map.contains((tid, bid1))){
                ((tid, bid2), (bid1, sim))
            }
        }
        pairs
    }).filter(row => row!= ())

在此代码块中 test_users 是一个 Scala list。在使用 yield 进行 flatMap 操作后,我能够得到结果并使用 foreach 打印出来。 但是,如果我想像

一样再次映射它
test_rated.map(x => x._2)

我无法使用 x._2

访问每个单独的元组值

我相信 flatMap() 中的代码块没有对任何对象求值。您应该删除 val pairs = 或显式 return pairs - 例如

pearson.flatMap(x => { 
  ... 
  val pairs = ...
  pairs
}).filter(...)

假设代码的其余部分是正确的并且类型匹配(如果没有 [mcve] 就不可能说))您只是以错误的方式执行此操作。让我们简化这个过程来说明原因:

scala> for {
     | x <- 1 to 5
     | } yield { if (x % 2 == 0) x}
res0: scala.collection.immutable.IndexedSeq[AnyVal] = Vector((), 2, (), 4, ())

如您所见,类型不是 Seq[Int],而是 Seq[AnyVal]。过滤数据不会改变:

scala> res0.filter(x => x != ())
res1: scala.collection.immutable.IndexedSeq[AnyVal] = Vector(2, 4)

你可以collect:

scala> res0.collect { case x: Int => x}
res2: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 4)

但实际上你应该使用保护表达式:

scala> for {
     | x <- 1 to 5 if x % 2 == 0
     | } yield x
res3: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 4)

所以你的代码应该重写为:

val test_rated = pearson.flatMap(x => {
    val bid1 = x._1._1.toInt
    val bid2 = x._1._2.toInt
    val sim = x._2.toDouble

    def keep(tid) = {
      test_map.contains((tid, bid2)) && train_map.contains((tid, bid1))
    }

    for {
      tid <- test_users.map(_.toInt) if keep(tid)
    } yield ((tid, bid2), (bid1, sim))
})