pyspark: groupby and then get max value of each group
I want to group by one value and then find the max value in each group using PySpark. I have the following code, but now I am a bit confused about how to extract the max value.
# some file contains tuples ('user', 'item', 'occurrences')
data_file = sc.textFile('file:///some_file.txt')
# Create the triplet so I can index stuff
data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2])))
# Group by the user i.e. r[0]
grouped = data_file.groupBy(lambda r: r[0])
# Here is where I am stuck
group_list = grouped.map(lambda x: (list(x[1]))) #?
Returns something like:
[[(u'u1', u's1', 20), (u'u1', u's2', 5)], [(u'u2', u's3', 5), (u'u2', u's2', 10)]]
I now want to find the max 'occurrence' for each user. The final result after taking the max would be an RDD that looks like this:
[[(u'u1', u's1', 20)], [(u'u2', u's2', 10)]]
Only the triplet with the highest occurrence count is kept for each user in the file. In other words, I want to change the value of the RDD so that it contains only a single triplet per user, the one with that user's maximum occurrence count.
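To make the target concrete: within each group, only the triplet with the largest third element (the occurrence count) survives. A minimal sketch of that selection on a single group, using plain Python and the example data shown above:
# Pick the triplet with the largest occurrence from one group.
group = [(u'u1', u's1', 20), (u'u1', u's2', 5)]
best = max(group, key=lambda t: t[2])  # (u'u1', u's1', 20)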
I think I found a solution:
from pyspark import SparkContext, SparkConf
def reduce_by_max(rdd):
    """
    Helper function to find the triplet with the max occurrence value
    in a list of triplets.
    """
    max_val = rdd[0][2]
    the_index = 0
    for idx, val in enumerate(rdd):
        if val[2] > max_val:
            max_val = val[2]
            the_index = idx
    return rdd[the_index]
conf = SparkConf() \
.setAppName("Collaborative Filter") \
.set("spark.executor.memory", "5g")
sc = SparkContext(conf=conf)
# some file contains tuples ('user', 'item', 'occurrences')
data_file = sc.textFile('file:///some_file.txt')
# Create the triplet so I can index stuff
data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2])))
# Group by the user i.e. r[0]
grouped = data_file.groupBy(lambda r: r[0])
# Get the values as a list
group_list = grouped.map(lambda x: (list(x[1])))
# Get the max value for each user.
max_list = group_list.map(reduce_by_max).collect()
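As a side note, the hand-rolled reduce_by_max above can also be replaced by Python's built-in max with a key function applied directly to each group; a minimal sketch, assuming the same grouped RDD as above:
# Take, within each group, the triplet whose third field (the occurrence) is largest.
max_list = (grouped
    .map(lambda kv: max(kv[1], key=lambda t: t[2]))  # kv = (user, iterable of triplets)
    .collect())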
There is no need for groupBy here. A simple reduceByKey will do just fine, and most of the time it will be more efficient:
data_file = sc.parallelize([
    (u'u1', u's1', 20), (u'u1', u's2', 5),
    (u'u2', u's3', 5), (u'u2', u's2', 10)])

max_by_group = (data_file
    .map(lambda x: (x[0], x))  # Convert to a pair RDD keyed by user
    # Take the maximum of the two arguments by their last element (the occurrence),
    # equivalent to:
    # lambda x, y: x if x[-1] > y[-1] else y
    .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1]))
    .values())  # Drop the keys
max_by_group.collect()
## [('u2', 's2', 10), ('u1', 's1', 20)]
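For completeness, a sketch of the same reduceByKey pattern wired into the file-reading pipeline from the question (the file path is just the placeholder used there):
# Read the ('user', 'item', 'occurrences') lines and keep the max-occurrence
# triplet per user with reduceByKey instead of groupBy.
data_file = (sc.textFile('file:///some_file.txt')
    .map(lambda l: l.split())
    .map(lambda l: (l[0], l[1], float(l[2]))))

max_by_group = (data_file
    .map(lambda x: (x[0], x))                                  # key each triplet by its user
    .reduceByKey(lambda a, b: max(a, b, key=lambda t: t[2]))   # keep the max occurrence per user
    .values())                                                 # drop the duplicated keys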