火花中的迭代过滤器似乎不起作用
Iterative filter in spark doesn't seem to work
我试图逐一删除 RDD 的元素,但这不起作用,因为元素重新出现了。
这是我的部分代码:
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
rdd=rdd.filter(lambda x:x!=i)
print(rdd.collect())
[0, 1, 2, 3]
所以似乎只有最后一个过滤器是“记住”。我在想这个循环之后,rdd会是空的。
但是,我不明白为什么,每次我把filter得到的新rdd保存在“rdd”中,那么它不应该保留所有的转换吗?如果没有,我该怎么办?
感谢您指出我错误的地方!
结果是正确的——这不是Spark的错误。请注意,lambda 函数定义为 x != i
,并且 i
未代入 lambda 函数。所以在 for 循环的每次迭代中,RDD 看起来像
rdd
rdd.filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i).filter(lambda x: x != i)
等等
由于过滤器都是相同的,并且它们将被替换为i
的最新值,因此每次for循环迭代仅过滤掉一项。
为避免这种情况,您可以使用部分函数来确保 i
代入函数:
from functools import partial
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
rdd = rdd.filter(partial(lambda x, i: x != i, i))
print(rdd.collect())
或者您可以使用 reduce
:
from functools import reduce
rdd = spark.sparkContext.parallelize([0,1,2])
rdd = reduce(lambda r, i: r.filter(lambda x: x != i), range(3), rdd)
print(rdd.collect())
我试图逐一删除 RDD 的元素,但这不起作用,因为元素重新出现了。
这是我的部分代码:
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
rdd=rdd.filter(lambda x:x!=i)
print(rdd.collect())
[0, 1, 2, 3]
所以似乎只有最后一个过滤器是“记住”。我在想这个循环之后,rdd会是空的。
但是,我不明白为什么,每次我把filter得到的新rdd保存在“rdd”中,那么它不应该保留所有的转换吗?如果没有,我该怎么办?
感谢您指出我错误的地方!
结果是正确的——这不是Spark的错误。请注意,lambda 函数定义为 x != i
,并且 i
未代入 lambda 函数。所以在 for 循环的每次迭代中,RDD 看起来像
rdd
rdd.filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i).filter(lambda x: x != i)
等等
由于过滤器都是相同的,并且它们将被替换为i
的最新值,因此每次for循环迭代仅过滤掉一项。
为避免这种情况,您可以使用部分函数来确保 i
代入函数:
from functools import partial
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
rdd = rdd.filter(partial(lambda x, i: x != i, i))
print(rdd.collect())
或者您可以使用 reduce
:
from functools import reduce
rdd = spark.sparkContext.parallelize([0,1,2])
rdd = reduce(lambda r, i: r.filter(lambda x: x != i), range(3), rdd)
print(rdd.collect())