使用 Apache Beam 和 Dataflow 的数据存储性能不佳

Datastore poor performance with Apache Beam & Dataflow

我在 Datastore 写入速度方面遇到了巨大的性能问题。大多数时候它保持在 100 elements/s 以下。

当使用数据存储客户端 (com.google.cloud:google-cloud-datastore ) 和 运行 并行批处理写入。

我已经使用 Java API 设置了一个简单的 Apache Beam 管道。这是它的图表:

运行没有数据存储节点时的速度如下:

这样快多了。这一切都表明 DatastoreV1.Write 是该管道​​中的瓶颈 - 从没有写入节点的管道速度和 DatastoreV1.Write 与其他节点的壁时间相比的壁时间来判断。


我尝试解决此问题的方法:

• 增加初始工人的数量(尝试了 1 和 10,没有明显差异)。 Datastore 在一段时间后(可能在前 2 个节点完成处理之后)将写入次数减少到 1。基于此,我怀疑 DatastoreIO.v1().write() 不会 运行 它的工作人员并行。为什么呢?

• 确保一切都在 运行 相同的位置:GCP 项目、数据流管道工作者和元数据、存储 - 所有都设置为 us-central。这是建议

• 尝试使用不同的实体密钥生成策略(根据 this post)。当前使用此方法:Key.Builder keyBuilder = DatastoreHelper.makeKey("someKind", UUID.randomUUID().toString());。我不确定这会生成足够均匀分布的密钥,但我想即使不是这样,性能也不应该这么低?


请注意,如果没有解决方法,我无法使用提供的 Apache Beam 和 Google 库:我不得不将 google-api-client 版本强制为 1.22 .0 和 Guava 由于它们的依赖性问题而成为 23.0(参见示例 https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/607)。

正在查看 DatastoreV1.Write 节点日志:

它每大约 5 秒推送 500 个实体,速度不是很快。

总的来说,DatastoreIO.v1().write() 速度似乎很慢,而且它的工作进程没有 运行 并行。知道如何解决这个问题或可能是什么原因吗?

我不应该不回答这个问题。

在联系 GCP 支持后,我得到了一个建议,认为原因可能是 TextIO.Read 节点从压缩(gzipped)文件中读取。显然这是一个 non-parallelizable 操作。事实上,在为源切换到未压缩文件后,性能得到了提高。

建议的另一个解决方案是 运行 在从源代码读取后手动重新分配管道。这意味着向管道中的项目添加任意键,按任意键分组,然后删除任意键。它也有效。这种方法归结为这段代码:

管道代码:

pipeline.apply(TextIO.read().from("foo").withCompression(Compression.GZIP)  
        .apply(ParDo.of(new PipelineRepartitioner.AddArbitraryKey<>()))
        .apply(GroupByKey.create())
        .apply(ParDo.of(new PipelineRepartitioner.RemoveArbitraryKey<>()))
        /* further transforms */ 

帮手class:

public class PipelineRepartitioner<T> {
    public static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
        }
    }

    public static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            for (T s : c.element().getValue()) {
                c.output(s);
            }
        }
    }
}

我在 Apache Beam Jira 上看到了与该问题相关的工单,因此这可能会在未来得到解决。