How does a Beam runner determine the size of each bundle of a PCollection?
According to the Apache Beam Execution Model - Bundling and persistence:
"Instead of processing all elements simultaneously, the elements in a PCollection are processed in bundles. The division of the collection into bundles is arbitrary and selected by the runner. This allows the runner to choose an appropriate middle-ground between persisting results after every element, and having to retry everything if there is a failure. For example, a streaming runner may prefer to process and commit small bundles, and a batch runner may prefer to process larger bundles."
This paragraph says that bundle sizes are arbitrary and chosen by the runner. I looked through the Apache Beam source code, in particular the runners modules, to understand how a runner determines bundle sizes, but I couldn't figure it out.
Can someone point out in which Java class(es) or configuration files the bundle size is computed? I'm interested in how both the DirectRunner and the Cloud Dataflow runner work.
This generally isn't a knob you can set; in fact, it isn't exposed in the open-source code of the Dataflow runner harness or the Beam SDK itself. Runners make their own choices when packing bundles, based on each runner's preferences and goals for running a high-performance pipeline.
In Dataflow, a closed-source backend system determines this based on several factors, including sharding, the amount of data available for a particular key, and the pipeline's progress/throughput. The bundle size is not based on any kind of static number; it is chosen dynamically according to what is currently happening in the pipeline and on the workers.
Looking at BigQueryIO, you can see that it adds a step that generates a shard number for each record:
.apply("ShardTableWrites", ParDo.of(new GenerateShardedTable<>(numShards)))
After that, it applies a Reshuffle and a GlobalWindow, which together act like a group-by on the shard id, so each bundle ends up holding the rows that were generated with the same shard id.
Use a smaller shard count to get bigger bundles: with fewer shards, more rows share each shard id.
I used this approach for reading from and writing to BigTable, and it worked well.
Full code:
PCollectionTuple tuple =
    tagged
        .apply(Reshuffle.of())
        // Put in the global window to ensure that DynamicDestinations side inputs are accessed
        // correctly.
        .apply(
            "GlobalWindow",
            Window.<KV<ShardedKey<String>, TableRowInfo<ElementT>>>into(new GlobalWindows())
                .triggering(DefaultTrigger.of())
                .discardingFiredPanes())
        .apply(
            "StreamingWrite",
            ParDo.of(
                    new StreamingWriteFn<>(
                        bigQueryServices,
                        retryPolicy,
                        failedInsertsTag,
                        errorContainer,
                        skipInvalidRows,
                        ignoreUnknownValues,
                        ignoreInsertIds,
                        toTableRow))
                .withOutputTags(mainOutputTag, TupleTagList.of(failedInsertsTag)));
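To see why the shard count controls bundle size, here is a small plain-Java sketch with no Beam dependency. The hash-mod assignment is an assumption for illustration, not BigQueryIO's exact GenerateShardedTable logic; it just shows that after grouping by shard id (what the Reshuffle effectively does), fewer shards means fewer, larger groups:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardSketch {

    // Assign each record to one of numShards shards, mimicking what a step
    // like GenerateShardedTable conceptually does. (Hash-mod is an assumed
    // scheme here, not BigQueryIO's actual implementation.)
    static Map<Integer, List<String>> shard(List<String> records, int numShards) {
        Map<Integer, List<String>> byShard = new HashMap<>();
        for (String r : records) {
            int shardId = Math.floorMod(r.hashCode(), numShards);
            byShard.computeIfAbsent(shardId, k -> new ArrayList<>()).add(r);
        }
        return byShard;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            records.add("record-" + i);
        }

        // After a Reshuffle keyed on shard id, each group corresponds to one
        // bundle of work: at most numShards groups exist, so a smaller shard
        // count yields fewer, larger groups and a larger count yields more,
        // smaller ones.
        System.out.println(shard(records, 4).size());   // at most 4 groups
        System.out.println(shard(records, 100).size()); // at most 100 groups
    }
}
```

The same 1000 records are simply partitioned differently: the total row count is unchanged, only the granularity of the groups varies with `numShards`.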