onTimer 方法,为什么定时器状态为空?
onTimer method, why timer state is null?
我有一个像这样的简单应用程序(在键控过程函数中)。
正如您在下面的代码部分中所做的那样,我总是首先从状态中获取 timerObject,如果它不存在,我将创建一个新对象,然后更新状态。因此,永远不会有 empty/null 状态。
而且基本上这个状态只是为了保持上次的对象,例如:
- 如果在时间 10:15 看到一个对象,则注册时间将为 10:30。
- 但是,如果在时间 10:25 再次看到对象,则注册时间将更新为 10:40
- 如果进程函数在 10:40 时间运行 onTimer,这意味着在 15 分钟间隔内没有对象,那么我只是清除我的状态。
问题是记录器有时会为状态对象打印 null。应该不是这样吧?
public class ProcessRule extends KeyedProcessFunction<Tuple, LogEntity, Result> {
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessRule.class);
private transient ValueState<TimerObject> timerState;
@Override
public void open(Configuration parameters) throws Exception{
ValueStateDescriptor<TimerObject> timerValueStateDescriptor = new ValueStateDescriptor<TimerObject>(
"timerStateForProcessRule",
TypeInformation.of(TimerObject.class)
);
timerState = getRuntimeContext().getState(timerValueStateDescriptor);
}
@Override
public void processElement(LogEntity value, Context ctx, Collector<Result> out) throws Exception{
registerTimer(value, ctx);
if (conditionTrue) {
convert Result add to collector
}
}
private void registerTimer(LogEntity element, Context ctx) throws Exception{
TimerObject stateTimer = timerState.value();
if (stateTimer == null){
stateTimer = new TimerObject();
long timeInterval = 15 * 60 * 1000;
stateTimer.setTimeInterval(timeInterval);
}
stateTimer.setCurrentTimeInMilliseconds(element.getTimestampMs());
timerState.update(stateTimer);
ctx.timerService().registerProcessingTimeTimer(stateTimer.getNextTimer());
// getNextTimer => currentTime + timeInterval
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ValidationResult> out) throws Exception{
TimerObject stateTimer = timerState.value();
LOGGER.info("Timer fired at the the timestamps: {} for: {}", timestamp, stateTimer);
timerState.clear();
}
}
这里的问题很可能是因为您正在注册多个不同的计时器,但您在注册新计时器时似乎没有删除它们。所以,这基本上意味着当 first-timer 触发时 timerState
被清除,但是下一个计时器也可能再次触发秒,因为它可能已经注册为在第一个之后触发 3 秒,在这种情况下 timerState
可能已经是 null
.
我有一个像这样的简单应用程序(在键控过程函数中)。
正如您在下面的代码部分中所做的那样,我总是首先从状态中获取 timerObject,如果它不存在,我将创建一个新对象,然后更新状态。因此,永远不会有 empty/null 状态。
而且基本上这个状态只是为了保持上次的对象,例如:
- 如果在时间 10:15 看到一个对象,则注册时间将为 10:30。
- 但是,如果在时间 10:25 再次看到对象,则注册时间将更新为 10:40
- 如果进程函数在 10:40 时间运行 onTimer,这意味着在 15 分钟间隔内没有对象,那么我只是清除我的状态。
问题是记录器有时会为状态对象打印 null。应该不是这样吧?
public class ProcessRule extends KeyedProcessFunction<Tuple, LogEntity, Result> {
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessRule.class);
private transient ValueState<TimerObject> timerState;
@Override
public void open(Configuration parameters) throws Exception{
ValueStateDescriptor<TimerObject> timerValueStateDescriptor = new ValueStateDescriptor<TimerObject>(
"timerStateForProcessRule",
TypeInformation.of(TimerObject.class)
);
timerState = getRuntimeContext().getState(timerValueStateDescriptor);
}
@Override
public void processElement(LogEntity value, Context ctx, Collector<Result> out) throws Exception{
registerTimer(value, ctx);
if (conditionTrue) {
convert Result add to collector
}
}
private void registerTimer(LogEntity element, Context ctx) throws Exception{
TimerObject stateTimer = timerState.value();
if (stateTimer == null){
stateTimer = new TimerObject();
long timeInterval = 15 * 60 * 1000;
stateTimer.setTimeInterval(timeInterval);
}
stateTimer.setCurrentTimeInMilliseconds(element.getTimestampMs());
timerState.update(stateTimer);
ctx.timerService().registerProcessingTimeTimer(stateTimer.getNextTimer());
// getNextTimer => currentTime + timeInterval
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ValidationResult> out) throws Exception{
TimerObject stateTimer = timerState.value();
LOGGER.info("Timer fired at the the timestamps: {} for: {}", timestamp, stateTimer);
timerState.clear();
}
}
这里的问题很可能是因为您正在注册多个不同的计时器,但您在注册新计时器时似乎没有删除它们。所以,这基本上意味着当 first-timer 触发时 timerState
被清除,但是下一个计时器也可能再次触发秒,因为它可能已经注册为在第一个之后触发 3 秒,在这种情况下 timerState
可能已经是 null
.