PySpark 中地图的聚合列表

Aggregate List of Map in PySpark

我有一个地图列表,例如

[{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20}  ,{'a' : 0,'b': 20} } 

我想得到 a 和 b 的平均值。所以预期的输出是

a = (10 + 5 + 0 + 0) /3 = 5 ;
b = 80/4 = 20.

我如何使用 RDD 高效地做到这一点

您可以使用 defaultdict 来收集与 list 相似的键及其值。 然后简单地使用值的总和除以每个值的 list 的元素数进行聚合。

from collections import defaultdict

x = [{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20}  ,{'a' : 0,'b': 20}]
y = defaultdict(lambda: [])
[y[k].append(v) for i in x for k,v in i.items() ]

for k,v in y.items():
    print k, "=" ,sum(v)/len(v)

>>> y
defaultdict(<function <lambda> at 0x02A43BB0>, {'a': [10, 5, 0], 'b': [20, 20, 20, 20]})
>>> 

>>> 
a = 5
b = 20

最简单的方法可能是 map 您的 rdd 元素格式如下:

init = {'a': {'sum': 0, 'cnt': 0}, 'b': {'sum': 0, 'cnt': 0}}

即记录每个键的总和和计数,然后减少它。

地图函数:

def map_fun(d, keys=['a', 'b']):
    map_d = {}
    for k in keys:
        if k in d:
            temp = {'sum': d[k], 'cnt': 1}
        else:
            temp = {'sum': 0, 'cnt': 0}
        map_d[k] = temp
    return map_d

减少函数:

def reduce_fun(a, b, keys=['a', 'b']):
    from collections import defaultdict
    reduce_d = defaultdict(dict)
    for k in keys:
        reduce_d[k]['sum'] = a[k]['sum'] + b[k]['sum']
        reduce_d[k]['cnt'] = a[k]['cnt'] + b[k]['cnt']
    return reduce_d

rdd.map(map_fun).reduce(reduce_fun)
# defaultdict(<type 'dict'>, {'a': {'sum': 15, 'cnt': 3}, 'b': {'sum': 80, 'cnt': 4}})

计算平均值:

d = rdd.map(map_fun).reduce(reduce_fun)
{k: v['sum']/v['cnt'] for k, v in d.items()}
{'a': 5, 'b': 20}

鉴于您的数据结构,您应该能够使用数据框 api 来实现此计算。如果您需要一个 rdd,那么从数据框返回到一个 rdd 并不难。

from pyspark.sql import functions as F
df = spark.createDataFrame([{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20}  ,{'a' : 0,'b': 20}])

数据框看起来像这样

+----+---+
|   a|  b|
+----+---+
|  10| 20|
|   5| 20|
|null| 20|
|   0| 20|
+----+---+

然后使用pyspark.sql函数简单计算平均值

cols = df.columns
df_means = df.agg(*[F.mean(F.col(col)).alias(col+"_mean") for col in cols])
df_means.show()

输出:

+------+------+
|a_mean|b_mean|
+------+------+
|   5.0|  20.0|
+------+------+