将 Spark 数据帧写为带分区的 CSV
Write Spark dataframe as CSV with partitions
我正在尝试将 spark 中的数据帧写入 HDFS 位置,我希望如果我添加 partitionBy
符号,Spark 将创建分区
(类似Parquet格式的写法)
形式的文件夹
partition_column_name=partition_value
(即 partition_date=2016-05-03
)。为此,我 运行 以下命令:
(df.write
.partitionBy('partition_date')
.mode('overwrite')
.format("com.databricks.spark.csv")
.save('/tmp/af_organic'))
但尚未创建分区文件夹
知道我该怎么做才能让 spark DF 自动创建这些文件夹吗?
谢谢,
Spark 2.0.0+:
内置的 csv 格式支持开箱即用的分区,因此您应该能够简单地使用:
df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)
不包括任何附加包。
Spark < 2.0.0:
目前 (v1.4.0) spark-csv
不支持 partitionBy
(参见 databricks/spark-csv#123),但您可以调整内置源以实现您想要的效果。
您可以尝试两种不同的方法。假设您的数据相对简单(没有复杂的字符串并且需要字符转义)并且看起来或多或少像这样:
df = sc.parallelize([
("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1)
]).toDF(["k", "x1", "x2", "x3"])
您可以手动准备写入值:
from pyspark.sql.functions import col, concat_ws
key = col("k")
values = concat_ws(",", *[col(x) for x in df.columns[1:]])
kvs = df.select(key, values)
并使用 text
来源
编写
kvs.write.partitionBy("k").text("/tmp/foo")
df_foo = (sqlContext.read.format("com.databricks.spark.csv")
.options(inferSchema="true")
.load("/tmp/foo/k=foo"))
df_foo.printSchema()
## root
## |-- C0: integer (nullable = true)
## |-- C1: double (nullable = true)
## |-- C2: double (nullable = true)
在更复杂的情况下,您可以尝试使用适当的 CSV 解析器以类似的方式预处理值,通过使用 UDF 或映射到 RDD,但它的成本会高得多。
如果 CSV 格式不是硬性要求,您还可以使用支持 partitionBy
开箱即用的 JSON writer:
df.write.partitionBy("k").json("/tmp/bar")
以及读取时的分区发现。
我正在尝试将 spark 中的数据帧写入 HDFS 位置,我希望如果我添加 partitionBy
符号,Spark 将创建分区
(类似Parquet格式的写法)
partition_column_name=partition_value
(即 partition_date=2016-05-03
)。为此,我 运行 以下命令:
(df.write
.partitionBy('partition_date')
.mode('overwrite')
.format("com.databricks.spark.csv")
.save('/tmp/af_organic'))
但尚未创建分区文件夹 知道我该怎么做才能让 spark DF 自动创建这些文件夹吗?
谢谢,
Spark 2.0.0+:
内置的 csv 格式支持开箱即用的分区,因此您应该能够简单地使用:
df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)
不包括任何附加包。
Spark < 2.0.0:
目前 (v1.4.0) spark-csv
不支持 partitionBy
(参见 databricks/spark-csv#123),但您可以调整内置源以实现您想要的效果。
您可以尝试两种不同的方法。假设您的数据相对简单(没有复杂的字符串并且需要字符转义)并且看起来或多或少像这样:
df = sc.parallelize([
("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1)
]).toDF(["k", "x1", "x2", "x3"])
您可以手动准备写入值:
from pyspark.sql.functions import col, concat_ws
key = col("k")
values = concat_ws(",", *[col(x) for x in df.columns[1:]])
kvs = df.select(key, values)
并使用 text
来源
kvs.write.partitionBy("k").text("/tmp/foo")
df_foo = (sqlContext.read.format("com.databricks.spark.csv")
.options(inferSchema="true")
.load("/tmp/foo/k=foo"))
df_foo.printSchema()
## root
## |-- C0: integer (nullable = true)
## |-- C1: double (nullable = true)
## |-- C2: double (nullable = true)
在更复杂的情况下,您可以尝试使用适当的 CSV 解析器以类似的方式预处理值,通过使用 UDF 或映射到 RDD,但它的成本会高得多。
如果 CSV 格式不是硬性要求,您还可以使用支持 partitionBy
开箱即用的 JSON writer:
df.write.partitionBy("k").json("/tmp/bar")
以及读取时的分区发现。