How does a Beam runner determine the size of each bundle of a PCollection?

According to the Apache Beam Execution Model - Bundling and persistence:

"Instead of processing all elements simultaneously, the elements in a PCollection are processed in bundles. The division of the collection into bundles is arbitrary and selected by the runner. This allows the runner to choose an appropriate middle-ground between persisting results after every element, and having to retry everything if there is a failure. For example, a streaming runner may prefer to process and commit small bundles, and a batch runner may prefer to process larger bundles."

That paragraph says the division into bundles is arbitrary and chosen by the runner. I looked through the Apache Beam source code, in particular the runner modules, to find out how a runner determines bundle sizes, but I could not figure it out.

Could someone point out which Java class(es) or configuration files compute the bundle size? I am interested in how the DirectRunner and the Cloud Dataflow runner work.

This is generally not a knob you can set; in fact, it is not an available knob in the open-source code of the Dataflow runner harness or the Beam SDK itself. Runners make their own choices when packing bundles, based on their preferences and goals for running a high-performance pipeline.

In Dataflow, a closed-source backend system determines this based on several factors, including sharding, how much data is available for a particular key, and the progress and throughput of the pipeline. Bundle sizes are not based on any static number; they are chosen dynamically according to what is currently happening in the pipeline and on the workers.
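
While there is no configuration that exposes the chosen size, you can observe it empirically by counting elements between @StartBundle and @FinishBundle in a DoFn. A minimal sketch (the class name and the logging are my own, not part of Beam):

  import org.apache.beam.sdk.transforms.DoFn;

  // Logs how many elements each bundle contained. Apply it with
  // ParDo.of(new BundleSizeLogger<>()) anywhere in the pipeline; the
  // logged counts are the bundle sizes the runner actually chose.
  class BundleSizeLogger<T> extends DoFn<T, T> {
    private transient long count;

    @StartBundle
    public void startBundle() {
      count = 0;
    }

    @ProcessElement
    public void processElement(@Element T element, OutputReceiver<T> out) {
      count++;
      out.output(element);
    }

    @FinishBundle
    public void finishBundle() {
      System.out.println("bundle contained " + count + " elements");
    }
  }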

If you look at BigQueryIO, you can see that it adds a step that generates a shard number for each record:

  .apply("ShardTableWrites", ParDo.of(new GenerateShardedTable<>(numShards)))

After that, they apply a Reshuffle in the GlobalWindow, which works like a group-by on the shard ID; each bundle then consists of the rows that were generated with the same shard ID.

To get bigger bundles, use a smaller number of shards: with a fixed amount of data, each shard ID then collects more rows.

I use this for reading from and writing to BigTable, and it works well.

Full code:

  PCollectionTuple tuple =
    tagged
        .apply(Reshuffle.of())
        // Put in the global window to ensure that DynamicDestinations side inputs are accessed
        // correctly.
        .apply(
            "GlobalWindow",
            Window.<KV<ShardedKey<String>, TableRowInfo<ElementT>>>into(new GlobalWindows())
                .triggering(DefaultTrigger.of())
                .discardingFiredPanes())
        .apply(
            "StreamingWrite",
            ParDo.of(
                    new StreamingWriteFn<>(
                        bigQueryServices,
                        retryPolicy,
                        failedInsertsTag,
                        errorContainer,
                        skipInvalidRows,
                        ignoreUnknownValues,
                        ignoreInsertIds,
                        toTableRow))
                .withOutputTags(mainOutputTag, TupleTagList.of(failedInsertsTag)));
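
Outside of BigQueryIO, the same shard-then-Reshuffle pattern can be applied to any write step. A rough sketch, reusing the hypothetical AssignRandomShard fn from above (MyRecord and MyWriteFn stand in for your element type and write logic):

  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Reshuffle;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  // Key each record with one of numShards random shard ids, then
  // Reshuffle so the downstream write sees the data grouped by shard.
  int numShards = 10; // fewer shards -> more rows per shard id

  PCollection<KV<Integer, MyRecord>> sharded =
      input.apply("AssignShard", ParDo.of(new AssignRandomShard<MyRecord>(numShards)));

  sharded
      .apply("ReshuffleByShard", Reshuffle.<Integer, MyRecord>of())
      .apply("Write", ParDo.of(new MyWriteFn()));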