使用 reduceByKey 函数求和文本长度时出现 Pyspark TypeError

Pyspark TypeError when using reduceByKey function to sum text length

我想知道为什么在尝试使用 reduceByKey 函数计算下面数据中每个给定名称(键)的每个列表中所有字符的总长度时出现类型错误。

data = [("Cassavetes, Frank", 'Orange'),
("Cassavetes, Frank", 'Pineapple'),
("Knight, Shirley (I)", 'Apple'),
("Knight, Shirley (I)", 'Blueberries'),
("Knight, Shirley (I)", 'Orange'),
("Yip, Françoise", 'Grapes'),
("Yip, Françoise", 'Apple'),
("Yip, Françoise", 'Strawberries'),
("Danner, Blythe", 'Pear'),
("Buck (X)", 'Kiwi')]

为了尝试执行此操作,我尝试执行以下代码;

rdd = spark.sparkContext.parallelize(data)
reducedRdd = rdd.reduceByKey( lambda a,b: len(a) + len(b) )
reducedRdd.collect()

上面的代码产生了以下错误:

TypeError: object of type 'int' has no len()

我期望的输出如下;

[('Yip, Françoise', 14), ('Cassavetes, Frank', 15), ('Knight, Shirley (I)', 8), ('Danner, Blythe', 'Pear'), ('Buck (X)', 'Kiwi')]

我注意到下面的代码产生了预期的结果;

reducedRdd = rdd.reduceByKey( lambda a,b: len(str(a)) + len(str(b)) )

虽然我不确定为什么我需要将变量 a 和 b 转换为字符串,如果它们最初是字符串开头,例如我不确定 'Orange' 在 ("Cassavetes, Frank ", 'Orange') 可以认为是一个整数。

PS 我知道我可以使用许多其他函数来实现所需的结果,但我特别想知道为什么我在尝试使用 reduceByKey 函数时遇到问题。

您的代码中的问题是您传递给 reduceByKey 的 reduce 函数不会生成与 RDD 值相同的数据类型。 lambda 函数 returns 和 int 而您的值是 string.

类型

要理解这一点,只需考虑 reduce 的工作原理。该函数应用于前 2 个值,然后将函数的结果添加到第三个值,依此类推...

请注意,即使是适合您的那个实际上也不正确。例如,它 returns ('Danner, Blythe', 'Pear') 而不是 ('Danner, Blythe', 4).

你应该先将值转换成相应的长度,然后按键减少:

reducedRdd = rdd.mapValues(lambda x: len(x)).reduceByKey(lambda a, b: a + b)
print(reducedRdd.collect())
# [('Cassavetes, Frank', 15), ('Danner, Blythe', 4), ('Buck (X)', 4), ('Knight, Shirley (I)', 22), ('Yip, Françoise', 23)]