PySpark 中地图的聚合列表
Aggregate List of Map in PySpark
我有一个地图列表,例如
[{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20} ,{'a' : 0,'b': 20} }
我想得到 a 和 b 的平均值。所以预期的输出是
a = (10 + 5 + 0 + 0) /3 = 5 ;
b = 80/4 = 20.
我如何使用 RDD 高效地做到这一点
您可以使用 defaultdict
来收集与 list
相似的键及其值。
然后简单地使用值的总和除以每个值的 list
的元素数进行聚合。
from collections import defaultdict
x = [{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20} ,{'a' : 0,'b': 20}]
y = defaultdict(lambda: [])
[y[k].append(v) for i in x for k,v in i.items() ]
for k,v in y.items():
print k, "=" ,sum(v)/len(v)
>>> y
defaultdict(<function <lambda> at 0x02A43BB0>, {'a': [10, 5, 0], 'b': [20, 20, 20, 20]})
>>>
>>>
a = 5
b = 20
最简单的方法可能是 map
您的 rdd 元素格式如下:
init = {'a': {'sum': 0, 'cnt': 0}, 'b': {'sum': 0, 'cnt': 0}}
即记录每个键的总和和计数,然后减少它。
地图函数:
def map_fun(d, keys=['a', 'b']):
map_d = {}
for k in keys:
if k in d:
temp = {'sum': d[k], 'cnt': 1}
else:
temp = {'sum': 0, 'cnt': 0}
map_d[k] = temp
return map_d
减少函数:
def reduce_fun(a, b, keys=['a', 'b']):
from collections import defaultdict
reduce_d = defaultdict(dict)
for k in keys:
reduce_d[k]['sum'] = a[k]['sum'] + b[k]['sum']
reduce_d[k]['cnt'] = a[k]['cnt'] + b[k]['cnt']
return reduce_d
rdd.map(map_fun).reduce(reduce_fun)
# defaultdict(<type 'dict'>, {'a': {'sum': 15, 'cnt': 3}, 'b': {'sum': 80, 'cnt': 4}})
计算平均值:
d = rdd.map(map_fun).reduce(reduce_fun)
{k: v['sum']/v['cnt'] for k, v in d.items()}
{'a': 5, 'b': 20}
鉴于您的数据结构,您应该能够使用数据框 api 来实现此计算。如果您需要一个 rdd,那么从数据框返回到一个 rdd 并不难。
from pyspark.sql import functions as F
df = spark.createDataFrame([{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20} ,{'a' : 0,'b': 20}])
数据框看起来像这样
+----+---+
| a| b|
+----+---+
| 10| 20|
| 5| 20|
|null| 20|
| 0| 20|
+----+---+
然后使用pyspark.sql函数简单计算平均值
cols = df.columns
df_means = df.agg(*[F.mean(F.col(col)).alias(col+"_mean") for col in cols])
df_means.show()
输出:
+------+------+
|a_mean|b_mean|
+------+------+
| 5.0| 20.0|
+------+------+
我有一个地图列表,例如
[{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20} ,{'a' : 0,'b': 20} }
我想得到 a 和 b 的平均值。所以预期的输出是
a = (10 + 5 + 0 + 0) /3 = 5 ;
b = 80/4 = 20.
我如何使用 RDD 高效地做到这一点
您可以使用 defaultdict
来收集与 list
相似的键及其值。
然后简单地使用值的总和除以每个值的 list
的元素数进行聚合。
from collections import defaultdict
x = [{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20} ,{'a' : 0,'b': 20}]
y = defaultdict(lambda: [])
[y[k].append(v) for i in x for k,v in i.items() ]
for k,v in y.items():
print k, "=" ,sum(v)/len(v)
>>> y
defaultdict(<function <lambda> at 0x02A43BB0>, {'a': [10, 5, 0], 'b': [20, 20, 20, 20]})
>>>
>>>
a = 5
b = 20
最简单的方法可能是 map
您的 rdd 元素格式如下:
init = {'a': {'sum': 0, 'cnt': 0}, 'b': {'sum': 0, 'cnt': 0}}
即记录每个键的总和和计数,然后减少它。
地图函数:
def map_fun(d, keys=['a', 'b']):
map_d = {}
for k in keys:
if k in d:
temp = {'sum': d[k], 'cnt': 1}
else:
temp = {'sum': 0, 'cnt': 0}
map_d[k] = temp
return map_d
减少函数:
def reduce_fun(a, b, keys=['a', 'b']):
from collections import defaultdict
reduce_d = defaultdict(dict)
for k in keys:
reduce_d[k]['sum'] = a[k]['sum'] + b[k]['sum']
reduce_d[k]['cnt'] = a[k]['cnt'] + b[k]['cnt']
return reduce_d
rdd.map(map_fun).reduce(reduce_fun)
# defaultdict(<type 'dict'>, {'a': {'sum': 15, 'cnt': 3}, 'b': {'sum': 80, 'cnt': 4}})
计算平均值:
d = rdd.map(map_fun).reduce(reduce_fun)
{k: v['sum']/v['cnt'] for k, v in d.items()}
{'a': 5, 'b': 20}
鉴于您的数据结构,您应该能够使用数据框 api 来实现此计算。如果您需要一个 rdd,那么从数据框返回到一个 rdd 并不难。
from pyspark.sql import functions as F
df = spark.createDataFrame([{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20} ,{'a' : 0,'b': 20}])
数据框看起来像这样
+----+---+
| a| b|
+----+---+
| 10| 20|
| 5| 20|
|null| 20|
| 0| 20|
+----+---+
然后使用pyspark.sql函数简单计算平均值
cols = df.columns
df_means = df.agg(*[F.mean(F.col(col)).alias(col+"_mean") for col in cols])
df_means.show()
输出:
+------+------+
|a_mean|b_mean|
+------+------+
| 5.0| 20.0|
+------+------+