数据流不同操作不缩放
Dataflow Distinct operation not scaling
我有一个带有 "final" 阶段的线性管道,每秒输出大约 200k 个元素(短字符串)。
但是,当我在该阶段 (myPCollection.apply(Distinct.<String>create());
) 之后添加一个 Distinct 操作时,Distinct
之前阶段的速度下降到每秒处理大约 80k 个元素。
但是,我正在处理一个没有最大工作人员数量的有界集合,因此我希望 Dataflow 自动增加工作人员数量以匹配工作负载。这不仅没有发生,当我手动启动有很多工作人员(20 多个)的管道时,它会自动缩小到几个工作人员。
如何使 Dataflow 升级工作池,以便此 Distinct 操作不会显着降低管道的处理速度?
看看the implementation of Distinct
可能会很有趣。
如您所见,它首先对元素进行分组,然后选取第一个元素。我已提交 a bug 以改进此行为。
在当前的实现中首先对所有元素进行分组,这需要将它们写入持久存储,然后再提取。如果您有任何元素多次出现(即 热键 ),您将在可以写出多少数据方面遇到瓶颈。
作为一个技巧,您可以添加一个 DoFn,在您将元素写出之前删除重复元素。像这样:
class MapperDedupFn extends DoFn<String, String> {
Set<String> seenElements;
MapperDedupFn() {
seenElements = new HashSet<>();
}
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> receiver) {
if (seenElements.contains(element)) return;
seenElements.add(element)
receiver.output(word);
}
}
}
你应该可以在 Distinct
功能之前坚持这个,并希望有更好的表现。
我有一个带有 "final" 阶段的线性管道,每秒输出大约 200k 个元素(短字符串)。
但是,当我在该阶段 (myPCollection.apply(Distinct.<String>create());
) 之后添加一个 Distinct 操作时,Distinct
之前阶段的速度下降到每秒处理大约 80k 个元素。
但是,我正在处理一个没有最大工作人员数量的有界集合,因此我希望 Dataflow 自动增加工作人员数量以匹配工作负载。这不仅没有发生,当我手动启动有很多工作人员(20 多个)的管道时,它会自动缩小到几个工作人员。
如何使 Dataflow 升级工作池,以便此 Distinct 操作不会显着降低管道的处理速度?
看看the implementation of Distinct
可能会很有趣。
如您所见,它首先对元素进行分组,然后选取第一个元素。我已提交 a bug 以改进此行为。
在当前的实现中首先对所有元素进行分组,这需要将它们写入持久存储,然后再提取。如果您有任何元素多次出现(即 热键 ),您将在可以写出多少数据方面遇到瓶颈。
作为一个技巧,您可以添加一个 DoFn,在您将元素写出之前删除重复元素。像这样:
class MapperDedupFn extends DoFn<String, String> {
Set<String> seenElements;
MapperDedupFn() {
seenElements = new HashSet<>();
}
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> receiver) {
if (seenElements.contains(element)) return;
seenElements.add(element)
receiver.output(word);
}
}
}
你应该可以在 Distinct
功能之前坚持这个,并希望有更好的表现。