reduceByKey in pyspark with multiple key fields in tuple
from pyspark import SparkContext, SparkConf
import datetime  # used by convertion()
import sys
from operator import add

conf = SparkConf().setAppName("test")
sc = SparkContext(conf=conf)

def convertion(num):
    # Convert a Unix timestamp to a 'YYYY-MM-DD' date string.
    return datetime.datetime.fromtimestamp(num).strftime('%Y-%m-%d')

def compute(strs, num):
    # Negate the amount for 'apple' records, keep it otherwise.
    if strs == 'apple':
        return -num
    return num

rdd = sc.parallelize([
    {'user': 'user',  'tpe': 'apple', 'timstamp': 1500000000, 'amount': 1},
    {'user': 'user',  'tpe': 'pear',  'timstamp': 1500000001, 'amount': 2},
    {'user': 'user2', 'tpe': 'apple', 'timstamp': 1505000002, 'amount': 3}
])
# Key each record by (user, date) and map the value to a signed amount.
rdd = rdd.map(lambda x: ((x['user'], convertion(x['timstamp'])), compute(x['tpe'], x['amount'])))
rdd.reduceByKey(lambda x, y: x + y).take(3)
print(rdd.collect())
Incorrect output: [(('user', '2017-07-13'), -1), (('user', '2017-07-13'), 2), (('user2', '2017-09-09'), -3)]
The output I expect:
[(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]
I think I'm not using reduceByKey correctly. Can anyone tell me how to group the records by the key tuple?
Thanks!
reduceByKey, like every Spark transformation, returns a new RDD. In the code above, that new RDD is never assigned to a variable: take(3) computes the reduced result and then discards it. When the last line calls rdd.collect(), the variable rdd still refers to the RDD created by rdd = rdd.map(...), so what gets printed is the output of the map step, not the reduce.
Assign the result of reduceByKey back to a variable and drop the take(3):
rdd = rdd.map(lambda x: ((x['user'], convertion(x['timstamp'])), compute(x['tpe'], x['amount'])))
rdd = rdd.reduceByKey(lambda x, y: x + y)  # keep a reference to the reduced RDD
print(rdd.collect())
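With that change, collect() returns the summed value per (user, date) key: [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]. As a minor stylistic sketch (not required for the fix), the already-imported operator.add can replace the lambda, since add(x, y) is exactly x + y:

rdd = rdd.reduceByKey(add)
print(rdd.collect())  # [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]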