无法反序列化具有不同数量项目的 RDD

Cannot deserialize RDD with different number of items in pair

我有两个具有键值对的 RDD。我想按键加入它们(并根据按键,获取所有值的笛卡尔积),我认为这可以通过 pyspark 的 zip() 函数来完成。但是,当我应用它时,

elemPairs = elems1.zip(elems2).reduceByKey(add)

它给我错误:

Cannot deserialize RDD with different number of items in pair: (40, 10)

这是我尝试压缩的 2 个 RDD:

elems1 => [((0, 0), ('A', 0, 90)), ((0, 1), ('A', 0, 90)), ((0, 2), ('A', 0, 90)), ((0, 3), ('A', 0, 90)), ((0, 4), ('A', 0, 90)), ((0, 0), ('A', 1, 401)), ((0, 1), ('A', 1, 401)), ((0, 2), ('A', 1, 401)), ((0, 3), ('A', 1, 401)), ((0, 4), ('A', 1, 401)), ((1, 0), ('A', 0, 305)), ((1, 1), ('A', 0, 305)), ((1, 2), ('A', 0, 305)), ((1, 3), ('A', 0, 305)), ((1, 4), ('A', 0, 305)), ((1, 0), ('A', 1, 351)), ((1, 1), ('A', 1, 351)), ((1, 2), ('A', 1, 351)), ((1, 3), ('A', 1, 351)), ((1, 4), ('A', 1, 351)), ((2, 0), ('A', 0, 178)), ((2, 1), ('A', 0, 178)), ((2, 2), ('A', 0, 178)), ((2, 3), ('A', 0, 178)), ((2, 4), ('A', 0, 178)), ((2, 0), ('A', 1, 692)), ((2, 1), ('A', 1, 692)), ((2, 2), ('A', 1, 692)), ((2, 3), ('A', 1, 692)), ((2, 4), ('A', 1, 692)), ((3, 0), ('A', 0, 936)), ((3, 1), ('A', 0, 936)), ((3, 2), ('A', 0, 936)), ((3, 3), ('A', 0, 936)), ((3, 4), ('A', 0, 936)), ((3, 0), ('A', 1, 149)), ((3, 1), ('A', 1, 149)), ((3, 2), ('A', 1, 149)), ((3, 3), ('A', 1, 149)), ((3, 4), ('A', 1, 149))]

elems2 => [((0, 0), ('B', 0, 573)), ((1, 0), ('B', 0, 573)), ((2, 0), ('B', 0, 573)), ((3, 0), ('B', 0, 573)), ((4, 0), ('B', 0, 573)), ((0, 0), ('B', 1, 324)), ((1, 0), ('B', 1, 324)), ((2, 0), ('B', 1, 324)), ((3, 0), ('B', 1, 324)), ((4, 0), ('B', 1, 324))]

其中 ((0, 0), ('B', 0, 573)), (0, 0) 是键,('B', 0, 573) 是值。

经过快速google搜索,我发现这是一个只出现在spark 1.2中的问题,但是我使用的是Spark 1.5

为什么不直接使用 elems1.join(elems2)

RDD API.

中描述了该错误消息的原因

Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).

正如@alwaysprep 所说,您可以使用 join,因为 zip 做的事情完全不同:

val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
a.zip(b).collect
res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104), (5,105), 
(6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112), (13,113), 
(14,114), (15,115), (16,116), (17,117), (18,118), (19,119), (20,120), (21,121), 
(22,122), (23,123), (24,124), (25,125), (26,126), (27,127), (28,128), (29,129), 
(30,130), (31,131), (32,132), (33,133), (34,134), (35,135), (36,136), (37,137), 
(38,138), (39,139), (40,140), (41,141), (42,142), (43,143), (44,144), (45,145), 
(46,146), (47,147), (48,148), (49,149), (50,150), (51,151), (52,152), (53,153), 
(54,154), (55,155), (56,156), (57,157), (58,158), (59,159), (60,160), (61,161), 
(62,162), (63,163), (64,164), (65,165), (66,166), (67,167), (68,168), (69,169), 
(70,170), (71,171), (72,172), (73,173), (74,174), (75,175), (76,176), (77,177), 
(78,...

如您所见,zip 将数组 a 的第 n 个元素关联到数组 b 的第 n 个元素,因此数组必须具有相同的大小。

在您的例子中,数组 elem1 包含的元素多于 elem2 - 也许您可以查看 rightOuterJoin(或 leftOuterJoin)。这是因为 .join 将跳过两个 RDD 中没有键的元素。例如,我看到 (4,0) 仅出现在 elem2 中。如果你 join 它们,那么它将被跳过,因为它不会在 elem1 数组中找到。此外,如果您真的想要笛卡尔积,还有方法 .cartesian