为什么在 RDD 中,map 会给出 NotSerializableException 而 foreach 不会?

Why in an RDD, map gives NotSerializableException while foreach doesn't?

我理解 mapforeach 之间的基本区别(懒惰和急切),也理解为什么这个代码片段

sc.makeRDD(Seq("a", "b")).map(s => new java.io.ByteArrayInputStream(s.getBytes)).collect

应该给

java.io.NotSerializableException: java.io.ByteArrayInputStream

然后我认为下面的代码片段也应该如此

sc.makeRDD(Seq("a", "b")).foreach(s => {
  val is = new java.io.ByteArrayInputStream(s.getBytes)
  println("is = " + is)
})

但是这段代码运行良好。为什么会这样?

实际上 mapforeach 之间的根本区别不是评估策略。让我们看一下签名(为简洁起见,我省略了 map 的隐式部分):

def map[U](f: (T) ⇒ U): RDD[U]
def foreach(f: (T) ⇒ Unit): Unit 

map 接受从 TU 的函数,将其应用于现有 RDD[T] 和 returns RDD[U] 的每个元素。为了允许像洗牌这样的操作 U 必须是可序列化的。

foreach 接受一个从 TUnit 的函数(这类似于 Java void)并且它本身 returns 没有.一切都发生在本地,不涉及网络流量,因此不需要序列化。与 map 不同,当想要获得某种副作用时应该使用 foreach,例如 your previous question.

顺便说一句,这两者实际上是不同的。您在 map 中使用的匿名函数是一个函数:

(s: String) => java.io.ByteArrayInputStream

还有你在 foreach 中使用的一个像这样:

(s: String) => Unit

如果您将第二个函数与 map 一起使用,您的代码将会编译,尽管结果与您想要的相去甚远 (RDD[Unit])。

collect 在地图之后调用导致了问题。 以下是我在 spark-shell.

中的测试结果

下面通过,因为没有数据必须发送到其他节点。

sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).count

以下调用失败,因为地图输出可以发送到其他节点。

sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).first
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).collect

重新分区强制将数据分发到节点,但失败了。

sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).repartition(2).saveAsTextFile("/tmp/NWRepart")

呼叫通过后无需重新分区。

sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).saveAsTextFile("/tmp/NW")