过滤掉pyspark RDD中的非数字值
Filter out non digit values in pyspark RDD
我有一个如下所示的 RDD:
[["3331/587","Metro","1235","1000"],
["1234/232","City","8479","2000"],
["5987/215","Metro","1111","Unkown"],
["8794/215","Metro","1112","1000"],
["1254/951","City","6598","XXXX"],
["1584/951","City","1548","Unkown"],
["1833/331","Metro","1009","2000"],
["2213/987","City","1197", ]]
我想分别计算第二个条目 (City/Metro) 中每个不同值的每行最后一个值(1000、2000 等)的平均值和最大值。我正在使用以下代码收集“城市”值:
rdd.filter(lambda row: row[1] == 'City').map(lambda x: float(x[3])).collect()
但是,我收到错误,可能是因为系列中的字符串值(例如“Unkown”)。
如何过滤掉具有字符串值和空值的行(=只保留那些可转换为数字的行),然后计算最大值和平均值?
试试这个。
rdd = rdd.map(lambda l: [l[i].replace('"', '') for i in range(0, len(l))])
rdd = rdd.filter(lambda l: len(l) > 3) \
.filter(lambda l: l[1] in ['City', 'Metro']) \
.filter(lambda l: l[3].isdigit()) \
.map(lambda l: (l[1], int(l[3]))) \
rdd_avg = rdd.aggregateByKey((0, 0), lambda a, b: (a[0] + b, a[1] + 1), lambda a, b: a + b).mapValues(lambda x: x[0] / x[1])
rdd_max = rdd.reduceByKey(lambda a, b: a if a > b else b)
print(rdd_avg.collect())
print(rdd_max.collect())
[('Metro', 1333.3333333333333), ('City', 2000.0)]
[('Metro', 2000), ('City', 2000)]
我有一个如下所示的 RDD:
[["3331/587","Metro","1235","1000"],
["1234/232","City","8479","2000"],
["5987/215","Metro","1111","Unkown"],
["8794/215","Metro","1112","1000"],
["1254/951","City","6598","XXXX"],
["1584/951","City","1548","Unkown"],
["1833/331","Metro","1009","2000"],
["2213/987","City","1197", ]]
我想分别计算第二个条目 (City/Metro) 中每个不同值的每行最后一个值(1000、2000 等)的平均值和最大值。我正在使用以下代码收集“城市”值:
rdd.filter(lambda row: row[1] == 'City').map(lambda x: float(x[3])).collect()
但是,我收到错误,可能是因为系列中的字符串值(例如“Unkown”)。
如何过滤掉具有字符串值和空值的行(=只保留那些可转换为数字的行),然后计算最大值和平均值?
试试这个。
rdd = rdd.map(lambda l: [l[i].replace('"', '') for i in range(0, len(l))])
rdd = rdd.filter(lambda l: len(l) > 3) \
.filter(lambda l: l[1] in ['City', 'Metro']) \
.filter(lambda l: l[3].isdigit()) \
.map(lambda l: (l[1], int(l[3]))) \
rdd_avg = rdd.aggregateByKey((0, 0), lambda a, b: (a[0] + b, a[1] + 1), lambda a, b: a + b).mapValues(lambda x: x[0] / x[1])
rdd_max = rdd.reduceByKey(lambda a, b: a if a > b else b)
print(rdd_avg.collect())
print(rdd_max.collect())
[('Metro', 1333.3333333333333), ('City', 2000.0)]
[('Metro', 2000), ('City', 2000)]