用列名编写 csv 并读取从 Pyspark 中的 sparksql 数据框生成的 csv 文件
writing a csv with column names and reading a csv file which is being generated from a sparksql dataframe in Pyspark
我已经启动了 shell with databrick csv package
#../spark-1.6.1-bin-hadoop2.6/bin/pyspark --packages com.databricks:spark-csv_2.11:1.3.0
然后我读取了一个 csv 文件,做了一些 groupby 操作并将其转储到 csv。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load(path.csv') ####it has columns and df.columns works fine
type(df) #<class 'pyspark.sql.dataframe.DataFrame'>
#now trying to dump a csv
df.write.format('com.databricks.spark.csv').save('path+my.csv')
#it creates a directory my.csv with 2 partitions
### To create single file i followed below line of code
#df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("path+file_satya.csv") ## this creates one partition in directory of csv name
#but in both cases no columns information(How to add column names to that csv file???)
# again i am trying to read that csv by
df_new = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("the file i just created.csv")
#i am not getting any columns in that..1st row becomes column names
请不要在 read_csv 之后或在阅读时提及列名时向数据框添加模式之类的回答。
问题 1 - 在提供 csv 转储时,有什么方法可以添加列名称吗???
问题2-有没有办法创建可以用ms office或notepad++打开的单个csv文件(又不是目录)???
注意:我目前没有使用集群,因为它对于像我这样的 spark 初学者来说太复杂了。如果有人可以提供 link 如何在集群环境中将 to_csv 处理成单个文件,那将是一个很大的帮助。
得到第一个问题的答案,这是将一个额外的参数 header = 'true' 与 csv 语句一起传递的问题
df.write.format('com.databricks.spark.csv').save('path+my.csv',header = 'true')
#第二题的备选方案
使用topandas.to_csv,但我不想在这里使用pandas,所以请建议是否有其他解决方法。
尝试
df.coalesce(1).write.format('com.databricks.spark.csv').save('path+my.csv',header = 'true')
请注意,这可能不是您当前设置的问题,但在非常大的数据集上,您可以 运行 进入驱动程序的内存问题。这也将花费更长的时间(在集群场景中),因为所有内容都必须推回到一个位置。
使用 spark >= 2.o,我们可以做类似的事情
df = spark.read.csv('path+filename.csv', sep = 'ifany',header='true')
df.write.csv('path_filename of csv',header=True) ###yes still in partitions
df.toPandas().to_csv('path_filename of csv',index=False) ###single csv(Pandas Style)
以防万一,
在 spark 2.1 上,您可以使用以下行创建单个 csv 文件
dataframe.coalesce(1) //So just a single part- file will be created
.write.mode(SaveMode.Overwrite)
.option("mapreduce.fileoutputcommitter.marksuccessfuljobs","false") //Avoid creating of crc files
.option("header","true") //Write the header
.csv("csvFullPath")
以下应该可以解决问题:
df \
.write \
.mode('overwrite') \
.option('header', 'true') \
.csv('output.csv')
或者,如果您希望结果位于单个分区中,您可以使用 coalesce(1)
:
df \
.coalesce(1) \
.write \
.mode('overwrite') \
.option('header', 'true') \
.csv('output.csv')
但是请注意,这是一项昂贵的操作,对于非常大的数据集可能不可行。
我已经启动了 shell with databrick csv package
#../spark-1.6.1-bin-hadoop2.6/bin/pyspark --packages com.databricks:spark-csv_2.11:1.3.0
然后我读取了一个 csv 文件,做了一些 groupby 操作并将其转储到 csv。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load(path.csv') ####it has columns and df.columns works fine
type(df) #<class 'pyspark.sql.dataframe.DataFrame'>
#now trying to dump a csv
df.write.format('com.databricks.spark.csv').save('path+my.csv')
#it creates a directory my.csv with 2 partitions
### To create single file i followed below line of code
#df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("path+file_satya.csv") ## this creates one partition in directory of csv name
#but in both cases no columns information(How to add column names to that csv file???)
# again i am trying to read that csv by
df_new = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("the file i just created.csv")
#i am not getting any columns in that..1st row becomes column names
请不要在 read_csv 之后或在阅读时提及列名时向数据框添加模式之类的回答。
问题 1 - 在提供 csv 转储时,有什么方法可以添加列名称吗???
问题2-有没有办法创建可以用ms office或notepad++打开的单个csv文件(又不是目录)???
注意:我目前没有使用集群,因为它对于像我这样的 spark 初学者来说太复杂了。如果有人可以提供 link 如何在集群环境中将 to_csv 处理成单个文件,那将是一个很大的帮助。
得到第一个问题的答案,这是将一个额外的参数 header = 'true' 与 csv 语句一起传递的问题
df.write.format('com.databricks.spark.csv').save('path+my.csv',header = 'true')
#第二题的备选方案
使用topandas.to_csv,但我不想在这里使用pandas,所以请建议是否有其他解决方法。
尝试
df.coalesce(1).write.format('com.databricks.spark.csv').save('path+my.csv',header = 'true')
请注意,这可能不是您当前设置的问题,但在非常大的数据集上,您可以 运行 进入驱动程序的内存问题。这也将花费更长的时间(在集群场景中),因为所有内容都必须推回到一个位置。
使用 spark >= 2.o,我们可以做类似的事情
df = spark.read.csv('path+filename.csv', sep = 'ifany',header='true')
df.write.csv('path_filename of csv',header=True) ###yes still in partitions
df.toPandas().to_csv('path_filename of csv',index=False) ###single csv(Pandas Style)
以防万一, 在 spark 2.1 上,您可以使用以下行创建单个 csv 文件
dataframe.coalesce(1) //So just a single part- file will be created
.write.mode(SaveMode.Overwrite)
.option("mapreduce.fileoutputcommitter.marksuccessfuljobs","false") //Avoid creating of crc files
.option("header","true") //Write the header
.csv("csvFullPath")
以下应该可以解决问题:
df \
.write \
.mode('overwrite') \
.option('header', 'true') \
.csv('output.csv')
或者,如果您希望结果位于单个分区中,您可以使用 coalesce(1)
:
df \
.coalesce(1) \
.write \
.mode('overwrite') \
.option('header', 'true') \
.csv('output.csv')
但是请注意,这是一项昂贵的操作,对于非常大的数据集可能不可行。