Spark Python - 如何使用 reduce by key 获取 minmum/maximum 值
Spark Python - how to use reduce by key to get minmum/maximum values
我有一个csv格式的一些城市最高和最低气温的样本数据。
Mumbai,19,30
Delhi,5,41
Kolkata,20,40
Mumbai,18,35
Delhi,4,42
Delhi,10,44
Kolkata,19,39
我想使用 Python 中的 spark 脚本找出每个城市记录的所有时间最低温度。
这是我的脚本
cityTemp = sc.textFile("weather.txt").map(lambda x: x.split(','))
# convert it to pair RDD for performing reduce by Key
cityTemp = cityTemp.map(lambda x: (x[0], tuple(x[1:])))
cityTempMin = cityTemp.reduceByKey(lambda x, y: min(x[0],y[0]))
cityTempMin.collect()
我的预期输出如下
Delhi, 4
Mumbai, 18
Kolkata, 19
但是脚本产生了以下输出。
[(u'Kolkata', u'19'), (u'Mumbai', u'18'), (u'Delhi', u'1')]
如何获得所需的输出?
如果必须使用 reduceByKey 函数,请尝试以下解决方案:
斯卡拉:
val df = sc.parallelize(Seq(("Mumbai", 19, 30),
("Delhi", 5, 41),
("Kolkata", 20, 40),
("Mumbai", 18, 35),
("Delhi", 4, 42),
("Delhi", 10, 44),
("Kolkata", 19, 39))).map(x => (x._1,x._2)).keyBy(_._1)
df.reduceByKey((accum, n) => if (accum._2 > n._2) n else accum).map(_._2).collect().foreach(println)
PYTHON:
rdd = sc.parallelize([("Mumbai", 19, 30),
("Delhi", 5, 41),
("Kolkata", 20, 40),
("Mumbai", 18, 35),
("Delhi", 4, 42),
("Delhi", 10, 44),
("Kolkata", 19, 39)])
def reduceFunc(accum, n):
print(accum, n)
if accum[1] > n[1]:
return(n)
else: return(accum)
def mapFunc(lines):
return (lines[0], lines[1])
rdd.map(mapFunc).keyBy(lambda x: x[0]).reduceByKey(reduceFunc).map(lambda x : x[1]).collect()
输出:
(Kolkata,19)
(Delhi,4)
(Mumbai,18)
如果你不想做一个reduceByKey。只需一组 by 后跟 min 函数即可为您提供所需的结果。
val df = sc.parallelize(Seq(("Mumbai", 19, 30),
("Delhi", 5, 41),
("Kolkata", 20, 40),
("Mumbai", 18, 35),
("Delhi", 4, 42),
("Delhi", 10, 44),
("Kolkata", 19, 39))).toDF("city", "minTemp", "maxTemp")
df.groupBy("city").agg(min("minTemp")).show
输出:
+-------+------------+
| city|min(minTemp)|
+-------+------------+
| Mumbai| 18|
|Kolkata| 19|
| Delhi| 4|
+-------+------------+
我有一个csv格式的一些城市最高和最低气温的样本数据。
Mumbai,19,30
Delhi,5,41
Kolkata,20,40
Mumbai,18,35
Delhi,4,42
Delhi,10,44
Kolkata,19,39
我想使用 Python 中的 spark 脚本找出每个城市记录的所有时间最低温度。
这是我的脚本
cityTemp = sc.textFile("weather.txt").map(lambda x: x.split(','))
# convert it to pair RDD for performing reduce by Key
cityTemp = cityTemp.map(lambda x: (x[0], tuple(x[1:])))
cityTempMin = cityTemp.reduceByKey(lambda x, y: min(x[0],y[0]))
cityTempMin.collect()
我的预期输出如下
Delhi, 4
Mumbai, 18
Kolkata, 19
但是脚本产生了以下输出。
[(u'Kolkata', u'19'), (u'Mumbai', u'18'), (u'Delhi', u'1')]
如何获得所需的输出?
如果必须使用 reduceByKey 函数,请尝试以下解决方案:
斯卡拉:
val df = sc.parallelize(Seq(("Mumbai", 19, 30),
("Delhi", 5, 41),
("Kolkata", 20, 40),
("Mumbai", 18, 35),
("Delhi", 4, 42),
("Delhi", 10, 44),
("Kolkata", 19, 39))).map(x => (x._1,x._2)).keyBy(_._1)
df.reduceByKey((accum, n) => if (accum._2 > n._2) n else accum).map(_._2).collect().foreach(println)
PYTHON:
rdd = sc.parallelize([("Mumbai", 19, 30),
("Delhi", 5, 41),
("Kolkata", 20, 40),
("Mumbai", 18, 35),
("Delhi", 4, 42),
("Delhi", 10, 44),
("Kolkata", 19, 39)])
def reduceFunc(accum, n):
print(accum, n)
if accum[1] > n[1]:
return(n)
else: return(accum)
def mapFunc(lines):
return (lines[0], lines[1])
rdd.map(mapFunc).keyBy(lambda x: x[0]).reduceByKey(reduceFunc).map(lambda x : x[1]).collect()
输出:
(Kolkata,19)
(Delhi,4)
(Mumbai,18)
如果你不想做一个reduceByKey。只需一组 by 后跟 min 函数即可为您提供所需的结果。
val df = sc.parallelize(Seq(("Mumbai", 19, 30),
("Delhi", 5, 41),
("Kolkata", 20, 40),
("Mumbai", 18, 35),
("Delhi", 4, 42),
("Delhi", 10, 44),
("Kolkata", 19, 39))).toDF("city", "minTemp", "maxTemp")
df.groupBy("city").agg(min("minTemp")).show
输出:
+-------+------------+
| city|min(minTemp)|
+-------+------------+
| Mumbai| 18|
|Kolkata| 19|
| Delhi| 4|
+-------+------------+