将列表转换为 RDD
Convert list to RDD
我正在尝试在 pyspark 中处理 samplecsv.csv 文件 (64 MB)。
此代码生成错误:AttributeError: 'list' object has no attribute 'saveAsTextFile'
我想我已经使用并行化将列表转换为 RDD。如果没有,是怎么做到的?
file = sc.textFile('/user/project/samplecsv.csv',5)
rdd = file.map(lambda line: (line.split(',')[0], line.split(',')[1],
line.split(',')[2], line.split(',')[3],
line.split(',')[4])).collect()
temp = sc.parallelize([rdd], numSlices=50000).collect()
temp.saveAsTextFile("/user/project/newfile.txt")}
您的问题是您在并行列表上调用了 collect,将其返回到正常的 python 列表。
此外,您不应该在每个步骤中都调用 collect,除非您正在为 testing/debugging 进程进行调用。否则你就没有利用 Spark 计算模型。
# loads the file as an rdd
file = sc.textFile('/user/project/samplecsv.csv',5)
# builds a computation graph
rdd = file.map(lambda line: (line.split(',')[0], line.split(',')[1],
line.split(',')[2], line.split(',')[3],
line.split(',')[4]))
# saves the rdd to the filesystem
rdd.saveAsTextFile("/user/project/newfile.txt")
此外,您可以通过仅拆分一次行来使代码更加优化。
我认为你应该试试下面的代码,它会解决你的问题:
file = sc.textFile("C://Users/Ravi/Desktop/test.csv",5)
rdd = file.map(lambda line: (line.split(',')[0], line.split(',')[1],
line.split(',')[2], line.split(',')[3]))
rdd.coalesce(1).saveAsTextFile("C://Users/Ravi/Desktop/temp")
如果你想要分区文件,不要使用合并。
我正在尝试在 pyspark 中处理 samplecsv.csv 文件 (64 MB)。 此代码生成错误:AttributeError: 'list' object has no attribute 'saveAsTextFile'
我想我已经使用并行化将列表转换为 RDD。如果没有,是怎么做到的?
file = sc.textFile('/user/project/samplecsv.csv',5)
rdd = file.map(lambda line: (line.split(',')[0], line.split(',')[1],
line.split(',')[2], line.split(',')[3],
line.split(',')[4])).collect()
temp = sc.parallelize([rdd], numSlices=50000).collect()
temp.saveAsTextFile("/user/project/newfile.txt")}
您的问题是您在并行列表上调用了 collect,将其返回到正常的 python 列表。
此外,您不应该在每个步骤中都调用 collect,除非您正在为 testing/debugging 进程进行调用。否则你就没有利用 Spark 计算模型。
# loads the file as an rdd
file = sc.textFile('/user/project/samplecsv.csv',5)
# builds a computation graph
rdd = file.map(lambda line: (line.split(',')[0], line.split(',')[1],
line.split(',')[2], line.split(',')[3],
line.split(',')[4]))
# saves the rdd to the filesystem
rdd.saveAsTextFile("/user/project/newfile.txt")
此外,您可以通过仅拆分一次行来使代码更加优化。
我认为你应该试试下面的代码,它会解决你的问题:
file = sc.textFile("C://Users/Ravi/Desktop/test.csv",5)
rdd = file.map(lambda line: (line.split(',')[0], line.split(',')[1],
line.split(',')[2], line.split(',')[3]))
rdd.coalesce(1).saveAsTextFile("C://Users/Ravi/Desktop/temp")
如果你想要分区文件,不要使用合并。