如何控制从 Spark DataFrame 写入的输出文件的数量?
How can I control the number of output files written from Spark DataFrame?
使用 Spark 流从 Kafka 主题读取 Json 数据。
我使用 DataFrame 来处理数据,稍后我希望将输出保存到 HDFS 文件中。问题是使用:
df.write.save("append").format("text")
生成很多文件,有些很大,有些甚至是 0 字节。
有没有办法控制输出文件的数量?另外,为了避免 "opposite" 问题,有没有办法同时限制每个文件的大小,以便在当前达到某个 size/num 行时写入一个新文件?
输出文件的数量等于 Dataset
的分区数量,这意味着您可以通过多种方式控制它,具体取决于上下文:
- 对于没有广泛依赖性的
Datasets
,您可以使用 reader 特定参数控制输入
- 对于具有广泛依赖性的
Datasets
,您可以使用 spark.sql.shuffle.partitions
参数控制分区数。
- 独立于血统,您可以
coalesce
或 repartition
。
is there a way to also limit the size of each file so a new file will be written to when the current reaches a certain size/num of rows?
没有。对于内置编写器,它是严格的 1:1 关系。
您可以使用尺寸估算器:
import org.apache.spark.util.SizeEstimator
val size = SizeEstimator.estimate(df)
接下来您可以根据数据帧的大小调整文件数量,使用 repatition 或 coalesce
使用 Spark 流从 Kafka 主题读取 Json 数据。
我使用 DataFrame 来处理数据,稍后我希望将输出保存到 HDFS 文件中。问题是使用:
df.write.save("append").format("text")
生成很多文件,有些很大,有些甚至是 0 字节。
有没有办法控制输出文件的数量?另外,为了避免 "opposite" 问题,有没有办法同时限制每个文件的大小,以便在当前达到某个 size/num 行时写入一个新文件?
输出文件的数量等于 Dataset
的分区数量,这意味着您可以通过多种方式控制它,具体取决于上下文:
- 对于没有广泛依赖性的
Datasets
,您可以使用 reader 特定参数控制输入 - 对于具有广泛依赖性的
Datasets
,您可以使用spark.sql.shuffle.partitions
参数控制分区数。 - 独立于血统,您可以
coalesce
或repartition
。
is there a way to also limit the size of each file so a new file will be written to when the current reaches a certain size/num of rows?
没有。对于内置编写器,它是严格的 1:1 关系。
您可以使用尺寸估算器:
import org.apache.spark.util.SizeEstimator
val size = SizeEstimator.estimate(df)
接下来您可以根据数据帧的大小调整文件数量,使用 repatition 或 coalesce