火花中的迭代过滤器似乎不起作用

Iterative filter in spark doesn't seem to work

我试图逐一删除 RDD 的元素,但这不起作用,因为元素重新出现了。

这是我的部分代码:

rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
    rdd=rdd.filter(lambda x:x!=i)
print(rdd.collect())
[0, 1, 2, 3]

所以似乎只有最后一个过滤器是“记住”。我在想这个循环之后,rdd会是空的。

但是,我不明白为什么,每次我把filter得到的新rdd保存在“rdd”中,那么它不应该保留所有的转换吗?如果没有,我该怎么办?

感谢您指出我错误的地方!

结果是正确的——这不是Spark的错误。请注意,lambda 函数定义为 x != i,并且 i 未代入 lambda 函数。所以在 for 循环的每次迭代中,RDD 看起来像

rdd
rdd.filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i).filter(lambda x: x != i)

等等

由于过滤器都是相同的,并且它们将被替换为i的最新值,因此每次for循环迭代仅过滤掉一项。

为避免这种情况,您可以使用部分函数来确保 i 代入函数:

from functools import partial
 
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
    rdd = rdd.filter(partial(lambda x, i: x != i, i))

print(rdd.collect())

或者您可以使用 reduce:

from functools import reduce

rdd = spark.sparkContext.parallelize([0,1,2])
rdd = reduce(lambda r, i: r.filter(lambda x: x != i), range(3), rdd)
print(rdd.collect())