Spark 无法正确过滤?
Spark could not filter correctly?
我遇到了结果不正确的有线问题。
我有一个名为 A 的 class,它有一个名为 keyword
的值。
我想过滤 RDD[A] 如果它有一些 keyword
.
星火环境:
版本:1.3.1
执行环境:yarn-client
代码如下:
class A ...
case class C(words:Set[String] ) extends Serializable {
def run(data:RDD[A])(implicit sc:SparkContext) ={
data.collect{ case x:A=> x }.filter(y => words.contains(y.keyword)).foreach(println)
}
}
// in main function
val data:RDD[A] = ....
val c = C(Set("abc"))
c.run(data)
上面的代码没有打印任何内容。但是,如果我将 RDD[A] 收集到本地,那么它会打印一些东西!例如
data.take(1000).collect{ case x:A=> x }.filter(y => words.contains(y.keyword)).foreach(println)}
怎么会这样?
让我问另一个相关问题:我应该让 case class C
扩展 Serializable
吗?我觉得没必要。
原因很简单。如果你在本地收集数据时 运行 println
函数,你的数据会通过网络传输到你正在使用的机器(我们称之为 Spark 环境的客户端),然后它打印在您的控制台上。到目前为止,一切都按预期进行。相反,如果您 运行 println
函数在 分布式 RDD
上,println
函数在工作机器上本地执行其中有你的数据。所以这个函数实际上被执行了,但是你不会在你的客户端的控制台上看到任何结果,除非它也是一个工作机器:事实上,一切都打印在各自工作节点的控制台上。
不,你没必要Serializable
,唯一连载的是你的words:Set[String]
。
我遇到了结果不正确的有线问题。
我有一个名为 A 的 class,它有一个名为 keyword
的值。
我想过滤 RDD[A] 如果它有一些 keyword
.
星火环境: 版本:1.3.1 执行环境:yarn-client
代码如下:
class A ...
case class C(words:Set[String] ) extends Serializable {
def run(data:RDD[A])(implicit sc:SparkContext) ={
data.collect{ case x:A=> x }.filter(y => words.contains(y.keyword)).foreach(println)
}
}
// in main function
val data:RDD[A] = ....
val c = C(Set("abc"))
c.run(data)
上面的代码没有打印任何内容。但是,如果我将 RDD[A] 收集到本地,那么它会打印一些东西!例如
data.take(1000).collect{ case x:A=> x }.filter(y => words.contains(y.keyword)).foreach(println)}
怎么会这样?
让我问另一个相关问题:我应该让 case class C
扩展 Serializable
吗?我觉得没必要。
原因很简单。如果你在本地收集数据时 运行 println
函数,你的数据会通过网络传输到你正在使用的机器(我们称之为 Spark 环境的客户端),然后它打印在您的控制台上。到目前为止,一切都按预期进行。相反,如果您 运行 println
函数在 分布式 RDD
上,println
函数在工作机器上本地执行其中有你的数据。所以这个函数实际上被执行了,但是你不会在你的客户端的控制台上看到任何结果,除非它也是一个工作机器:事实上,一切都打印在各自工作节点的控制台上。
不,你没必要Serializable
,唯一连载的是你的words:Set[String]
。