在 Combine 步骤中使用 "aggregator"(自定义计数器)的方法?

A way to use "aggregator" (custom counter) within Combine step?

我的团队为我们用于监控和分析目的的许多数据流管道使用了大量聚合器(自定义计数器)。

我们主要编写 DoFn classes 来这样做,但我们有时会使用 Combine.perKey(),通过编写我们自己的组合 class 来实现 SerializableFunction<Iterable<T>, S> (通常在我们的例子中,TS 是相同的)。我们 运行 的一些工作有一小部分非常热的键,我们正在寻求利用 Combine 提供的一些功能(例如热键扇出),但是有一个问题这种方法。

聚合器似乎仅在 DoFn 内可用,我想知道是否有解决此问题的方法,或者这可能是将来添加的功能。大多数情况下,我们使用一组自定义计数器来计算某些 events/objects 不同类型的数量,以供分析和监控之用。在某些情况下,我们可能可以在 Combine 步骤之后应用另一个 DoFn 来执行此操作,但在其他情况下,我们确实需要在合并过程中对事物进行计数——例如,我们想知道对象在键上的分布以了解如何例如,我们有很多热键,以及什么在热键和非常热键之间划清界限。还有一些其他情况对我们来说似乎很棘手。

我四处搜索,但找不到太多关于如何在 Combine 步骤中使用聚合器的资源,因此非常感谢任何帮助!

如果需要,我或许可以描述一下我们使用了什么样的 Combine 步骤以及我们试图计算的内容,但这需要一些时间,我想对此有一个通用的解决方案.

这目前是不可能的。将来(作为 Apache Beam 的一部分)可能会在 CombineFn 中定义指标(类似于聚合器),这应该可以解决这个问题。

同时,对于您的用例,您可以按照您的描述进行操作。您可以有一个 Combine.perKey(),然后有多个步骤使用结果——一个用于您的实际处理,其他用于报告各种指标。

您还可以查看 CombineFns 中的方法,这些方法允许创建组合 CombineFn。例如,您可以使用 CombineFn 和一个简单的 Count,以便报告 DoFn 可以报告每个键中的元素数量(消耗 Count)和实际处理 DoFn 会消耗你的 CombineFn.

的结果