mapPartitions returns 空数组
mapPartitions returns empty array
我有以下具有 4 个分区的 RDD:-
val rdd=sc.parallelize(1 to 20,4)
现在我尝试调用 mapPartitions:-
scala> rdd.mapPartitions(x=> { println(x.size); x }).collect
5
5
5
5
res98: Array[Int] = Array()
为什么它 return 是空数组? anonymoys 函数只是 returning 它收到的同一个迭代器,那么它是如何 returning 空数组的呢?有趣的是,如果我删除 println 语句,它确实是 returns 非空数组:-
scala> rdd.mapPartitions(x=> { x }).collect
res101: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
这个我不明白。为什么 println(它只是打印迭代器的大小)的存在会影响函数的最终结果?
那是因为x
是一个TraversableOnce
,也就是说你通过调用size
遍历了它,然后返回了....空。
您可以通过多种方式解决这个问题,但这里有一个:
rdd.mapPartitions(x=> {
val list = x.toList;
println(list.size);
list.toIterator
}).collect
要了解发生了什么,我们必须看一下您传递给 mapPartitions
的函数的签名:
(Iterator[T]) ⇒ Iterator[U]
那么什么是 Iterator
?如果你看一下 Iterator
documentation 你会发现它是一个扩展 TraversableOnce
:
的特征
trait Iterator[+A] extends TraversableOnce[A]
以上内容应该可以提示您的情况。迭代器提供了两种方法 hasNext
和 next
。要获得 Iterator 的 size
,您必须简单地迭代它。在那之后 hasNext
returns false
结果是空的 Iterator
。
我有以下具有 4 个分区的 RDD:-
val rdd=sc.parallelize(1 to 20,4)
现在我尝试调用 mapPartitions:-
scala> rdd.mapPartitions(x=> { println(x.size); x }).collect
5
5
5
5
res98: Array[Int] = Array()
为什么它 return 是空数组? anonymoys 函数只是 returning 它收到的同一个迭代器,那么它是如何 returning 空数组的呢?有趣的是,如果我删除 println 语句,它确实是 returns 非空数组:-
scala> rdd.mapPartitions(x=> { x }).collect
res101: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
这个我不明白。为什么 println(它只是打印迭代器的大小)的存在会影响函数的最终结果?
那是因为x
是一个TraversableOnce
,也就是说你通过调用size
遍历了它,然后返回了....空。
您可以通过多种方式解决这个问题,但这里有一个:
rdd.mapPartitions(x=> {
val list = x.toList;
println(list.size);
list.toIterator
}).collect
要了解发生了什么,我们必须看一下您传递给 mapPartitions
的函数的签名:
(Iterator[T]) ⇒ Iterator[U]
那么什么是 Iterator
?如果你看一下 Iterator
documentation 你会发现它是一个扩展 TraversableOnce
:
trait Iterator[+A] extends TraversableOnce[A]
以上内容应该可以提示您的情况。迭代器提供了两种方法 hasNext
和 next
。要获得 Iterator 的 size
,您必须简单地迭代它。在那之后 hasNext
returns false
结果是空的 Iterator
。