为什么在 RDD 中,map 会给出 NotSerializableException 而 foreach 不会?
Why in an RDD, map gives NotSerializableException while foreach doesn't?
我理解 map
和 foreach
之间的基本区别(懒惰和急切),也理解为什么这个代码片段
sc.makeRDD(Seq("a", "b")).map(s => new java.io.ByteArrayInputStream(s.getBytes)).collect
应该给
java.io.NotSerializableException: java.io.ByteArrayInputStream
然后我认为下面的代码片段也应该如此
sc.makeRDD(Seq("a", "b")).foreach(s => {
val is = new java.io.ByteArrayInputStream(s.getBytes)
println("is = " + is)
})
但是这段代码运行良好。为什么会这样?
实际上 map
和 foreach
之间的根本区别不是评估策略。让我们看一下签名(为简洁起见,我省略了 map
的隐式部分):
def map[U](f: (T) ⇒ U): RDD[U]
def foreach(f: (T) ⇒ Unit): Unit
map
接受从 T
到 U
的函数,将其应用于现有 RDD[T]
和 returns RDD[U]
的每个元素。为了允许像洗牌这样的操作 U
必须是可序列化的。
foreach
接受一个从 T
到 Unit
的函数(这类似于 Java void
)并且它本身 returns 没有.一切都发生在本地,不涉及网络流量,因此不需要序列化。与 map
不同,当想要获得某种副作用时应该使用 foreach
,例如 your previous question.
顺便说一句,这两者实际上是不同的。您在 map
中使用的匿名函数是一个函数:
(s: String) => java.io.ByteArrayInputStream
还有你在 foreach
中使用的一个像这样:
(s: String) => Unit
如果您将第二个函数与 map
一起使用,您的代码将会编译,尽管结果与您想要的相去甚远 (RDD[Unit]
)。
collect
在地图之后调用导致了问题。
以下是我在 spark-shell.
中的测试结果
下面通过,因为没有数据必须发送到其他节点。
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).count
以下调用失败,因为地图输出可以发送到其他节点。
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).first
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).collect
重新分区强制将数据分发到节点,但失败了。
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).repartition(2).saveAsTextFile("/tmp/NWRepart")
呼叫通过后无需重新分区。
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).saveAsTextFile("/tmp/NW")
我理解 map
和 foreach
之间的基本区别(懒惰和急切),也理解为什么这个代码片段
sc.makeRDD(Seq("a", "b")).map(s => new java.io.ByteArrayInputStream(s.getBytes)).collect
应该给
java.io.NotSerializableException: java.io.ByteArrayInputStream
然后我认为下面的代码片段也应该如此
sc.makeRDD(Seq("a", "b")).foreach(s => {
val is = new java.io.ByteArrayInputStream(s.getBytes)
println("is = " + is)
})
但是这段代码运行良好。为什么会这样?
实际上 map
和 foreach
之间的根本区别不是评估策略。让我们看一下签名(为简洁起见,我省略了 map
的隐式部分):
def map[U](f: (T) ⇒ U): RDD[U]
def foreach(f: (T) ⇒ Unit): Unit
map
接受从 T
到 U
的函数,将其应用于现有 RDD[T]
和 returns RDD[U]
的每个元素。为了允许像洗牌这样的操作 U
必须是可序列化的。
foreach
接受一个从 T
到 Unit
的函数(这类似于 Java void
)并且它本身 returns 没有.一切都发生在本地,不涉及网络流量,因此不需要序列化。与 map
不同,当想要获得某种副作用时应该使用 foreach
,例如 your previous question.
顺便说一句,这两者实际上是不同的。您在 map
中使用的匿名函数是一个函数:
(s: String) => java.io.ByteArrayInputStream
还有你在 foreach
中使用的一个像这样:
(s: String) => Unit
如果您将第二个函数与 map
一起使用,您的代码将会编译,尽管结果与您想要的相去甚远 (RDD[Unit]
)。
collect
在地图之后调用导致了问题。
以下是我在 spark-shell.
下面通过,因为没有数据必须发送到其他节点。
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).count
以下调用失败,因为地图输出可以发送到其他节点。
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).first
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).collect
重新分区强制将数据分发到节点,但失败了。
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).repartition(2).saveAsTextFile("/tmp/NWRepart")
呼叫通过后无需重新分区。
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).saveAsTextFile("/tmp/NW")