是否有可能从 ktable\kstream 中获得前 10 名?
Is it possible to get top 10 from ktable\kstream?
我有一个主题,其中 String
键是信号类型,Signal
值是 class 像这样
public clas Signal {
public final int deviceId;
public final int value;
...
}
每个设备都可以发送随时间上升或下降的信号值,没有模式。
是否有可能获得每种类型(主题的关键)在所有时间段内最大信号 value
的前 10 台设备作为 KTable<String,Signal>
?如果所有信号值都升高,会有帮助吗?
可以根据需要更改主题结构。
例如,当值总是在增加时,可以使用 Kafka Streams。需要创建自己的 Top10
聚合存储前 10 名并在 add
调用
时更新它
final var builder = new StreamsBuilder();
final var topTable = builder
.table(
SignalChange.TOPIC_NAME,
Consumed.with(Serdes.String(), new SignalChange.Serde())
).toStream()
.groupByKey()
.aggregate(
() -> new Top10(),
(k, v, top10) -> top10.add(v),
Materialized.with(Serdes.String(), new Top10.Serde())
);
topTable
然后可以加入任何请求顶部的流
我有一个主题,其中 String
键是信号类型,Signal
值是 class 像这样
public clas Signal {
public final int deviceId;
public final int value;
...
}
每个设备都可以发送随时间上升或下降的信号值,没有模式。
是否有可能获得每种类型(主题的关键)在所有时间段内最大信号 value
的前 10 台设备作为 KTable<String,Signal>
?如果所有信号值都升高,会有帮助吗?
可以根据需要更改主题结构。
例如,当值总是在增加时,可以使用 Kafka Streams。需要创建自己的 Top10
聚合存储前 10 名并在 add
调用
final var builder = new StreamsBuilder();
final var topTable = builder
.table(
SignalChange.TOPIC_NAME,
Consumed.with(Serdes.String(), new SignalChange.Serde())
).toStream()
.groupByKey()
.aggregate(
() -> new Top10(),
(k, v, top10) -> top10.add(v),
Materialized.with(Serdes.String(), new Top10.Serde())
);
topTable
然后可以加入任何请求顶部的流