使用 BroadcastState 模式时如何触发下游 onEventTime() 方法?
How do I fire downstream onEventTime() method when using BroadcastState pattern?
我正在使用如下管道:
inputStream.keyBy(<keyMapper>).
connect(configurationBroadcastStream).
process(new KeyedBroadcastProcessFunction<...>() {
processBroadcastElement(...){...}
processElement(...){...}
}).
keyBy(<keyMapper>). // have to key output of process() again
window(DynamicEventTimeSessionWindow.withDynamicGap(...)).
trigger(new CustomTrigger()).
process(new CustomProcessWindowFn())
在 CustomTrigger()
中,我正在注册一个 eventTimeTimer()
,它将触发以指示我的 window 结束。问题是 onEventTime()
方法 从未 被调用,即使在:
- 我已经确保
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- 使用
ascendingTimestampExtractor()
,我发送了一个事件,该事件肯定已将水印推得足够远,以至于 eventTimeTimer()
应该触发。
我错过了什么?是不是跟水印丢失和KeyedBroadcastProcessFunction
的onTimer()
方法有关?我怀疑是因为 David Anderson 在 中的评论
答案:
add special fake watermarks for the non-broadcast stream (set to
Watermark.MAX_WATERMARK)
我还没有实现名为 onTimer 的方法。但是,如果确实如此,我不明白这与下游触发器有什么关系。谢谢。
编辑:此场景的完整示例是 here。
是的,问题是广播流没有水印。 (但是不,KeyedBroadcastProcessFunction
是否有 onTimer 方法并不重要。一旦你让水印流动,它们就会流到 window 无论如何。)
只要一个运算符有两个或多个输入——所以在你的情况下,当 inputStream
和 configurationBroadcastStream
连接时——该运算符的水印将是其水印中的最小值输入。由于广播流没有水印,这阻止了 inputStream
.
提供的水印
我有一个 example 显示您可能如何处理此问题。假设您的广播流不需要任何时间信息,您可以实现一个时间戳提取器和水印分配器,有效地将水印控制权让给另一个流。像这样:
// Once the two streams are connected, the Watermark of the KeyedBroadcastProcessFunction operator
// will be the minimum of the Watermarks of the two connected streams. Our config stream has a default
// Watermark at Long.MIN_VALUE, and this will hold back the event time clock of the
// KeyedBroadcastProcessFunction, unless we do something about it.
public static class ConfigStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return Watermark.MAX_WATERMARK;
}
@Override
public long extractTimestamp(String element, long previousElementTimestamp) {
return 0;
}
}
我正在使用如下管道:
inputStream.keyBy(<keyMapper>).
connect(configurationBroadcastStream).
process(new KeyedBroadcastProcessFunction<...>() {
processBroadcastElement(...){...}
processElement(...){...}
}).
keyBy(<keyMapper>). // have to key output of process() again
window(DynamicEventTimeSessionWindow.withDynamicGap(...)).
trigger(new CustomTrigger()).
process(new CustomProcessWindowFn())
在 CustomTrigger()
中,我正在注册一个 eventTimeTimer()
,它将触发以指示我的 window 结束。问题是 onEventTime()
方法 从未 被调用,即使在:
- 我已经确保
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- 使用
ascendingTimestampExtractor()
,我发送了一个事件,该事件肯定已将水印推得足够远,以至于eventTimeTimer()
应该触发。
我错过了什么?是不是跟水印丢失和KeyedBroadcastProcessFunction
的onTimer()
方法有关?我怀疑是因为 David Anderson 在
add special fake watermarks for the non-broadcast stream (set to Watermark.MAX_WATERMARK)
我还没有实现名为 onTimer 的方法。但是,如果确实如此,我不明白这与下游触发器有什么关系。谢谢。
编辑:此场景的完整示例是 here。
是的,问题是广播流没有水印。 (但是不,KeyedBroadcastProcessFunction
是否有 onTimer 方法并不重要。一旦你让水印流动,它们就会流到 window 无论如何。)
只要一个运算符有两个或多个输入——所以在你的情况下,当 inputStream
和 configurationBroadcastStream
连接时——该运算符的水印将是其水印中的最小值输入。由于广播流没有水印,这阻止了 inputStream
.
我有一个 example 显示您可能如何处理此问题。假设您的广播流不需要任何时间信息,您可以实现一个时间戳提取器和水印分配器,有效地将水印控制权让给另一个流。像这样:
// Once the two streams are connected, the Watermark of the KeyedBroadcastProcessFunction operator
// will be the minimum of the Watermarks of the two connected streams. Our config stream has a default
// Watermark at Long.MIN_VALUE, and this will hold back the event time clock of the
// KeyedBroadcastProcessFunction, unless we do something about it.
public static class ConfigStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return Watermark.MAX_WATERMARK;
}
@Override
public long extractTimestamp(String element, long previousElementTimestamp) {
return 0;
}
}