Kafka Streams - 为每条记录的对象列表提取时间戳
Kafka Streams - Extracting Timestamp for List of Objects per record
我想要实现的是根据消息中存在的时间戳获取记录中存在的每条消息的计数。每条记录由 List<Metric>
个对象组成。我想提取每个指标的时间戳并根据指标名称聚合指标。
公制
public class Metric {
String metric;
Long timestamp;
Double value;
}
自定义时间戳提取器
我已经实现了这个将记录转换为 List 对象的时间戳提取器。它目前获取第一个为这个 ArrayList 执行 windowing 的时间戳。
public class EventTimestampExtractor implements TimestampExtractor {
public long extract(ConsumerRecord<Object, Object> record, long previousTimeStamp) {
try {
// Have a ListSerde in place to deserialize the record to a List<Metric> object.
final List<Metric> value = (List<Metric>) record.value();
final Metric metric = value.get(0); // Returning the first timestamp from the metric list.
return metric.getTimestamp();
}
catch (Exception e) {
// If there is an exception, return back the event time.
return record.timestamp();
}
}
}
拓扑结构
获取列表后,我执行 FlatTransform 来转换此列表并根据扁平化列表执行聚合。
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, List<Metric>> stream = builder.stream(inputTopic, Consumed.with(Serdes.String(),new MetricListSerde()));
TimeWindows windows = TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofSeconds(2));
stream.filter((key, value) -> value != null)
.flatTransform(() -> new MetricsTransformer()) // Flat transforming the list to single metrics
.groupByKey()
.windowedBy(windows)
.count()
.toStream()
.to("output-topic");
指标列表示例 - 如果您注意到有一个指标和 3 个计数(2 个在 0-10 之间,10 秒后为 1 个)
[{ "metric": "metric1.count",
"timestamp": 1,
"value": 30
},{
"metric": "metric1.count",
"timestamp": 2,
"value": 30
}, {
"metric": "metric1.count",
"timestamp": 15,
"value": 30
}]
我的 window 是 10 秒,我想获取指标的计数。我的 当前结果 看起来像 -
Window{startMs=0, endMs=10} and Value metric: metric1.count value: 3 aggregator: count interval: "10s"}
预期结果 -
Window{startMs=0, endMs=10} and Value metric: metric1.count value: 2 aggregator: count interval: "10s"}
Window{startMs=10, endMs=20} and Value metric: metric1.count value: 1 aggregator: count interval: "10s"}
很抱歉这个问题很长,但是有什么方法可以从包含消息集合的单个记录中提取多个时间戳吗?
Kafka Streams 版本 - 2.4.1
TimestampExtractor
对您的用例没有帮助,因为它只能给您一个时间戳。使用 flatMap()
所有输出记录都继承输入记录的时间戳。
如果你需要动态修改时间戳,你需要使用transform()
来实现"flat map"。对于每个输入记录,您可以多次调用 context.forward()
来进行实际的平面映射(您可以在最后调用 return null;
以不发出任何其他记录)。在每个 forward()
调用中,您可以通过 To.all().withTimestamp(...)
:
设置新的时间戳
public KeyValue transform(K key, V value) {
for (...) {
context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp);
}
return null;
}
我想要实现的是根据消息中存在的时间戳获取记录中存在的每条消息的计数。每条记录由 List<Metric>
个对象组成。我想提取每个指标的时间戳并根据指标名称聚合指标。
公制
public class Metric {
String metric;
Long timestamp;
Double value;
}
自定义时间戳提取器
我已经实现了这个将记录转换为 List 对象的时间戳提取器。它目前获取第一个为这个 ArrayList 执行 windowing 的时间戳。
public class EventTimestampExtractor implements TimestampExtractor {
public long extract(ConsumerRecord<Object, Object> record, long previousTimeStamp) {
try {
// Have a ListSerde in place to deserialize the record to a List<Metric> object.
final List<Metric> value = (List<Metric>) record.value();
final Metric metric = value.get(0); // Returning the first timestamp from the metric list.
return metric.getTimestamp();
}
catch (Exception e) {
// If there is an exception, return back the event time.
return record.timestamp();
}
}
}
拓扑结构
获取列表后,我执行 FlatTransform 来转换此列表并根据扁平化列表执行聚合。
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, List<Metric>> stream = builder.stream(inputTopic, Consumed.with(Serdes.String(),new MetricListSerde()));
TimeWindows windows = TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofSeconds(2));
stream.filter((key, value) -> value != null)
.flatTransform(() -> new MetricsTransformer()) // Flat transforming the list to single metrics
.groupByKey()
.windowedBy(windows)
.count()
.toStream()
.to("output-topic");
指标列表示例 - 如果您注意到有一个指标和 3 个计数(2 个在 0-10 之间,10 秒后为 1 个)
[{ "metric": "metric1.count",
"timestamp": 1,
"value": 30
},{
"metric": "metric1.count",
"timestamp": 2,
"value": 30
}, {
"metric": "metric1.count",
"timestamp": 15,
"value": 30
}]
我的 window 是 10 秒,我想获取指标的计数。我的 当前结果 看起来像 -
Window{startMs=0, endMs=10} and Value metric: metric1.count value: 3 aggregator: count interval: "10s"}
预期结果 -
Window{startMs=0, endMs=10} and Value metric: metric1.count value: 2 aggregator: count interval: "10s"}
Window{startMs=10, endMs=20} and Value metric: metric1.count value: 1 aggregator: count interval: "10s"}
很抱歉这个问题很长,但是有什么方法可以从包含消息集合的单个记录中提取多个时间戳吗?
Kafka Streams 版本 - 2.4.1
TimestampExtractor
对您的用例没有帮助,因为它只能给您一个时间戳。使用 flatMap()
所有输出记录都继承输入记录的时间戳。
如果你需要动态修改时间戳,你需要使用transform()
来实现"flat map"。对于每个输入记录,您可以多次调用 context.forward()
来进行实际的平面映射(您可以在最后调用 return null;
以不发出任何其他记录)。在每个 forward()
调用中,您可以通过 To.all().withTimestamp(...)
:
public KeyValue transform(K key, V value) {
for (...) {
context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp);
}
return null;
}