pyspark RDD 中 aggregateByKey 的替代或更好的方法
Alternate or better approach to aggregateByKey in pyspark RDD
我有一个天气数据 csv 文件,其中每个条目都有站点 ID 以及当天记录的最小值或最大值。第二个元素是知道值代表什么的关键字。示例输入如下。
stationID feature value
ITE00100554 TMAX -75
ITE00100554 TMIN -148
GM000010962 PRCP 0
EZE00100082 TMAX -86
EZE00100082 TMIN -135
ITE00100554 TMAX -60
ITE00100554 TMIN -125
GM000010962 PRCP 0
EZE00100082 TMAX -44
EZE00100082 TMIN -130
ITE00100554 TMAX -23
我已经过滤掉了带有 TMIN 或 TMAX 的条目。为给定数据记录每个条目。我在构建我的 RDD 时删除了 Date,因为它不感兴趣。我的目标是在其所有记录中找到每个站的最小值和最大值,即
ITE00100554, 'TMIN', <global_min_value recorded by that station>
ITE00100554, 'TMAX', <global_max_value>
EZE00100082, 'TMIN', <global_min_value>
EZE00100082, 'TMAX', <global_max_value>
我能够使用 aggregateByKey 完成此操作,但根据此 link https://backtobazics.com/big-data/spark/apache-spark-aggregatebykey-example/ 我不必使用 aggregateByKey,因为输入和输出值格式相同。所以我想知道是否有替代或更好的方法来编写代码而无需定义这么多函数。
stationtemps = entries.filter(lambda x: x[1] in ['TMIN', 'TMAX']).map(lambda x: (x[0], (x[1], x[2]))) # (stationID, (tempkey, value))
max_temp = stationtemps.values().values().max()
min_temp = stationtemps.values().values().min()
def max_seqOp(accumulator, element):
return (accumulator if accumulator[1] > element[1] else element)
def max_combOp(accu1, accu2):
return (accu1 if accu1[1] > accu2[1] else accu2)
def min_seqOp(accumulator, element):
return (accumulator if accumulator[1] < element[1] else element)
def min_combOp(accu1, accu2):
return (accu1 if accu1[1] < accu2[1] else accu2)
station_max_temps = stationtemps.aggregateByKey(('', min_temp), max_seqOp, max_combOp).sortByKey()
station_min_temps = stationtemps.aggregateByKey(('', max_temp), min_seqOp, min_combOp).sortByKey()
min_max_temps = station_max_temps.zip(station_min_temps).collect()
with open('1800_min_max.csv', 'w') as fd:
writer = csv.writer(fd)
writer.writerows(map(lambda x: list(list(x)), min_max_temps))
我正在学习 pyspark,还没有掌握所有不同的转换函数。
这里是模拟输入,如果min和max填对了,那为什么还需要TMIN,TMAX这个指标呢?确实不需要累加器。
rdd = sc.parallelize([ ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmax', 7), ('s0','tmax',14), ('s0','tmin', 3) ])
rddcollect = rdd.collect()
#print(rddcollect)
rdd2 = rdd.map(lambda x: (x[0], x[2]))
#rdd2collect = rdd2.collect()
#print(rdd2collect)
rdd3 = rdd2.groupByKey().sortByKey()
rdd4 = rdd3.map(lambda k_v: ( k_v[0], (sorted(k_v[1]))) )
rdd4.collect()
returns:
Out[27]: [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [0, 7])]
备选答案
- 澄清后
- 假设最小值和最大值有意义
- 用我自己的数据
- 顺便说一句还有其他解决方案
这里是:
include = ['tmin','tmax']
rdd0 = sc.parallelize([ ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmin',-12), ('s2','tmax', 7), ('s2','tmax', 17), ('s2','tother', 17), ('s0','tmax',14), ('s0','tmin', 3) ])
rdd1 = rdd0.filter(lambda x: any(e in x for e in include) )
rdd2 = rdd1.map(lambda x: ( (x[0],x[1]), x[2]))
rdd3 = rdd2.groupByKey().sortByKey()
rdd4Min = rdd3.filter(lambda k_v: k_v[0][1] == 'tmin').map(lambda k_v: ( k_v[0][0], min( k_v[1] ) ))
rdd4Max = rdd3.filter(lambda k_v: k_v[0][1] == 'tmax').map(lambda k_v: ( k_v[0][0], max( k_v[1] ) ))
rdd5=rdd4Min.union(rdd4Max)
rdd6 = rdd5.groupByKey().sortByKey()
res = rdd6.map(lambda k_v: ( k_v[0], (sorted(k_v[1]))))
rescollect = res.collect()
print(rescollect)
returns:
[('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [-12, 17])]
遵循与@thebluephantom 相同的逻辑,这是我从 csv 读取时的最终代码
def get_temp_points(item):
if item[0][1] == 'TMIN':
return (item[0], min(item[1]))
else:
return (item[0], max(item[1]))
data = lines.filter(lambda x: any(ele for ele in x if ele in ['TMIN', 'TMAX']))
temps = data.map(lambda x: ((x[0], x[2]), float(x[3]))
temp_list = temps.groupByKey().mapValues(list)
##((stationID, 'TMIN'/'TMAX'), listofvalues)
min_max_temps = temp_list.map(get_temp_points).collect()
我有一个天气数据 csv 文件,其中每个条目都有站点 ID 以及当天记录的最小值或最大值。第二个元素是知道值代表什么的关键字。示例输入如下。
stationID feature value
ITE00100554 TMAX -75
ITE00100554 TMIN -148
GM000010962 PRCP 0
EZE00100082 TMAX -86
EZE00100082 TMIN -135
ITE00100554 TMAX -60
ITE00100554 TMIN -125
GM000010962 PRCP 0
EZE00100082 TMAX -44
EZE00100082 TMIN -130
ITE00100554 TMAX -23
我已经过滤掉了带有 TMIN 或 TMAX 的条目。为给定数据记录每个条目。我在构建我的 RDD 时删除了 Date,因为它不感兴趣。我的目标是在其所有记录中找到每个站的最小值和最大值,即
ITE00100554, 'TMIN', <global_min_value recorded by that station>
ITE00100554, 'TMAX', <global_max_value>
EZE00100082, 'TMIN', <global_min_value>
EZE00100082, 'TMAX', <global_max_value>
我能够使用 aggregateByKey 完成此操作,但根据此 link https://backtobazics.com/big-data/spark/apache-spark-aggregatebykey-example/ 我不必使用 aggregateByKey,因为输入和输出值格式相同。所以我想知道是否有替代或更好的方法来编写代码而无需定义这么多函数。
stationtemps = entries.filter(lambda x: x[1] in ['TMIN', 'TMAX']).map(lambda x: (x[0], (x[1], x[2]))) # (stationID, (tempkey, value))
max_temp = stationtemps.values().values().max()
min_temp = stationtemps.values().values().min()
def max_seqOp(accumulator, element):
return (accumulator if accumulator[1] > element[1] else element)
def max_combOp(accu1, accu2):
return (accu1 if accu1[1] > accu2[1] else accu2)
def min_seqOp(accumulator, element):
return (accumulator if accumulator[1] < element[1] else element)
def min_combOp(accu1, accu2):
return (accu1 if accu1[1] < accu2[1] else accu2)
station_max_temps = stationtemps.aggregateByKey(('', min_temp), max_seqOp, max_combOp).sortByKey()
station_min_temps = stationtemps.aggregateByKey(('', max_temp), min_seqOp, min_combOp).sortByKey()
min_max_temps = station_max_temps.zip(station_min_temps).collect()
with open('1800_min_max.csv', 'w') as fd:
writer = csv.writer(fd)
writer.writerows(map(lambda x: list(list(x)), min_max_temps))
我正在学习 pyspark,还没有掌握所有不同的转换函数。
这里是模拟输入,如果min和max填对了,那为什么还需要TMIN,TMAX这个指标呢?确实不需要累加器。
rdd = sc.parallelize([ ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmax', 7), ('s0','tmax',14), ('s0','tmin', 3) ])
rddcollect = rdd.collect()
#print(rddcollect)
rdd2 = rdd.map(lambda x: (x[0], x[2]))
#rdd2collect = rdd2.collect()
#print(rdd2collect)
rdd3 = rdd2.groupByKey().sortByKey()
rdd4 = rdd3.map(lambda k_v: ( k_v[0], (sorted(k_v[1]))) )
rdd4.collect()
returns:
Out[27]: [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [0, 7])]
备选答案
- 澄清后
- 假设最小值和最大值有意义
- 用我自己的数据
- 顺便说一句还有其他解决方案
这里是:
include = ['tmin','tmax']
rdd0 = sc.parallelize([ ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmin',-12), ('s2','tmax', 7), ('s2','tmax', 17), ('s2','tother', 17), ('s0','tmax',14), ('s0','tmin', 3) ])
rdd1 = rdd0.filter(lambda x: any(e in x for e in include) )
rdd2 = rdd1.map(lambda x: ( (x[0],x[1]), x[2]))
rdd3 = rdd2.groupByKey().sortByKey()
rdd4Min = rdd3.filter(lambda k_v: k_v[0][1] == 'tmin').map(lambda k_v: ( k_v[0][0], min( k_v[1] ) ))
rdd4Max = rdd3.filter(lambda k_v: k_v[0][1] == 'tmax').map(lambda k_v: ( k_v[0][0], max( k_v[1] ) ))
rdd5=rdd4Min.union(rdd4Max)
rdd6 = rdd5.groupByKey().sortByKey()
res = rdd6.map(lambda k_v: ( k_v[0], (sorted(k_v[1]))))
rescollect = res.collect()
print(rescollect)
returns:
[('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [-12, 17])]
遵循与@thebluephantom 相同的逻辑,这是我从 csv 读取时的最终代码
def get_temp_points(item):
if item[0][1] == 'TMIN':
return (item[0], min(item[1]))
else:
return (item[0], max(item[1]))
data = lines.filter(lambda x: any(ele for ele in x if ele in ['TMIN', 'TMAX']))
temps = data.map(lambda x: ((x[0], x[2]), float(x[3]))
temp_list = temps.groupByKey().mapValues(list)
##((stationID, 'TMIN'/'TMAX'), listofvalues)
min_max_temps = temp_list.map(get_temp_points).collect()