如何在 flink 中实现一个缓冲直到超时并在超时结束时触发的触发器?
How to implement a Trigger in flink that buffers until a timeout and triggers when timeout elapses?
如何在 Flink 中实现一个缓存到超时并在超时时触发的触发器?
如果 window 中至少有一个元素,我希望注册触发器,然后缓冲直到一秒钟,并在一秒钟过去时触发。如果 window 中没有元素,则触发器不会自行注册,因此我不希望看到任何输出。
我不希望触发器每秒产生大量流量,无论 window 中是否存在元素。另一方面说 window 中只有一个元素,我不希望它坐在那里等到水印或永远。相反,我想要一个超时,这样我至少可以在一秒钟后看到一个元素。
ProcessingTimeTrigger.create()
会这样做吗?如果是这样,ProcessingTimeTrigger.create()
与 CountinousProcessingTimeTrigger
之间有什么不同?
正常的一秒长处理时间 window 会给你一个 window 包含一秒钟内发生的所有事件,对于至少有一个的任何一秒事件。但是这个window不会和第一个事件对齐;它将与时钟对齐。因此,例如,如果 window 中的第一个事件发生在给定秒数的一半,那么 window 将仅包括第一个事件之后 500 毫秒的事件。
A ProcessingTimeTrigger
在 window 结束时触发一次。 CountinousProcessingTimeTrigger
以指定的速率重复触发。
要准确获得您所描述的语义,您需要一个自定义触发器。您可以执行与此类似的操作 OneSecondIntervalTrigger example,只是您需要从使用事件时间切换到处理时间,并且只触发一次,而不是重复触发。
这会给你留下这样的东西:
public static class OneSecondIntervalTrigger extends Trigger<SensorReading, TimeWindow> {
@Override
public TriggerResult onElement(SensorReading r, long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// firstSeen will be false if not set yet
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
// register initial timer only for first element
if (firstSeen.value() == null) {
// FIRE the window 1000 msec after the first event
long now = ctx.getCurrentProcessingTime();
ctx.registerProcessingTimeTimer(now + 1000);
fireSeen.update(true);
}
// Continue. Do not evaluate window now
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// Continue. We don't use event time timers
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// Evaluate the window now
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public void clear(TimeWindow w, TriggerContext ctx) throws Exception {
// Clear trigger state
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
firstSeen.clear();
}
}
如何在 Flink 中实现一个缓存到超时并在超时时触发的触发器?
如果 window 中至少有一个元素,我希望注册触发器,然后缓冲直到一秒钟,并在一秒钟过去时触发。如果 window 中没有元素,则触发器不会自行注册,因此我不希望看到任何输出。
我不希望触发器每秒产生大量流量,无论 window 中是否存在元素。另一方面说 window 中只有一个元素,我不希望它坐在那里等到水印或永远。相反,我想要一个超时,这样我至少可以在一秒钟后看到一个元素。
ProcessingTimeTrigger.create()
会这样做吗?如果是这样,ProcessingTimeTrigger.create()
与 CountinousProcessingTimeTrigger
之间有什么不同?
正常的一秒长处理时间 window 会给你一个 window 包含一秒钟内发生的所有事件,对于至少有一个的任何一秒事件。但是这个window不会和第一个事件对齐;它将与时钟对齐。因此,例如,如果 window 中的第一个事件发生在给定秒数的一半,那么 window 将仅包括第一个事件之后 500 毫秒的事件。
A ProcessingTimeTrigger
在 window 结束时触发一次。 CountinousProcessingTimeTrigger
以指定的速率重复触发。
要准确获得您所描述的语义,您需要一个自定义触发器。您可以执行与此类似的操作 OneSecondIntervalTrigger example,只是您需要从使用事件时间切换到处理时间,并且只触发一次,而不是重复触发。
这会给你留下这样的东西:
public static class OneSecondIntervalTrigger extends Trigger<SensorReading, TimeWindow> {
@Override
public TriggerResult onElement(SensorReading r, long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// firstSeen will be false if not set yet
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
// register initial timer only for first element
if (firstSeen.value() == null) {
// FIRE the window 1000 msec after the first event
long now = ctx.getCurrentProcessingTime();
ctx.registerProcessingTimeTimer(now + 1000);
fireSeen.update(true);
}
// Continue. Do not evaluate window now
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// Continue. We don't use event time timers
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// Evaluate the window now
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public void clear(TimeWindow w, TriggerContext ctx) throws Exception {
// Clear trigger state
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
firstSeen.clear();
}
}