RDD 的 foreachPartition 方法内的意外行为

Unexpected behavior inside the foreachPartition method of a RDD

我通过 spark-shell 评估了以下几行 scala 代码:

val a = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
val b = a.coalesce(1)
b.foreachPartition { p => 
  p.map(_ + 1).foreach(println)
  p.map(_ * 2).foreach(println)
}

输出如下:

2
3
4
5
6
7
8
9
10
11

为什么分区 p 在第一个映射后变空了?

我觉得这并不奇怪,因为 pIterator,当你用 map 遍历它时,它没有更多的值,并考虑到 lengthsize 的快捷方式,它是这样实现的:

def size: Int = {
  var result = 0
  for (x <- self) result += 1
  result
}

你得到 0。

答案在 scala 文档中 http://www.scala-lang.org/api/2.11.8/#scala.collection.Iterator。它明确指出迭代器(p 是一个迭代器)在调用 map 方法后必须被丢弃。