使用广播状态强制 Window 使用假消息关闭
Using Broadcast State To Force Window Closure Using Fake Messages
描述:
目前我正在研究将 Flink 与 IOT 设置结合使用。本质上,设备正在发送诸如 (device_id、device_type、event_timestamp 等数据,而我无法控制消息何时发送。然后我通过 device_id 和 device_type 键控蒸汽以预形成聚合。我想使用事件时间,因为它可以确保设置的计时器在发生故障时以确定性触发。但是,考虑到这并不总是高吞吐量流,window 可以打开 10 分钟的聚合时间段,但直到大约 40 分钟后才会出现下一个点。虽然计算最终会完成聚合,但它会非常晚地输出我想要的结果。
所以我的解决方法是创建一个额外的外部源,它除了发送虚假消息外什么都不做。通过让这些虚假消息与我的 10 分钟聚合周期保持一致,即使设备没有发送任何数据,事件时间 windows 也会强制 windows 关闭。这里的关键部分是让所有并行实例/运算符都可以访问这个假消息,因为我需要用这个假消息关闭所有 windows。我在想广播状态可能是实现这个目标最合适的方式:"Broadcast state is replicated across all parallel instances of a function, and might typically be used where you have two streams, a regular data stream alongside a control stream that serves rules, patterns, or other configuration messages." Quote Source
问题:
- 广播状态是确保所有并行实例(例如windows)收到我的假消息的最佳方法吗?
- 一旦运营商通过广播状态访问了这条假消息,那么这条假消息是否可以用于推进事件时间水印?
您可以按照您建议的方式使用广播状态进行这项工作,但我不认为这是最佳解决方案。
在理想情况下,我建议您安排设备偶尔发送保活消息,但假设这不可能,我认为自定义触发器在这里会很好用。您可以扩展 EventTimeTrigger,以便除了它通过
创建的事件时间计时器之外
ctx.registerEventTimeTimer(window.maxTimestamp());
您还创建了一个处理时间计时器,作为后备,如果在该处理时间计时器触发时 window 仍然存在,您将触发 window。
我推荐这种方法,因为它更简单,更直接地满足特定需求。使用广播状态方法,您必须为这些消息引入源,添加广播状态描述符和流,为非广播流添加特殊的假水印(设置为 Watermark.MAX_WATERMARK),连接广播和非-广播流并实现 BroadcastProcessFunction(可能实际上什么都不做)等。它有很多移动部件分布在几个不同的操作符上。
描述:
目前我正在研究将 Flink 与 IOT 设置结合使用。本质上,设备正在发送诸如 (device_id、device_type、event_timestamp 等数据,而我无法控制消息何时发送。然后我通过 device_id 和 device_type 键控蒸汽以预形成聚合。我想使用事件时间,因为它可以确保设置的计时器在发生故障时以确定性触发。但是,考虑到这并不总是高吞吐量流,window 可以打开 10 分钟的聚合时间段,但直到大约 40 分钟后才会出现下一个点。虽然计算最终会完成聚合,但它会非常晚地输出我想要的结果。
所以我的解决方法是创建一个额外的外部源,它除了发送虚假消息外什么都不做。通过让这些虚假消息与我的 10 分钟聚合周期保持一致,即使设备没有发送任何数据,事件时间 windows 也会强制 windows 关闭。这里的关键部分是让所有并行实例/运算符都可以访问这个假消息,因为我需要用这个假消息关闭所有 windows。我在想广播状态可能是实现这个目标最合适的方式:"Broadcast state is replicated across all parallel instances of a function, and might typically be used where you have two streams, a regular data stream alongside a control stream that serves rules, patterns, or other configuration messages." Quote Source
问题:
- 广播状态是确保所有并行实例(例如windows)收到我的假消息的最佳方法吗?
- 一旦运营商通过广播状态访问了这条假消息,那么这条假消息是否可以用于推进事件时间水印?
您可以按照您建议的方式使用广播状态进行这项工作,但我不认为这是最佳解决方案。
在理想情况下,我建议您安排设备偶尔发送保活消息,但假设这不可能,我认为自定义触发器在这里会很好用。您可以扩展 EventTimeTrigger,以便除了它通过
创建的事件时间计时器之外ctx.registerEventTimeTimer(window.maxTimestamp());
您还创建了一个处理时间计时器,作为后备,如果在该处理时间计时器触发时 window 仍然存在,您将触发 window。
我推荐这种方法,因为它更简单,更直接地满足特定需求。使用广播状态方法,您必须为这些消息引入源,添加广播状态描述符和流,为非广播流添加特殊的假水印(设置为 Watermark.MAX_WATERMARK),连接广播和非-广播流并实现 BroadcastProcessFunction(可能实际上什么都不做)等。它有很多移动部件分布在几个不同的操作符上。