Kafka Streams - 如何进行全局指标聚合?

Kafka Streams - How to do global metric aggregations?

所以我需要一个 GlobalKTable 包含多个实例中的多个消息的聚合。现在,我的单个实例 KTable 设置看起来像这样:

final KTable<String, Double> aggregatedMetrics = eventStream
        .groupByKey(Serdes.String(), jsonSerde)
        .aggregate(
                () -> 0d,
                new MetricsAggregator(),
                Serdes.Double(),
                LOCAL_METRICS_STORE_NAME);

显然,这不会扩展,因为每个实例只有它收到的消息的更新指标,而不是所有其他实例收到的所有消息的指标。我正在考虑使用这个:

final KStreamBuilder builder = new KStreamBuilder();   
builder.globalTable(METRIC_CHANGES_TOPIC, METRICS_STORE_NAME);

然后只是将我的 aggregatedMetrics KTable 的更新流式传输到 METRIC_CHANGES_TOPIC,这将更新全局 table。但是,每个实例只会在每次更新全局 table 时覆盖其他实例的聚合。

有什么方法可以进行全局聚合吗?

这个解决方案对我来说是正确的。

这听起来不正确:

However, each instance would just be overwriting the other instances' aggregations on each update to the global table.

请注意,聚合是基于键完成的。因此,不同的实例将聚集在不同的键上,因此,每个实例只会在 GlobalKTable.

中更新自己的键