python 根据值对元素进行火花排序
python spark sort elements based on value
我是 python spark 的新手,我需要您的帮助,在此先感谢您!
我们开始吧,我有这段脚本:
from datetime import datetime
from pyspark import SparkContext
def getNormalizedDate(dateOfCL):
#the result will be in [0,1]
dot=datetime.now()
od=datetime.strptime("Jan 01 2010", "%b %d %Y")
return (float((dateOfCL-od).days)/float((dot-od).days))
def addition(a, b):
a1=a
b1=b
if not type(a) is float:
a1=getNormalizedDate(a)
if not type(b) is float:
b1=getNormalizedDate(b)
return float(a1+b1)
def debugFunction(x):
print "x[0]: " + str(type(x[0]))
print "x[1]: " + str(type(x[1])) + " --> " + str(x[1])
return x[1]
if __name__ == '__main__':
sc = SparkContext("local", "File Scores")
textFile = sc.textFile("/data/spark/file.csv")
#print "Number of lines: " + str(textFile.count())
test1 = textFile.map(lambda line: line.split(";"))
# result of this:
# [u'01', u'01', u'add', u'fileName', u'Path', u'1', u'info', u'info2', u'info3', u'Sep 24 2014']
test2 = test1.map(lambda line: (line[3], datetime.strptime(line[len(line)-1], "%b %d %Y")))
test6=test2.reduceByKey(addition)
#print test6
test6.persist()
result=sorted(test6.collect(), key=debugFunction)
这以错误结束:
Traceback (most recent call last):
File "/data/spark/script.py", line 40, in <module>
result=sorted(test6.collect(), key=lambda x:x[1])
TypeError: can't compare datetime.datetime to float
有关信息,test6.collect() 给出了此内容
[(u'file1', 0.95606060606060606),
(u'file2', 0.91515151515151516),
(u'file3', 0.8797979797979798),
(u'file4', 0.0),
(u'file5', 0.94696969696969702),
(u'file6', 0.95606060606060606),
(u'file7', 0.98131313131313136),
(u'file8', 0.86161616161616161)]
我想根据浮点值(不是键)对其进行排序
请问应该如何进行?
谢谢大家。
对于那些可能感兴趣的人,我发现了问题所在。
我正在按键减少,然后执行值列表中包含的项目的添加。
一些文件是唯一的,不会受到此次缩减的影响,因此它们仍然会有日期而不是浮点数。
我现在做的是
test2 = test1.map(lambda line: (line[3], line[len(line)-1])).map(getNormalizedDate)
这将生成对 (file, float)
只有这样,我才key reduce
最后,
result=sorted(test6.collect(), key=lamba x:x[1])
为我提供了我一直在寻找的正确排序。
希望对您有所帮助!!
我个人更喜欢使用 [DataFrames][1] 而不是 RDD,
API 更高级。
您可以按特定列对数据框上的数据进行排序,如下所示:
df = spark.read.csv('input_data.csv')
df.sort('column_name').write.csv(path='output_path')
其中 spark
是 pyspark.sql.session.SparkSession
class 的实例。
我是 python spark 的新手,我需要您的帮助,在此先感谢您!
我们开始吧,我有这段脚本:
from datetime import datetime
from pyspark import SparkContext
def getNormalizedDate(dateOfCL):
#the result will be in [0,1]
dot=datetime.now()
od=datetime.strptime("Jan 01 2010", "%b %d %Y")
return (float((dateOfCL-od).days)/float((dot-od).days))
def addition(a, b):
a1=a
b1=b
if not type(a) is float:
a1=getNormalizedDate(a)
if not type(b) is float:
b1=getNormalizedDate(b)
return float(a1+b1)
def debugFunction(x):
print "x[0]: " + str(type(x[0]))
print "x[1]: " + str(type(x[1])) + " --> " + str(x[1])
return x[1]
if __name__ == '__main__':
sc = SparkContext("local", "File Scores")
textFile = sc.textFile("/data/spark/file.csv")
#print "Number of lines: " + str(textFile.count())
test1 = textFile.map(lambda line: line.split(";"))
# result of this:
# [u'01', u'01', u'add', u'fileName', u'Path', u'1', u'info', u'info2', u'info3', u'Sep 24 2014']
test2 = test1.map(lambda line: (line[3], datetime.strptime(line[len(line)-1], "%b %d %Y")))
test6=test2.reduceByKey(addition)
#print test6
test6.persist()
result=sorted(test6.collect(), key=debugFunction)
这以错误结束:
Traceback (most recent call last):
File "/data/spark/script.py", line 40, in <module>
result=sorted(test6.collect(), key=lambda x:x[1])
TypeError: can't compare datetime.datetime to float
有关信息,test6.collect() 给出了此内容
[(u'file1', 0.95606060606060606),
(u'file2', 0.91515151515151516),
(u'file3', 0.8797979797979798),
(u'file4', 0.0),
(u'file5', 0.94696969696969702),
(u'file6', 0.95606060606060606),
(u'file7', 0.98131313131313136),
(u'file8', 0.86161616161616161)]
我想根据浮点值(不是键)对其进行排序 请问应该如何进行?
谢谢大家。
对于那些可能感兴趣的人,我发现了问题所在。 我正在按键减少,然后执行值列表中包含的项目的添加。 一些文件是唯一的,不会受到此次缩减的影响,因此它们仍然会有日期而不是浮点数。
我现在做的是
test2 = test1.map(lambda line: (line[3], line[len(line)-1])).map(getNormalizedDate)
这将生成对 (file, float)
只有这样,我才key reduce
最后,
result=sorted(test6.collect(), key=lamba x:x[1])
为我提供了我一直在寻找的正确排序。
希望对您有所帮助!!
我个人更喜欢使用 [DataFrames][1] 而不是 RDD, API 更高级。 您可以按特定列对数据框上的数据进行排序,如下所示:
df = spark.read.csv('input_data.csv')
df.sort('column_name').write.csv(path='output_path')
其中 spark
是 pyspark.sql.session.SparkSession
class 的实例。