mapPartitions returns 空数组

mapPartitions returns empty array

我有以下具有 4 个分区的 RDD:-

val rdd=sc.parallelize(1 to 20,4)

现在我尝试调用 mapPartitions:-

scala> rdd.mapPartitions(x=> { println(x.size); x }).collect
5
5
5
5
res98: Array[Int] = Array()

为什么它 return 是空数组? anonymoys 函数只是 returning 它收到的同一个迭代器,那么它是如何 returning 空数组的呢?有趣的是,如果我删除 println 语句,它确实是 returns 非空数组:-

scala> rdd.mapPartitions(x=> { x }).collect
res101: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

这个我不明白。为什么 println(它只是打印迭代器的大小)的存在会影响函数的最终结果?

那是因为x是一个TraversableOnce,也就是说你通过调用size遍历了它,然后返回了....空。

您可以通过多种方式解决这个问题,但这里有一个:

rdd.mapPartitions(x=> {
  val list = x.toList;
  println(list.size);
  list.toIterator
}).collect

要了解发生了什么,我们必须看一下您传递给 mapPartitions 的函数的签名:

(Iterator[T]) ⇒ Iterator[U]

那么什么是 Iterator?如果你看一下 Iterator documentation 你会发现它是一个扩展 TraversableOnce:

的特征
trait Iterator[+A] extends TraversableOnce[A]

以上内容应该可以提示您的情况。迭代器提供了两种方法 hasNextnext。要获得 Iterator 的 size,您必须简单地迭代它。在那之后 hasNext returns false 结果是空的 Iterator