pyspark dataframe 到 HDFS 保存太多文件
pyspark dataframe to HDFS saving too many files
我正在聚合数据并希望将结果保存在 HDFS 中。我的最终结果只有 6 行和 2 列的数据。但是,当我将它保存到 HDFS 时,它保存了 200 多个文件;我假设这是某种类型的预处理。当我查看文件时,它们也都是空白的。
results = aggregate.filter(aggregate["count"] > 2500)
results.show()
+--------------+-----+
| c_ip|count|
+--------------+-----+
| 198.51.100.61| 2619|
| 203.0.113.33| 2635|
|198.51.100.211| 2668|
|198.51.100.121| 2723|
|198.51.100.176| 2518|
| 198.51.100.16| 2546|
+--------------+-----+
results.write.format("csv").save("/sparkcourse/results")
如何将这些结果保存到 HDFS 以便获得一个文件?显然这个数据适合一个文件。
我尝试的另一件事是使用 .collect() 但随后它将我的数据变成了一个列表并且无法使用该选项将任何内容放入 HDFS。
results = aggregate.filter(aggregate["count"] > 2500).collect()
Spark 只要在处理过程中出现 shuffle,就会将数据重新划分为 200 个分区。 只要需要将数据从一个节点传输到另一个节点或在执行器之间传输数据,就会发生混洗。因此,当您保存数据框(已经有 200 个分区)时,会为每个分区创建 200 个文件,并写入一些元数据文件。
所以您的问题的解决方案是使用 coalesce(1)
函数,以便一个工作节点写入
中提到的输出路径
results.coalesce(1).write.format("csv").save("/sparkcourse/results")
或者您可以将 repartition(1)
用作
results.repartition(1).write.format("csv").save("/sparkcourse/results")
我正在聚合数据并希望将结果保存在 HDFS 中。我的最终结果只有 6 行和 2 列的数据。但是,当我将它保存到 HDFS 时,它保存了 200 多个文件;我假设这是某种类型的预处理。当我查看文件时,它们也都是空白的。
results = aggregate.filter(aggregate["count"] > 2500)
results.show()
+--------------+-----+
| c_ip|count|
+--------------+-----+
| 198.51.100.61| 2619|
| 203.0.113.33| 2635|
|198.51.100.211| 2668|
|198.51.100.121| 2723|
|198.51.100.176| 2518|
| 198.51.100.16| 2546|
+--------------+-----+
results.write.format("csv").save("/sparkcourse/results")
如何将这些结果保存到 HDFS 以便获得一个文件?显然这个数据适合一个文件。
我尝试的另一件事是使用 .collect() 但随后它将我的数据变成了一个列表并且无法使用该选项将任何内容放入 HDFS。
results = aggregate.filter(aggregate["count"] > 2500).collect()
Spark 只要在处理过程中出现 shuffle,就会将数据重新划分为 200 个分区。 只要需要将数据从一个节点传输到另一个节点或在执行器之间传输数据,就会发生混洗。因此,当您保存数据框(已经有 200 个分区)时,会为每个分区创建 200 个文件,并写入一些元数据文件。
所以您的问题的解决方案是使用 coalesce(1)
函数,以便一个工作节点写入
results.coalesce(1).write.format("csv").save("/sparkcourse/results")
或者您可以将 repartition(1)
用作
results.repartition(1).write.format("csv").save("/sparkcourse/results")