Pyspark:写入 csv 写入镶木地板而不是 csv

Pyspark: write to csv writes parquet instead of csv

在下面的代码中,out.csv 是镶木地板格式。我缺少什么选项可以将其写入 csv 文件?

import py4j
from pyspark import SparkConf, SparkContext
from pyspark import HiveContext as hc
import os
from pyspark.sql import SQLContext, Row

from datetime import datetime
from pyspark.sql.types import DateType,StringType
import pyspark.sql.functions as F

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.11:1.5.0'
conf = SparkConf().setMaster("local[64]").setAppName("My App")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

#read parquet file into DF
df = sqlContext.read.parquet('/path/in_parquet')

# Write to csv
df_grouped = df.groupBy('column1').agg(F.sum('column2'))
df_grouped.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("/path/out.csv")

输出:

控制台中剩余的最后几行。另外,这是我用来 运行 脚本的命令:

spark-submit --master local[*] --driver-memory 12g --packages com.databricks:spark-csv_2.11:1.2.0 MyPyspark.py

$ hdfs dfs -ls /path/out.csv
Found 2 items
-rw-r--r--   3 me devs          0 2017-06-29 12:16 /path/out.csv/_SUCCESS
-rw-r--r--   3 me devs        552 2017-06-29 12:16 /path/out.csv/part-00000

Spark 分别保存数据的每个分区,因此,您会为每个分区获得一个文件 part-xxxxx。你指定的路径.save("/path/out.csv")就是文件保存的目录,里面的part-xxxxx个文件已经是csv格式了。

如果您有多个文件和一个小数据集,您可以使用 coalesce(1) 然后保存结果以接收单个 csv 文件。对于较大的数据集,我建议先保存文件,然后使用 FileUtil.copyMerge()(Hadoop 命令)合并文件。