Pyspark - RDD 提取值以聚合
Pyspark - RDD extract values to aggregate
使用 Pyspark,我正在尝试使用 RDD 以根据该 RDD 的内容进行聚合。
我的 RDD 目前看起来像(显然有更多数据):
[([u'User1', u'2'], 1), ([u'User2', u'2'], 1), ([u'User1', u'3'], 1)]
我想将其汇总为以下格式:
User1 5
User2 2
我正在努力与 RDD 交互,尤其是 RDD 中的列表以获取此数据。我还希望将其保留为 RDD,而不是将其转换为数据框。
任何人都可以告诉我如何做到这一点吗?
您可以 map
将 RDD 转换为 (user, value)
格式,然后 reduceByKey
按用户分组并对值求和。
result = rdd.map(lambda x: (x[0][0], int(x[0][1]))).reduceByKey(lambda x, y: x + y)
result.collect()
# [('User2', 2), ('User1', 5)]
另一种解决方案,与@mck 非常相似,但稍微更具可读性的是使用运算符 add
而不是另一个 lambda 函数:
from operator import add
rdd = sc.parallelize([("user1", "2"), ("user2", "2"), ("user1", "3")])
rdd = rdd.map(lambda x: (x[0], int(x[1])))
rdd = rdd.reduceByKey(add)
"""
>>> rdd.collect()
>>> Out[54]: [('user2', 2), ('user1', 5)]
"""
使用 Pyspark,我正在尝试使用 RDD 以根据该 RDD 的内容进行聚合。
我的 RDD 目前看起来像(显然有更多数据):
[([u'User1', u'2'], 1), ([u'User2', u'2'], 1), ([u'User1', u'3'], 1)]
我想将其汇总为以下格式:
User1 5
User2 2
我正在努力与 RDD 交互,尤其是 RDD 中的列表以获取此数据。我还希望将其保留为 RDD,而不是将其转换为数据框。
任何人都可以告诉我如何做到这一点吗?
您可以 map
将 RDD 转换为 (user, value)
格式,然后 reduceByKey
按用户分组并对值求和。
result = rdd.map(lambda x: (x[0][0], int(x[0][1]))).reduceByKey(lambda x, y: x + y)
result.collect()
# [('User2', 2), ('User1', 5)]
另一种解决方案,与@mck 非常相似,但稍微更具可读性的是使用运算符 add
而不是另一个 lambda 函数:
from operator import add
rdd = sc.parallelize([("user1", "2"), ("user2", "2"), ("user1", "3")])
rdd = rdd.map(lambda x: (x[0], int(x[1])))
rdd = rdd.reduceByKey(add)
"""
>>> rdd.collect()
>>> Out[54]: [('user2', 2), ('user1', 5)]
"""