派斯帕克;在列表值上使用 ReduceByKey

Pyspark; Using ReduceByKey on list values

我正在努力更好地理解 reduceByKey 函数,并且一直在探索使用它来完成不同任务的方法。我想应用如下所示的 RDD 数据。一行数据的格式是一个带有名称的元组,然后是与该名称关联的所有日期的列表(下面是数据外观的副本)

data = [("Cassavetes, Frank", ['2012', '2002', '2009', '2005']),
("Knight, Shirley (I)", ['1997', '2002', '2009']),
("Yip, Françoise", ['2007', '2004', '2000']),
("Danner, Blythe", ['2000', '2008', '2012', '2010', '2004', '2004', '1999', '1998']),
("Buck (X)", ['2002', '2006', '2009'])]

为了获取与元组中每个名称关联的所有日期的计数,我应用了下面的代码,使用 reduceByKey 函数尝试将日期列表转换为日期数的总和在列表中。

rdd = spark.sparkContext.parallelize(data)
reducedRdd = rdd.reduceByKey( lambda a,b: len(a.split(" ")) + len(b.split(" ")) )
reducedRdd.take(1)

上面的代码产生与输入数据相同的结果,并且不执行 reduce 函数中列出的任何转换,下面是代码输出的示例:

[('Yip, Françoise', ['2007', '2004', '2000'])]

我期望的输出如下;

[("Yip, Françoise", 3)]

为什么我上面写的代码没有给我预期的输出,我应该如何修改它以确保它可以?

您正在寻找 map,而不是 reduceByKey。没有什么可以减少的,因为你的数据已经按键分组了,所以你的 RDD 什么都不做,你就得到了原来的 RDD。

rdd2 = rdd.map(lambda x: (x[0], len(x[1])))

print(rdd2.collect())
# [('Cassavetes, Frank', 4), ('Knight, Shirley (I)', 3), ('Yip, Françoise', 3), ('Danner, Blythe', 8), ('Buck (X)', 3)]

mapValues 可能更合适:

rdd2 = rdd.mapValues(len)

print(rdd2.collect())
# [('Cassavetes, Frank', 4), ('Knight, Shirley (I)', 3), ('Yip, Françoise', 3), ('Danner, Blythe', 8), ('Buck (X)', 3)]

如果您想使用 reduceByKey,您的数据应该取消分组。例如如果你有

data = [('Cassavetes, Frank', '2012'), ('Cassavetes, Frank', '2002'), ('Cassavetes, Frank', '2009'), ('Cassavetes, Frank', '2005'), ('Knight, Shirley (I)', '1997'), ('Knight, Shirley (I)', '2002'), ('Knight, Shirley (I)', '2009'), ('Yip, Françoise', '2007'), ('Yip, Françoise', '2004'), ('Yip, Françoise', '2000'), ('Danner, Blythe', '2000'), ('Danner, Blythe', '2008'), ('Danner, Blythe', '2012'), ('Danner, Blythe', '2010'), ('Danner, Blythe', '2004'), ('Danner, Blythe', '2004'), ('Danner, Blythe', '1999'), ('Danner, Blythe', '1998'), ('Buck (X)', '2002'), ('Buck (X)', '2006'), ('Buck (X)', '2009')]

那你就可以了

rdd = sc.parallelize(data)

from operator import add
rdd2 = rdd.map(lambda x: (x[0], 1)).reduceByKey(add)

rdd2.collect()
# [('Yip, Françoise', 3), ('Cassavetes, Frank', 4), ('Knight, Shirley (I)', 3), ('Danner, Blythe', 8), ('Buck (X)', 3)]