'withIngestionTimestamps()' 在 Hazelcast Jet Pipeline 中的具体用途是什么?

What's exactly is the use of 'withIngestionTimestamps()' in Hazelcast Jet Pipeline?

我是 运行 管道,来源来自 Kafka 主题并汇入 IMap。每次我写一个时,我都会遇到方法 withIngestionTimestamps()withoutTimestamps() 并且想知道它们有什么用?我了解所有关于来源为事件增加时间的信息。问题是我如何使用它?我没有看到任何从事件中获取时间戳的方法?

我的 IMap 可能会被重复值填充。如果我可以使用 withIngestionTimestamps() 方法来评估最新记录并丢弃旧记录?

Jet 使用事件时间戳来正确应用 windowing。它必须决定哪个事件属于哪个 window 以及何时关闭 window 并发出其聚合结果。时间戳作为元数据出现在事件上,不会暴露给用户。

但是,如果您想应用引用 wall-clock 时间的逻辑,您始终可以调用 System.currentTimeMillis() 来根据明确存储在 IMap 值中的时间戳检查它。这相当于使用 处理时间,这与 Jet 应用的 摄取时间 非常相似。摄取时间只是在管道的源顶点处有效的处理时间,因此在汇点顶点应用处理时间只是略有不同,并且具有相同的实用属性。

Jet 在幕后管理事件时间戳,它仅对处理器可见。例如,window 聚合将使用时间戳。

如果您想在代码中查看时间戳,则必须将其包含在您的项目类型中。您必须不使用源中的时间戳,使用 map 运算符添加摄取时间戳并让 Jet 知道它:

Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(...))
 .withoutTimestamps()
 .map(t -> tuple2(System.currentTimeMillis(), t))
 .addTimestamps(Tuple2::f0, 2000)
 .drainTo(Sinks.logger());

我使用了 2000 毫秒的 allowedLag。这样做的原因是时间戳将被添加到分配它们的顶点下游的顶点中。流合并可以在那里进行,并且需要考虑内部偏差。例如,它应该考虑最长的预期 GC 暂停或网络延迟。请参阅 addTimestamps 方法中的注释。