Apache Spark CombineByKey 与 Python 中的元素列表
Apache Spark CombineByKey with list of elements in Python
我在 Python 中遇到关于 Apache Spark 的问题。我有这套
data = sc.parallelize([('a','u'), ('a', 'v'),
('b', 'w'), ('b', 'x'), ('b', 'x')] )
我想做的是按键计算元素的数量并创建一个包含元素的列表。如果我这样做
a = data.combineByKey(lambda value: (value, 1),
lambda x, value: (value, x[1] + 1),
lambda x, y: (x[0]+'/'+y[0], x[1] + y[1]))
我得到了这个结果:
[('a', ('u/v', 2)), ('b', ('w/x/x', 3))]
我想要的是
[('a', (['u','v'], 2)), ('b', (['w','x','x'], 3))]
我该怎么做?
如果您想将所有值保存为列表,则根本没有理由使用 combineByKey
。简单地 groupBy
:
效率更高
aggregated = data.groupByKey().mapValues(lambda vs: (list(vs), len(vs)))
aggregated.collect()
## [('a', (['u', 'v'], 2)), ('b', (['w', 'x', 'x'], 3))]
一种更有效的方法是保留计数而不是所有值:
aggregated_counts = (data
.map(lambda kv: (kv, 1))
.reduceByKey(add)
.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
.groupByKey()
.mapValues(lambda xs: (list(xs), sum(x[1] for x in xs))))
aggregated_counts.collect()
## [('a', ([('v', 1), ('u', 1)], 2)), ('b', ([('w', 1), ('x', 2)], 3))]
或
from collections import Counter
def merge_value(acc, x):
acc.update(x)
return acc
def merge_combiners(acc1, acc2):
acc1.update(acc2)
return acc1
aggregated_counts_ = (data
.combineByKey(Counter, merge_value, merge_combiners)
.mapValues(lambda cnt: (cnt, sum(cnt.values()))))
aggregated_counts_.collect()
## [('a', (Counter({'u': 1, 'v': 1}), 2)), ('b', (Counter({'w': 1, 'x': 2}), 3))]
我在 Python 中遇到关于 Apache Spark 的问题。我有这套
data = sc.parallelize([('a','u'), ('a', 'v'),
('b', 'w'), ('b', 'x'), ('b', 'x')] )
我想做的是按键计算元素的数量并创建一个包含元素的列表。如果我这样做
a = data.combineByKey(lambda value: (value, 1),
lambda x, value: (value, x[1] + 1),
lambda x, y: (x[0]+'/'+y[0], x[1] + y[1]))
我得到了这个结果:
[('a', ('u/v', 2)), ('b', ('w/x/x', 3))]
我想要的是
[('a', (['u','v'], 2)), ('b', (['w','x','x'], 3))]
我该怎么做?
如果您想将所有值保存为列表,则根本没有理由使用 combineByKey
。简单地 groupBy
:
aggregated = data.groupByKey().mapValues(lambda vs: (list(vs), len(vs)))
aggregated.collect()
## [('a', (['u', 'v'], 2)), ('b', (['w', 'x', 'x'], 3))]
一种更有效的方法是保留计数而不是所有值:
aggregated_counts = (data
.map(lambda kv: (kv, 1))
.reduceByKey(add)
.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
.groupByKey()
.mapValues(lambda xs: (list(xs), sum(x[1] for x in xs))))
aggregated_counts.collect()
## [('a', ([('v', 1), ('u', 1)], 2)), ('b', ([('w', 1), ('x', 2)], 3))]
或
from collections import Counter
def merge_value(acc, x):
acc.update(x)
return acc
def merge_combiners(acc1, acc2):
acc1.update(acc2)
return acc1
aggregated_counts_ = (data
.combineByKey(Counter, merge_value, merge_combiners)
.mapValues(lambda cnt: (cnt, sum(cnt.values()))))
aggregated_counts_.collect()
## [('a', (Counter({'u': 1, 'v': 1}), 2)), ('b', (Counter({'w': 1, 'x': 2}), 3))]