当他们有相同的成员时,不能用另一个压缩 RDD - PySpark

Can't Zip RDD with another one while they have equal members - PySpark

我在过滤项目后压缩两个 RDD 时遇到问题。这是我的代码:

x = sc.parallelize([1,2,3,4])
x = x.zipWithIndex()

# filtering the first element
m1 = x.filter(lambda z: z[1] > 0)
# filtering the last element
m2 = x.filter(lambda z: z[1] < 3)

#zipping
m1.zip(m2).collect()
# expected output: [(1,2),(2,3),(3,4)]

这是我得到的错误:

ValueError: Can not deserialize PairRDD with different number of items in batches: (1, 2)

虽然他们有相同的物品!

提前致谢!

我使用 1.zipWithIndex 2.cartesian 3.filter:

得到了想要的输出
x = sc.parallelize([1,2,3,4])

#1
x = x.zipWithIndex()

#2
x = x.cartesian(x)

#3
x = x.filter(lambda pair: pair[0][1]+1 == pair[1][1])

print(x.collect())
#output: [(1,2),(2,3),(3,4)]

但还是如有其他处理方法,请告知!

您可以通过在压缩的 rdd 中应用映射函数来获得预期的输出

m1.zip(m2).map(lambda x: (x[0][1], x[0][0])).collect()

[(1, 2), (2, 3), (3, 4)]