pyspark RDD 中 aggregateByKey 的替代或更好的方法

Alternate or better approach to aggregateByKey in pyspark RDD

我有一个天气数据 csv 文件,其中每个条目都有站点 ID 以及当天记录的最小值或最大值。第二个元素是知道值代表什么的关键字。示例输入如下。

  stationID    feature value
  ITE00100554    TMAX  -75  
  ITE00100554    TMIN -148         
  GM000010962    PRCP    0         
  EZE00100082    TMAX  -86         
  EZE00100082    TMIN -135         
  ITE00100554    TMAX  -60         
  ITE00100554    TMIN -125         
  GM000010962    PRCP    0         
  EZE00100082    TMAX  -44         
  EZE00100082    TMIN -130         
  ITE00100554    TMAX  -23 

我已经过滤掉了带有 TMIN 或 TMAX 的条目。为给定数据记录每个条目。我在构建我的 RDD 时删除了 Date,因为它不感兴趣。我的目标是在其所有记录中找到每个站的最小值和最大值,即

ITE00100554, 'TMIN', <global_min_value recorded by that station>
ITE00100554, 'TMAX', <global_max_value>
EZE00100082, 'TMIN', <global_min_value>
EZE00100082, 'TMAX', <global_max_value>

我能够使用 aggregateByKey 完成此操作,但根据此 link https://backtobazics.com/big-data/spark/apache-spark-aggregatebykey-example/ 我不必使用 aggregateByKey,因为输入和输出值格式相同。所以我想知道是否有替代或更好的方法来编写代码而无需定义这么多函数。

stationtemps = entries.filter(lambda x: x[1] in ['TMIN', 'TMAX']).map(lambda x: (x[0], (x[1], x[2])))  # (stationID, (tempkey, value))

max_temp = stationtemps.values().values().max()
min_temp = stationtemps.values().values().min()


def max_seqOp(accumulator, element):
    return (accumulator if accumulator[1] > element[1] else element)


def max_combOp(accu1, accu2):
    return (accu1 if accu1[1] > accu2[1] else accu2)


def min_seqOp(accumulator, element):
    return (accumulator if accumulator[1] < element[1] else element)


def min_combOp(accu1, accu2):
    return (accu1 if accu1[1] < accu2[1] else accu2)


station_max_temps = stationtemps.aggregateByKey(('', min_temp), max_seqOp, max_combOp).sortByKey()
station_min_temps = stationtemps.aggregateByKey(('', max_temp), min_seqOp, min_combOp).sortByKey()

min_max_temps = station_max_temps.zip(station_min_temps).collect()

with open('1800_min_max.csv', 'w') as fd:
    writer = csv.writer(fd)
    writer.writerows(map(lambda x: list(list(x)), min_max_temps))

我正在学习 pyspark,还没有掌握所有不同的转换函数。

这里是模拟输入,如果min和max填对了,那为什么还需要TMIN,TMAX这个指标呢?确实不需要累加器。

rdd = sc.parallelize([  ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmax', 7), ('s0','tmax',14), ('s0','tmin', 3)  ])
rddcollect = rdd.collect()
#print(rddcollect)

rdd2 = rdd.map(lambda x:  (x[0], x[2]))
#rdd2collect = rdd2.collect()
#print(rdd2collect)

rdd3 = rdd2.groupByKey().sortByKey()
rdd4 = rdd3.map(lambda k_v: ( k_v[0], (sorted(k_v[1])))  )
rdd4.collect()

returns:

Out[27]: [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [0, 7])]

备选答案

  • 澄清后
  • 假设最小值和最大值有意义
  • 用我自己的数据
  • 顺便说一句还有其他解决方案

这里是:

include = ['tmin','tmax']

rdd0 = sc.parallelize([  ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmin',-12), ('s2','tmax', 7), ('s2','tmax', 17), ('s2','tother', 17), ('s0','tmax',14), ('s0','tmin', 3)  ])
rdd1 = rdd0.filter(lambda x: any(e in x for e in include) )
rdd2 = rdd1.map(lambda x:  ( (x[0],x[1]), x[2]))
rdd3 = rdd2.groupByKey().sortByKey()
rdd4Min = rdd3.filter(lambda k_v: k_v[0][1] == 'tmin').map(lambda k_v: ( k_v[0][0], min( k_v[1]  ) ))
rdd4Max = rdd3.filter(lambda k_v: k_v[0][1] == 'tmax').map(lambda k_v: ( k_v[0][0], max( k_v[1]  ) ))
rdd5=rdd4Min.union(rdd4Max)
rdd6 = rdd5.groupByKey().sortByKey()
res = rdd6.map(lambda k_v: ( k_v[0], (sorted(k_v[1]))))
rescollect = res.collect()
print(rescollect)

returns:

[('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [-12, 17])]

遵循与@thebluephantom 相同的逻辑,这是我从 csv 读取时的最终代码

def get_temp_points(item):
    if item[0][1] == 'TMIN':
        return (item[0], min(item[1]))
    else:
        return (item[0], max(item[1]))


data = lines.filter(lambda x: any(ele for ele in x if ele in ['TMIN', 'TMAX']))

temps = data.map(lambda x: ((x[0], x[2]), float(x[3]))
temp_list = temps.groupByKey().mapValues(list) 
##((stationID, 'TMIN'/'TMAX'), listofvalues)

min_max_temps = temp_list.map(get_temp_points).collect()