如何将生成的 RDD 写入 Spark 中的 csv 文件 python
How to write the resulting RDD to a csv file in Spark python
我有一个结果 RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
。这具有以下格式的输出:
[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]
我想要创建一个 CSV 文件,其中一列用于 labels
(上面输出中元组的第一部分),一列用于 predictions
(元组输出的第二部分)。但我不知道如何使用 Python.
在 Spark 中写入 CSV 文件
如何使用上述输出创建 CSV 文件?
只是 map
将 RDD (labelsAndPredictions
) 的行转换为字符串(CSV 的行)然后使用 rdd.saveAsTextFile()
.
def toCSVLine(data):
return ','.join(str(d) for d in data)
lines = labelsAndPredictions.map(toCSVLine)
lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv')
只用逗号连接是不好的,因为如果字段包含逗号,它们将不会被正确引用,例如','.join(['a', 'b', '1,2,3', 'c'])
在您需要 a,b,"1,2,3",c
时给您 a,b,1,2,3,c
。相反,您应该使用 Python 的 csv 模块将 RDD 中的每个列表转换为格式正确的 csv 字符串:
# python 3
import csv, io
def list_to_csv_str(x):
"""Given a list of strings, returns a properly-csv-formatted string."""
output = io.StringIO("")
csv.writer(output).writerow(x)
return output.getvalue().strip() # remove extra newline
# ... do stuff with your rdd ...
rdd = rdd.map(list_to_csv_str)
rdd.saveAsTextFile("output_directory")
由于 csv 模块只写入文件对象,我们必须用 io.StringIO("")
创建一个空的 "file" 并告诉 csv.writer 将 csv 格式的字符串写入其中。然后,我们使用 output.getvalue()
来获取我们刚刚写入的字符串 "file"。要使此代码与 Python 2 一起使用,只需将 io 替换为 StringIO 模块即可。
如果您使用的是 Spark DataFrames API,您还可以查看 DataBricks save function,它具有 csv 格式。
我知道这是旧的post。但是为了帮助搜索相同内容的人,下面是我在 PySpark 1.6.2
中将两列 RDD 写入单个 CSV 文件的方法
RDD:
>>> rdd.take(5)
[(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')]
现在代码:
# First I convert the RDD to dataframe
from pyspark import SparkContext
df = sqlContext.createDataFrame(rdd, ['count', 'word'])
东风:
>>> df.show()
+-----+-----------+
|count| word|
+-----+-----------+
|73342| cells|
|62861| cell|
|61714| studies|
|61377| aim|
|60168| clinical|
|59275| 2|
|59221| 1|
|58274| data|
|58087|development|
|56579| cancer|
|50243| disease|
|49817| provided|
|49216| specific|
|48857| health|
|48536| study|
|47827| project|
|45573|description|
|45455| applicant|
|44739| program|
|44522| patients|
+-----+-----------+
only showing top 20 rows
现在写入 CSV
# Write CSV (I have HDFS storage)
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out')
P.S:我只是初学者,正在学习 Whosebug 中的 posts。所以我不知道这是否是最好的方法。但它对我有用,我希望它能帮助别人!
def toCSV(RDD):
for element in RDD:
return ','.join(str(element))
rows_of_csv=RDD.map(toCSV)
rows_of_csv.saveAsTextFile('/FileStore/tables/name_of_csv_file.csv')
# choose your path based on your distributed file system
我有一个结果 RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
。这具有以下格式的输出:
[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]
我想要创建一个 CSV 文件,其中一列用于 labels
(上面输出中元组的第一部分),一列用于 predictions
(元组输出的第二部分)。但我不知道如何使用 Python.
如何使用上述输出创建 CSV 文件?
只是 map
将 RDD (labelsAndPredictions
) 的行转换为字符串(CSV 的行)然后使用 rdd.saveAsTextFile()
.
def toCSVLine(data):
return ','.join(str(d) for d in data)
lines = labelsAndPredictions.map(toCSVLine)
lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv')
只用逗号连接是不好的,因为如果字段包含逗号,它们将不会被正确引用,例如','.join(['a', 'b', '1,2,3', 'c'])
在您需要 a,b,"1,2,3",c
时给您 a,b,1,2,3,c
。相反,您应该使用 Python 的 csv 模块将 RDD 中的每个列表转换为格式正确的 csv 字符串:
# python 3
import csv, io
def list_to_csv_str(x):
"""Given a list of strings, returns a properly-csv-formatted string."""
output = io.StringIO("")
csv.writer(output).writerow(x)
return output.getvalue().strip() # remove extra newline
# ... do stuff with your rdd ...
rdd = rdd.map(list_to_csv_str)
rdd.saveAsTextFile("output_directory")
由于 csv 模块只写入文件对象,我们必须用 io.StringIO("")
创建一个空的 "file" 并告诉 csv.writer 将 csv 格式的字符串写入其中。然后,我们使用 output.getvalue()
来获取我们刚刚写入的字符串 "file"。要使此代码与 Python 2 一起使用,只需将 io 替换为 StringIO 模块即可。
如果您使用的是 Spark DataFrames API,您还可以查看 DataBricks save function,它具有 csv 格式。
我知道这是旧的post。但是为了帮助搜索相同内容的人,下面是我在 PySpark 1.6.2
中将两列 RDD 写入单个 CSV 文件的方法RDD:
>>> rdd.take(5)
[(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')]
现在代码:
# First I convert the RDD to dataframe
from pyspark import SparkContext
df = sqlContext.createDataFrame(rdd, ['count', 'word'])
东风:
>>> df.show()
+-----+-----------+
|count| word|
+-----+-----------+
|73342| cells|
|62861| cell|
|61714| studies|
|61377| aim|
|60168| clinical|
|59275| 2|
|59221| 1|
|58274| data|
|58087|development|
|56579| cancer|
|50243| disease|
|49817| provided|
|49216| specific|
|48857| health|
|48536| study|
|47827| project|
|45573|description|
|45455| applicant|
|44739| program|
|44522| patients|
+-----+-----------+
only showing top 20 rows
现在写入 CSV
# Write CSV (I have HDFS storage)
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out')
P.S:我只是初学者,正在学习 Whosebug 中的 posts。所以我不知道这是否是最好的方法。但它对我有用,我希望它能帮助别人!
def toCSV(RDD):
for element in RDD:
return ','.join(str(element))
rows_of_csv=RDD.map(toCSV)
rows_of_csv.saveAsTextFile('/FileStore/tables/name_of_csv_file.csv')
# choose your path based on your distributed file system