Kafka Streams - 如何进行全局指标聚合?
Kafka Streams - How to do global metric aggregations?
所以我需要一个 GlobalKTable
包含多个实例中的多个消息的聚合。现在,我的单个实例 KTable
设置看起来像这样:
final KTable<String, Double> aggregatedMetrics = eventStream
.groupByKey(Serdes.String(), jsonSerde)
.aggregate(
() -> 0d,
new MetricsAggregator(),
Serdes.Double(),
LOCAL_METRICS_STORE_NAME);
显然,这不会扩展,因为每个实例只有它收到的消息的更新指标,而不是所有其他实例收到的所有消息的指标。我正在考虑使用这个:
final KStreamBuilder builder = new KStreamBuilder();
builder.globalTable(METRIC_CHANGES_TOPIC, METRICS_STORE_NAME);
然后只是将我的 aggregatedMetrics
KTable 的更新流式传输到 METRIC_CHANGES_TOPIC
,这将更新全局 table。但是,每个实例只会在每次更新全局 table 时覆盖其他实例的聚合。
有什么方法可以进行全局聚合吗?
这个解决方案对我来说是正确的。
这听起来不正确:
However, each instance would just be overwriting the other instances' aggregations on each update to the global table.
请注意,聚合是基于键完成的。因此,不同的实例将聚集在不同的键上,因此,每个实例只会在 GlobalKTable
.
中更新自己的键
所以我需要一个 GlobalKTable
包含多个实例中的多个消息的聚合。现在,我的单个实例 KTable
设置看起来像这样:
final KTable<String, Double> aggregatedMetrics = eventStream
.groupByKey(Serdes.String(), jsonSerde)
.aggregate(
() -> 0d,
new MetricsAggregator(),
Serdes.Double(),
LOCAL_METRICS_STORE_NAME);
显然,这不会扩展,因为每个实例只有它收到的消息的更新指标,而不是所有其他实例收到的所有消息的指标。我正在考虑使用这个:
final KStreamBuilder builder = new KStreamBuilder();
builder.globalTable(METRIC_CHANGES_TOPIC, METRICS_STORE_NAME);
然后只是将我的 aggregatedMetrics
KTable 的更新流式传输到 METRIC_CHANGES_TOPIC
,这将更新全局 table。但是,每个实例只会在每次更新全局 table 时覆盖其他实例的聚合。
有什么方法可以进行全局聚合吗?
这个解决方案对我来说是正确的。
这听起来不正确:
However, each instance would just be overwriting the other instances' aggregations on each update to the global table.
请注意,聚合是基于键完成的。因此,不同的实例将聚集在不同的键上,因此,每个实例只会在 GlobalKTable
.