reduceByKey in PySpark with multiple key fields in a tuple

import datetime
from operator import add

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("test")
sc = SparkContext(conf=conf)

def convertion(num):
    # Convert a Unix timestamp to a 'YYYY-MM-DD' date string (uses the local timezone).
    return datetime.datetime.fromtimestamp(num).strftime('%Y-%m-%d')

def compute(strs, num):
    # 'apple' amounts count as negative; everything else counts as positive.
    if strs == 'apple':
        return -num
    return num

rdd = sc.parallelize([
    {'user': 'user', 'tpe': 'apple', 'timstamp': 1500000000, 'amount': 1},
    {'user': 'user', 'tpe': 'pear', 'timstamp': 1500000001, 'amount': 2},
    {'user': 'user2', 'tpe': 'apple', 'timstamp': 1505000002, 'amount': 3}
])

rdd = rdd.map(lambda x: ((x['user'], convertion(x['timstamp'])), compute(x['tpe'], x['amount'])))
rdd.reduceByKey(lambda x, y: x + y).take(3)
print(rdd.collect())

Actual (wrong) output: [(('user', '2017-07-13'), -1), (('user', '2017-07-13'), 2), (('user2', '2017-09-09'), -3)]

Desired output: [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]

I think I am not using reduceByKey correctly. Can anyone tell me how to group the records by the key tuple?

Thanks!

reduceByKey returns (like all Spark transformations) a new RDD. That new RDD is never assigned to a variable here, so the reduced result is simply discarded.

When the last line calls rdd.collect(), the variable rdd still refers to the RDD created by rdd = rdd.map(...), so what gets printed is the content as it stood after the map call.
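
A minimal sketch of this behavior (the pairs/summed names are just illustrative):

pairs = sc.parallelize([(('user', '2017-07-13'), -1), (('user', '2017-07-13'), 2)])
pairs.reduceByKey(lambda x, y: x + y)  # returns a new RDD, which is immediately discarded
print(pairs.collect())  # still [(('user', '2017-07-13'), -1), (('user', '2017-07-13'), 2)]
summed = pairs.reduceByKey(lambda x, y: x + y)  # keep a reference to the new RDD
print(summed.collect())  # [(('user', '2017-07-13'), 1)]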

Assign the result of reduceByKey to a variable, and drop the take(3):

rdd = rdd.map(lambda x: ((x['user'], convertion(x['timstamp'])), compute(x['tpe'], x['amount'])))
rdd = rdd.reduceByKey(lambda x, y: x + y)
print(rdd.collect())
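
With that change, collect() returns the desired result, although the order of the pairs in the list may vary, since an RDD has no guaranteed ordering: [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]. As a side note, since operator.add is already imported, the reduction can also be written without the lambda:

rdd = rdd.reduceByKey(add)  # operator.add is equivalent to lambda x, y: x + y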