Spark Streaming 以 Parquet 格式附加到 S3,小分区太多
Spark Streaming appends to S3 as Parquet format, too many small partitions
我正在构建一个应用程序,该应用程序使用 Spark Streaming 从 AWS EMR 上的 Kinesis 流接收数据。目标之一是将数据保存到 S3 (EMRFS) 中,为此我使用了 2 分钟的非重叠 window.
我的方法:
Kinesis Stream -> Spark Streaming,批持续时间约为 60 秒,使用 120 秒的非重叠 window,将流数据保存到 S3 中:
val rdd1 = kinesisStream.map( rdd => /* decode the data */)
rdd1.window(Seconds(120), Seconds(120).foreachRDD { rdd =>
val spark = SparkSession...
import spark.implicits._
// convert rdd to df
val df = rdd.toDF(columnNames: _*)
df.write.parquet("s3://bucket/20161211.parquet")
}
这是 s3://bucket/20161211.parquet 一段时间后的样子:
如您所见,许多零碎的小分区(这对读取性能来说是可怕的)...问题是,当我将数据流式传输到此 S3 parquet 文件中时,是否有任何方法可以控制小分区的数量?
谢谢
我想做的,就是每天做这样的事情:
val df = spark.read.parquet("s3://bucket/20161211.parquet")
df.coalesce(4).write.parquet("s3://bucket/20161211_4parition.parquet")
我将数据帧重新分区为 4 个分区并将它们保存回来....
有效,我觉得每天都这样,不是很优雅的解决方案...
这实际上非常接近您想要执行的操作,每个分区都将在 Spark 中作为单独的文件写出。但是 coalesce
有点令人困惑,因为它可以(有效地)应用到调用合并的上游。来自 Scala 文档的警告是:
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
this may result in your computation taking place on fewer nodes than
you like (e.g. one node in the case of numPartitions = 1). To avoid this,
you can pass shuffle = true. This will add a shuffle step, but means the
current upstream partitions will be executed in parallel (per whatever
the current partitioning is).
在数据集中,persist
和 count
更容易进行广泛评估,因为默认 coalesce
函数不将 repartition
作为输入标志(尽管您可以手动构建 Repartition
的实例)。
另一种选择是使用第二个定期批处理作业(甚至是第二个流作业)来清理 up/merges 结果,但这可能有点复杂,因为它引入了第二个移动部分来跟踪的。
我正在构建一个应用程序,该应用程序使用 Spark Streaming 从 AWS EMR 上的 Kinesis 流接收数据。目标之一是将数据保存到 S3 (EMRFS) 中,为此我使用了 2 分钟的非重叠 window.
我的方法:
Kinesis Stream -> Spark Streaming,批持续时间约为 60 秒,使用 120 秒的非重叠 window,将流数据保存到 S3 中:
val rdd1 = kinesisStream.map( rdd => /* decode the data */)
rdd1.window(Seconds(120), Seconds(120).foreachRDD { rdd =>
val spark = SparkSession...
import spark.implicits._
// convert rdd to df
val df = rdd.toDF(columnNames: _*)
df.write.parquet("s3://bucket/20161211.parquet")
}
这是 s3://bucket/20161211.parquet 一段时间后的样子:
如您所见,许多零碎的小分区(这对读取性能来说是可怕的)...问题是,当我将数据流式传输到此 S3 parquet 文件中时,是否有任何方法可以控制小分区的数量?
谢谢
我想做的,就是每天做这样的事情:
val df = spark.read.parquet("s3://bucket/20161211.parquet")
df.coalesce(4).write.parquet("s3://bucket/20161211_4parition.parquet")
我将数据帧重新分区为 4 个分区并将它们保存回来....
有效,我觉得每天都这样,不是很优雅的解决方案...
这实际上非常接近您想要执行的操作,每个分区都将在 Spark 中作为单独的文件写出。但是 coalesce
有点令人困惑,因为它可以(有效地)应用到调用合并的上游。来自 Scala 文档的警告是:
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
在数据集中,persist
和 count
更容易进行广泛评估,因为默认 coalesce
函数不将 repartition
作为输入标志(尽管您可以手动构建 Repartition
的实例)。
另一种选择是使用第二个定期批处理作业(甚至是第二个流作业)来清理 up/merges 结果,但这可能有点复杂,因为它引入了第二个移动部分来跟踪的。