'withIngestionTimestamps()' 在 Hazelcast Jet Pipeline 中的具体用途是什么?
What's exactly is the use of 'withIngestionTimestamps()' in Hazelcast Jet Pipeline?
我是 运行 管道,来源来自 Kafka
主题并汇入 IMap
。每次我写一个时,我都会遇到方法 withIngestionTimestamps()
和 withoutTimestamps()
并且想知道它们有什么用?我了解所有关于来源为事件增加时间的信息。问题是我如何使用它?我没有看到任何从事件中获取时间戳的方法?
我的 IMap 可能会被重复值填充。如果我可以使用 withIngestionTimestamps() 方法来评估最新记录并丢弃旧记录?
Jet 使用事件时间戳来正确应用 windowing。它必须决定哪个事件属于哪个 window 以及何时关闭 window 并发出其聚合结果。时间戳作为元数据出现在事件上,不会暴露给用户。
但是,如果您想应用引用 wall-clock 时间的逻辑,您始终可以调用 System.currentTimeMillis()
来根据明确存储在 IMap 值中的时间戳检查它。这相当于使用 处理时间,这与 Jet 应用的 摄取时间 非常相似。摄取时间只是在管道的源顶点处有效的处理时间,因此在汇点顶点应用处理时间只是略有不同,并且具有相同的实用属性。
Jet 在幕后管理事件时间戳,它仅对处理器可见。例如,window 聚合将使用时间戳。
如果您想在代码中查看时间戳,则必须将其包含在您的项目类型中。您必须不使用源中的时间戳,使用 map
运算符添加摄取时间戳并让 Jet 知道它:
Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(...))
.withoutTimestamps()
.map(t -> tuple2(System.currentTimeMillis(), t))
.addTimestamps(Tuple2::f0, 2000)
.drainTo(Sinks.logger());
我使用了 2000 毫秒的 allowedLag
。这样做的原因是时间戳将被添加到分配它们的顶点下游的顶点中。流合并可以在那里进行,并且需要考虑内部偏差。例如,它应该考虑最长的预期 GC 暂停或网络延迟。请参阅 addTimestamps
方法中的注释。
我是 运行 管道,来源来自 Kafka
主题并汇入 IMap
。每次我写一个时,我都会遇到方法 withIngestionTimestamps()
和 withoutTimestamps()
并且想知道它们有什么用?我了解所有关于来源为事件增加时间的信息。问题是我如何使用它?我没有看到任何从事件中获取时间戳的方法?
我的 IMap 可能会被重复值填充。如果我可以使用 withIngestionTimestamps() 方法来评估最新记录并丢弃旧记录?
Jet 使用事件时间戳来正确应用 windowing。它必须决定哪个事件属于哪个 window 以及何时关闭 window 并发出其聚合结果。时间戳作为元数据出现在事件上,不会暴露给用户。
但是,如果您想应用引用 wall-clock 时间的逻辑,您始终可以调用 System.currentTimeMillis()
来根据明确存储在 IMap 值中的时间戳检查它。这相当于使用 处理时间,这与 Jet 应用的 摄取时间 非常相似。摄取时间只是在管道的源顶点处有效的处理时间,因此在汇点顶点应用处理时间只是略有不同,并且具有相同的实用属性。
Jet 在幕后管理事件时间戳,它仅对处理器可见。例如,window 聚合将使用时间戳。
如果您想在代码中查看时间戳,则必须将其包含在您的项目类型中。您必须不使用源中的时间戳,使用 map
运算符添加摄取时间戳并让 Jet 知道它:
Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(...))
.withoutTimestamps()
.map(t -> tuple2(System.currentTimeMillis(), t))
.addTimestamps(Tuple2::f0, 2000)
.drainTo(Sinks.logger());
我使用了 2000 毫秒的 allowedLag
。这样做的原因是时间戳将被添加到分配它们的顶点下游的顶点中。流合并可以在那里进行,并且需要考虑内部偏差。例如,它应该考虑最长的预期 GC 暂停或网络延迟。请参阅 addTimestamps
方法中的注释。