Spark 无法正确过滤?

Spark could not filter correctly?

我遇到了结果不正确的有线问题。 我有一个名为 A 的 class,它有一个名为 keyword 的值。 我想过滤 RDD[A] 如果它有一些 keyword.

星火环境: 版本:1.3.1 执行环境:yarn-client

代码如下:

class A ...
case class C(words:Set[String] ) extends Serializable {

  def run(data:RDD[A])(implicit sc:SparkContext) ={
    data.collect{ case x:A=> x }.filter(y => words.contains(y.keyword)).foreach(println)
  }
}

   // in main function
  val data:RDD[A] = ....
  val c = C(Set("abc"))
  c.run(data)

上面的代码没有打印任何内容。但是,如果我将 RDD[A] 收集到本地,那么它会打印一些东西!例如

data.take(1000).collect{ case x:A=> x }.filter(y => words.contains(y.keyword)).foreach(println)}

怎么会这样?

让我问另一个相关问题:我应该让 case class C 扩展 Serializable 吗?我觉得没必要。

原因很简单。如果你在本地收集数据时 运行 println 函数,你的数据会通过网络传输到你正在使用的机器(我们称之为 Spark 环境的客户端),然后它打印在您的控制台上。到目前为止,一切都按预期进行。相反,如果您 运行 println 函数在 分布式 RDD 上,println 函数在工作机器上本地执行其中有你的数据。所以这个函数实际上被执行了,但是你不会在你的客户端的控制台上看到任何结果,除非它也是一个工作机器:事实上,一切都打印在各自工作节点的控制台上。

不,你没必要Serializable,唯一连载的是你的words:Set[String]