Apache Spark CombineByKey 与 Python 中的元素列表

Apache Spark CombineByKey with list of elements in Python

我在 Python 中遇到关于 Apache Spark 的问题。我有这套

data = sc.parallelize([('a','u'), ('a', 'v'),
    ('b', 'w'), ('b', 'x'), ('b', 'x')] )

我想做的是按键计算元素的数量并创建一个包含元素的列表。如果我这样做

a = data.combineByKey(lambda value: (value, 1),
                      lambda x, value: (value, x[1] + 1),
                      lambda x, y: (x[0]+'/'+y[0], x[1] + y[1]))

我得到了这个结果:

[('a', ('u/v', 2)), ('b', ('w/x/x', 3))]

我想要的是

[('a', (['u','v'], 2)), ('b', (['w','x','x'], 3))]

我该怎么做?

如果您想将所有值保存为列表,则根本没有理由使用 combineByKey。简单地 groupBy:

效率更高
aggregated = data.groupByKey().mapValues(lambda vs: (list(vs), len(vs)))
aggregated.collect()
## [('a', (['u', 'v'], 2)), ('b', (['w', 'x', 'x'], 3))]

一种更有效的方法是保留计数而不是所有值:

aggregated_counts = (data
    .map(lambda kv: (kv, 1))
    .reduceByKey(add)
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
    .groupByKey()
    .mapValues(lambda xs: (list(xs), sum(x[1] for x in xs))))

aggregated_counts.collect()
## [('a', ([('v', 1), ('u', 1)], 2)), ('b', ([('w', 1), ('x', 2)], 3))]

from collections import Counter

def merge_value(acc, x):
    acc.update(x)
    return acc

def merge_combiners(acc1, acc2):
    acc1.update(acc2)
    return acc1

aggregated_counts_ = (data
    .combineByKey(Counter, merge_value, merge_combiners)
    .mapValues(lambda cnt: (cnt, sum(cnt.values()))))

aggregated_counts_.collect()
## [('a', (Counter({'u': 1, 'v': 1}), 2)), ('b', (Counter({'w': 1, 'x': 2}), 3))]