控制 Apache Beam/Spark 流管道中的最小文件大小
Controlling the minimum file size in a Apache Beam/Spark Streaming pipeline
我有一条从 Kafka 读取并写入 GCP 的管道。文件的记录太少。我想创建更大的文件。到目前为止,这就是我配置 Beam 的方式(至少我认为是相关参数)。
我的问题是如何控制从 Beam 流媒体管道生成的文件的大小?
windowDuration: 5
numShards: 0
batchIntervalMillis: 30000
checkpointDurationMillis: 30000
maxRecordsPerBatch: 60000000
下面是与流相关的 Spark 配置参数。
spark.default.parallelism=600
spark.ui.retainedStages=10
spark.ui.retainedJobs=10
spark.ui.retainedTasks=12000
spark.streaming.receiver.maxRate=350
spark.streaming.kafka.maxRatePerPartition=350
spark.streaming.ui.retainedBatches=40
spark.streaming.backpressure.enabled=true
spark.streaming.receiver.writeAheadLog.enable=false
spark.streaming.kafka.maxRatePerPartition=0
spark.streaming.stopGracefullyOnShutdown=true
spark.streaming.ui.retainedBatches=50
经过一些研究,我发现当 运行 一个 Beam Streaming 应用程序时,如果从 Kafka 获取高吞吐量数据,最好不要以任何方式合并数据。原因是当 运行 在像 GCP 这样的云环境中时,您需要为 CPU/Memory 付费。如果您尝试合并,您将触发洗牌,这需要您支付更多内存和 CPU 时间。
我发现 GCP 已经提供了一种方法来做到这一点。这篇文章中描述的 "How to concatenate sharded files on Google Cloud Storage automatically using Cloud Functions". The GCP compose 功能就是为此目的而创建的。它将文件合并到位。这意味着您不必为此目的移动数据或分配资源。您只需调用该函数,一切都在 GCP 服务器中进行。
我有一条从 Kafka 读取并写入 GCP 的管道。文件的记录太少。我想创建更大的文件。到目前为止,这就是我配置 Beam 的方式(至少我认为是相关参数)。 我的问题是如何控制从 Beam 流媒体管道生成的文件的大小?
windowDuration: 5
numShards: 0
batchIntervalMillis: 30000
checkpointDurationMillis: 30000
maxRecordsPerBatch: 60000000
下面是与流相关的 Spark 配置参数。
spark.default.parallelism=600
spark.ui.retainedStages=10
spark.ui.retainedJobs=10
spark.ui.retainedTasks=12000
spark.streaming.receiver.maxRate=350
spark.streaming.kafka.maxRatePerPartition=350
spark.streaming.ui.retainedBatches=40
spark.streaming.backpressure.enabled=true
spark.streaming.receiver.writeAheadLog.enable=false
spark.streaming.kafka.maxRatePerPartition=0
spark.streaming.stopGracefullyOnShutdown=true
spark.streaming.ui.retainedBatches=50
经过一些研究,我发现当 运行 一个 Beam Streaming 应用程序时,如果从 Kafka 获取高吞吐量数据,最好不要以任何方式合并数据。原因是当 运行 在像 GCP 这样的云环境中时,您需要为 CPU/Memory 付费。如果您尝试合并,您将触发洗牌,这需要您支付更多内存和 CPU 时间。 我发现 GCP 已经提供了一种方法来做到这一点。这篇文章中描述的 "How to concatenate sharded files on Google Cloud Storage automatically using Cloud Functions". The GCP compose 功能就是为此目的而创建的。它将文件合并到位。这意味着您不必为此目的移动数据或分配资源。您只需调用该函数,一切都在 GCP 服务器中进行。