如何在 apache-beam 中等待 1 小时缓冲区然后推送日期?
How to wait with 1h buffer in apache-beam and then push the date?
我正在收集有关航班的信息。飞行的最大长度为 10 小时。我大约每 1 分钟获取一次跟踪信息。在 apache beam 中处理期间,事件顺序被打乱。合并所有数据后,我想将其推送到 BigQuery 并丢弃数据,这样它就不会消耗内存。
我有 2 个策略可以做到这一点:
1) 等待1小时,如果没有新数据推送到BQ
2) 每 15 分钟 运行 我自己的算法验证数据是否完整。
我想选择 1),因为它更简单。我的代码可以正确吗?:
models = (xmls | beam.FlatMap(process_xmls))
tracking_informations = models | beam.ParDo(FilterTI())
grouped_tis = tracking_informations | beam.WindowInto(window.FixedWindows(10 * 3600), trigger=AfterProcessingTime(1 * 3600), accumulation_mode=AccumulationMode.DISCARDING) | beam.GroupByKey() | "push and merge to BQ"
在阅读了您的用例和所需的方法(将属于同一航班的所有事件组合在一起,直到您发现不活动的间隙)之后,这似乎非常适合 Session windows。在示例中,您应该使用航班标识符(f1
、f2
等作为键)并指定 1 小时的间隔。如果在此期间没有观察到新事件,会话将终止。
您可以将它们与 beam.WindowInto(window.Sessions(session_gap))
一起使用,您可以找到一个完整的示例 (不要忘记添加 Group By Key 步骤以在单个会话中实际合并事件) .
我正在收集有关航班的信息。飞行的最大长度为 10 小时。我大约每 1 分钟获取一次跟踪信息。在 apache beam 中处理期间,事件顺序被打乱。合并所有数据后,我想将其推送到 BigQuery 并丢弃数据,这样它就不会消耗内存。
我有 2 个策略可以做到这一点:
1) 等待1小时,如果没有新数据推送到BQ
2) 每 15 分钟 运行 我自己的算法验证数据是否完整。
我想选择 1),因为它更简单。我的代码可以正确吗?:
models = (xmls | beam.FlatMap(process_xmls))
tracking_informations = models | beam.ParDo(FilterTI())
grouped_tis = tracking_informations | beam.WindowInto(window.FixedWindows(10 * 3600), trigger=AfterProcessingTime(1 * 3600), accumulation_mode=AccumulationMode.DISCARDING) | beam.GroupByKey() | "push and merge to BQ"
在阅读了您的用例和所需的方法(将属于同一航班的所有事件组合在一起,直到您发现不活动的间隙)之后,这似乎非常适合 Session windows。在示例中,您应该使用航班标识符(f1
、f2
等作为键)并指定 1 小时的间隔。如果在此期间没有观察到新事件,会话将终止。
您可以将它们与 beam.WindowInto(window.Sessions(session_gap))
一起使用,您可以找到一个完整的示例