生成 "Heartbeat" 类型事件以将事件时间向前推进
Generating "Heartbeat" Type Events to Push Event Time Forward
我正在构建一个 Flink 流式应用程序并且更愿意使用事件时间,因为它可以确保所有设置的计时器在历史数据发生故障或重播时确定性地触发。事件时间的问题是时间只有在事件发生时才会向前移动。我们的数据源(物理传感器)有时会产生非常少的数据,因此有时单个数据点可能会打开五分钟的聚合 window , 但下一个数据点是 20 分钟后,因此 window 关闭并很晚才发出输出记录。
我们建议的解决方案是使用 AWS lambda 函数,该函数计划每 X 分钟 运行 输出一个虚拟事件到 Flink 从中读取的 Kinesis 流中,从而强制生成水印使时间向前推进。
我担心的是,这仅在水印真正是全局的情况下才有效,这意味着单个心跳消息可能会导致创建一个水印,该水印会提前 Flink 应用程序中每个 operator/task 的事件时间使用源自此流的数据。文档让我相信 Flink 会并行化从源读取,其中每个并行读取运算符生成自己的水印,然后下游运算符,比如 window,采用它所看到的各种水印中的最小值。如果是这种情况,这对我来说似乎有问题,因为每个并行水印生成器都需要一个虚拟心跳事件,但我无法控制哪些节点从流中读取我的心跳消息。
所以,我的问题是,下游操作员究竟如何使用水印来提前事件时间,以及是否可以将单个虚拟消息添加到运动流中以在整个 Flink 应用程序中提前事件时间?
如果不是,我如何强制事件时间向前推进?
你是对的;这里有一个问题。 BoundedOutOfOrdernessTimestampExtractor
实现的标准周期性水印生成器依赖于查看具有较大时间戳的新事件以推进水印。
您可以通过多种方式解决此问题:
运行 任务中的源和水印分配器 运行 一个并行度(然后增加管道其余部分的并行度,如果你想).这样一条心跳消息就足够了。
广播心跳消息。这样每个并行实例都会收到它们并且它们都可以提升它们的水印。
而不是心跳消息,实现一个水印生成器,它使用处理时间计时器来人为地推进水印,尽管没有传入事件。有关示例,请参阅 https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java。
请注意,第三种方法不太理想,因为它与处理时间耦合,消除了纯事件时间方法的一些核心优势。
如果您使用心跳源,您需要为 returns MAX_WATERMARK 的其他(有时是空闲的)源实施水印生成器。否则来自该流的水印将阻碍整体水印。
此外,AWS Lambda 感觉有点矫枉过正。您可以实现一个简单的自定义 Flink 源来创建心跳事件。
我正在构建一个 Flink 流式应用程序并且更愿意使用事件时间,因为它可以确保所有设置的计时器在历史数据发生故障或重播时确定性地触发。事件时间的问题是时间只有在事件发生时才会向前移动。我们的数据源(物理传感器)有时会产生非常少的数据,因此有时单个数据点可能会打开五分钟的聚合 window , 但下一个数据点是 20 分钟后,因此 window 关闭并很晚才发出输出记录。
我们建议的解决方案是使用 AWS lambda 函数,该函数计划每 X 分钟 运行 输出一个虚拟事件到 Flink 从中读取的 Kinesis 流中,从而强制生成水印使时间向前推进。
我担心的是,这仅在水印真正是全局的情况下才有效,这意味着单个心跳消息可能会导致创建一个水印,该水印会提前 Flink 应用程序中每个 operator/task 的事件时间使用源自此流的数据。文档让我相信 Flink 会并行化从源读取,其中每个并行读取运算符生成自己的水印,然后下游运算符,比如 window,采用它所看到的各种水印中的最小值。如果是这种情况,这对我来说似乎有问题,因为每个并行水印生成器都需要一个虚拟心跳事件,但我无法控制哪些节点从流中读取我的心跳消息。
所以,我的问题是,下游操作员究竟如何使用水印来提前事件时间,以及是否可以将单个虚拟消息添加到运动流中以在整个 Flink 应用程序中提前事件时间?
如果不是,我如何强制事件时间向前推进?
你是对的;这里有一个问题。 BoundedOutOfOrdernessTimestampExtractor
实现的标准周期性水印生成器依赖于查看具有较大时间戳的新事件以推进水印。
您可以通过多种方式解决此问题:
运行 任务中的源和水印分配器 运行 一个并行度(然后增加管道其余部分的并行度,如果你想).这样一条心跳消息就足够了。
广播心跳消息。这样每个并行实例都会收到它们并且它们都可以提升它们的水印。
而不是心跳消息,实现一个水印生成器,它使用处理时间计时器来人为地推进水印,尽管没有传入事件。有关示例,请参阅 https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java。
请注意,第三种方法不太理想,因为它与处理时间耦合,消除了纯事件时间方法的一些核心优势。
如果您使用心跳源,您需要为 returns MAX_WATERMARK 的其他(有时是空闲的)源实施水印生成器。否则来自该流的水印将阻碍整体水印。
此外,AWS Lambda 感觉有点矫枉过正。您可以实现一个简单的自定义 Flink 源来创建心跳事件。