数据流不同操作不缩放

Dataflow Distinct operation not scaling

我有一个带有 "final" 阶段的线性管道,每秒输出大约 200k 个元素(短字符串)。

但是,当我在该阶段 (myPCollection.apply(Distinct.<String>create());) 之后添加一个 Distinct 操作时,Distinct 之前阶段的速度下降到每秒处理大约 80k 个元素。

但是,我正在处理一个没有最大工作人员数量的有界集合,因此我希望 Dataflow 自动增加工作人员数量以匹配工作负载。这不仅没有发生,当我手动启动有很多工作人员(20 多个)的管道时,它会自动缩小到几个工作人员。

如何使 Dataflow 升级工作池,以便此 Distinct 操作不会显着降低管道的处理速度?

看看the implementation of Distinct可能会很有趣。

如您所见,它首先对元素进行分组,然后选取第一个元素。我已提交 a bug 以改进此行为。

在当前的实现中首先对所有元素进行分组,这需要将它们写入持久存储,然后再提取。如果您有任何元素多次出现(即 热键 ),您将在可以写出多少数据方面遇到瓶颈。

作为一个技巧,您可以添加一个 DoFn,在您将元素写出之前删除重复元素。像这样:

class MapperDedupFn extends DoFn<String, String> {
    Set<String> seenElements;
    MapperDedupFn() {
      seenElements = new HashSet<>();
    }

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      if (seenElements.contains(element)) return;

      seenElements.add(element)
      receiver.output(word);
    }
  }
}

你应该可以在 Distinct 功能之前坚持这个,并希望有更好的表现。