使用共享可变状态向 RDD 添加索引

Adding an index to an RDD using a shared mutable state

以这个简单的RDD为例说明问题:

val testRDD=sc.parallelize(List((1, 2), (3, 4), (3, 6)))

我有这个功能来帮助我实现索引:

 var sum = 0; 

 def inc(l: Int): Int = {
    sum += l
    sum 
 }

现在我想为每个元组创建 id:

val indexedRDD= testRDD.map(x=>(x._1,x._2,inc(1)));

输出的RDD应该是((1,2,1), (3,4,2), (3,6,3))

但事实证明,所有的值都是一样的。所有元组都取 1:

((1,2,1), (3,4,1), (3,6,1))

我哪里错了?有没有其他方法可以达到同样的目的。

您正在寻找:

def zipWithIndex(): RDD[(T, Long)]

但是,文档中的注释:

Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.