带时间戳的 Flink 计数器
Flink counter with timestamp
我正在阅读 Flink 示例 CountWithTimestamp,下面是示例中的代码片段:
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// retrieve the current count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// update the state's count
current.count++;
// set the state's timestamp to the record's assigned event time timestamp
current.lastModified = ctx.timestamp();
// write the state back
state.update(current);
// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// get the state for the key that scheduled the timer
CountWithTimestamp result = state.value();
// check if this is an outdated timer or the latest timer
if (timestamp == result.lastModified + 60000) {
// emit the state on timeout
out.collect(new Tuple2<String, Long>(result.key, result.count));
}
}
}
我的问题是,如果我在 onTimer 中删除 if 语句 timestamp == result.lastModified + 60000
(收集未触及的 stmt),而是在 processElement 的开头用另一个 if 语句 if(ctx.timestamp < current.lastModified + 60000) { deleteEventTimeTimer(current.lastModified + 60000)}
替换它,将会程序的语义相同吗?在语义相同的情况下,一个版本比另一个版本有任何偏好吗?
您认为删除计时器的实现具有相同的语义是正确的。事实上,我最近更改了培训材料中使用的示例来做到这一点,因为我更喜欢这种方法。我觉得它更可取的原因是所有复杂的业务逻辑都在一个地方(在 processElement
中),并且每当调用 onTimer
时,您都知道该做什么,没有问题。此外,它的性能更高,因为检查点和最终触发的计时器更少。
此示例是在删除计时器之前为文档编写的,尚未更新。
您可以找到我在这些幻灯片中提到的修改后的示例 -- https://training.ververica.com/decks/process-function/ -- 一旦您通过了注册页面。
FWIW,我最近还按照相同的思路修改了相应训练练习的参考解决方案:https://github.com/apache/flink-training/tree/master/long-ride-alerts。
我正在阅读 Flink 示例 CountWithTimestamp,下面是示例中的代码片段:
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// retrieve the current count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// update the state's count
current.count++;
// set the state's timestamp to the record's assigned event time timestamp
current.lastModified = ctx.timestamp();
// write the state back
state.update(current);
// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// get the state for the key that scheduled the timer
CountWithTimestamp result = state.value();
// check if this is an outdated timer or the latest timer
if (timestamp == result.lastModified + 60000) {
// emit the state on timeout
out.collect(new Tuple2<String, Long>(result.key, result.count));
}
}
}
我的问题是,如果我在 onTimer 中删除 if 语句 timestamp == result.lastModified + 60000
(收集未触及的 stmt),而是在 processElement 的开头用另一个 if 语句 if(ctx.timestamp < current.lastModified + 60000) { deleteEventTimeTimer(current.lastModified + 60000)}
替换它,将会程序的语义相同吗?在语义相同的情况下,一个版本比另一个版本有任何偏好吗?
您认为删除计时器的实现具有相同的语义是正确的。事实上,我最近更改了培训材料中使用的示例来做到这一点,因为我更喜欢这种方法。我觉得它更可取的原因是所有复杂的业务逻辑都在一个地方(在 processElement
中),并且每当调用 onTimer
时,您都知道该做什么,没有问题。此外,它的性能更高,因为检查点和最终触发的计时器更少。
此示例是在删除计时器之前为文档编写的,尚未更新。
您可以找到我在这些幻灯片中提到的修改后的示例 -- https://training.ververica.com/decks/process-function/ -- 一旦您通过了注册页面。
FWIW,我最近还按照相同的思路修改了相应训练练习的参考解决方案:https://github.com/apache/flink-training/tree/master/long-ride-alerts。