Apache Flink:即使给定 window 没有输入记录到达,也会定期发出输出记录
Apache Flink: emit output records periodically even if no input records have arrives for a given window
我只保留状态中保存的最后一个事件,并希望定期发出该状态。
解释了同样的问题 ,但接受的答案仅提示使用计时器。这对我不起作用。
也找到了 ,但这是一个不同的问题;每个 window 中的输入流都会有这么多事件。我的主要问题是我的输入流没有每个 window 的事件,但我想为每个 window.
生成输出
第一个失败的解决方案:
// pseudo code:
keyedStream.window(1 Minute).reduce() // does not emit when no events arrive
另一个失败的解决方案:
//Tried with a process function and a timer,
// but timer could only be created in processElement and fires only once.
// it is not periodic
class MyProcessFunction extends KeyedProcessFunction<Integer, Input, Output>{
public void processElement(Input event, Context context, Collector<Output> out) throws Exception {
savedState.update(event) // save last event in a state
// This creates a timer on every event, which is OK
// and I can easily change the code so that the timer get created only on first event
// But there was no way to make the timer periodic, independent of arrival of events
context.timerService().registerEventTimeTimer(someWindow);
}
public void onTimer(long timestamp, OnTimerContext context, Collector<Output> out) throws Exception {
// this emit exactly once
out.collect(savedState.value());
// was hoping for something like
// context.timerService().registerEventTimeTimer(timestamp + someWindow);
// but there is no access to timerService() here, so I couldn't make the timer periodic
}
}
有什么建议吗?
可以在onTimer
方法中注册定时器; OnTimerContext
确实有一个 TimerService
。您所希望的是它的正常使用方式。
也许如果您提供更多详细信息,我们可以找出它不适合您的原因。
我只保留状态中保存的最后一个事件,并希望定期发出该状态。
解释了同样的问题
也找到了
第一个失败的解决方案:
// pseudo code:
keyedStream.window(1 Minute).reduce() // does not emit when no events arrive
另一个失败的解决方案:
//Tried with a process function and a timer,
// but timer could only be created in processElement and fires only once.
// it is not periodic
class MyProcessFunction extends KeyedProcessFunction<Integer, Input, Output>{
public void processElement(Input event, Context context, Collector<Output> out) throws Exception {
savedState.update(event) // save last event in a state
// This creates a timer on every event, which is OK
// and I can easily change the code so that the timer get created only on first event
// But there was no way to make the timer periodic, independent of arrival of events
context.timerService().registerEventTimeTimer(someWindow);
}
public void onTimer(long timestamp, OnTimerContext context, Collector<Output> out) throws Exception {
// this emit exactly once
out.collect(savedState.value());
// was hoping for something like
// context.timerService().registerEventTimeTimer(timestamp + someWindow);
// but there is no access to timerService() here, so I couldn't make the timer periodic
}
}
有什么建议吗?
可以在onTimer
方法中注册定时器; OnTimerContext
确实有一个 TimerService
。您所希望的是它的正常使用方式。
也许如果您提供更多详细信息,我们可以找出它不适合您的原因。