聚合步骤在本地发出结果,但在 GCP 数据流中部署时不会发出结果
Aggregation step emitting results locally but not when deployed in GCP Dataflow
我们有一个从 Pub/Sub GCP 主题读取数据的管道。我们想根据所述数据创建 15 分钟的聚合(积分和均值)。为此,我们创建了一个 FixedWindow,后跟一个 creation/groupby/deletion 的虚拟键,这样我们就可以将 window 中的所有消息都放在一个列表中,然后使用 DoFn 自定义 class 使用 pandas 来执行处理。最后我们将结果写入 InfluxDB 数据库。
(
p
| 'read_telemetry_from_pubsub' >> ReadFromPubSub(topic=PUBSUB_TOPIC)
| 'window_telemetry' >> beam.WindowInto(beam.window.FixedWindows(WINDOW_SIZE))
| 'format_telemetry_for_influx' >> beam.ParDo(FormatInfluxDB())
| "add_dummy_key" >> beam.Map(lambda elem: (None, elem))
| "groupby_dummy_key" >> beam.GroupByKey()
| "delete_dummy_key" >> beam.MapTuple(lambda _, val: val)
| "aggregate" >> beam.ParDo(Aggregator())
| "write_processed_messages_to_influx" >> beam.ParDo(WriteToInfluxDB())
)
这是 class 聚合包含 15 分钟内所有消息的列表 window:
class Aggregator(beam.DoFn):
def process(self, elements):
# parsing the message list into a pandas DataFrame
# some preprocessing and agregation steps
# returns a list with json messages
return [aggregated_values]
我们使用 GCP Pub/Sub 模拟器在本地测试这段代码,它工作得很好。但是,当我们部署到 GCP Dataflow 时,它不会发出任何结果,也不会在日志中发现任何错误。此外,我们看到数据新鲜度无限增长。
我们认为我们缺少一些触发功能,但我们不确定这是否是进行此类聚合的正确方法,因为它在本地发出结果,但在部署时不会发出结果。当我们使用与默认触发器不同的触发器时,本地我们没有发射。
我们已经尝试了一些触发选项(重复、AfterProcessingTime、AccumulationMode.DISCARDING、AfterWatermark)和另一种使用 Combine 自定义的方法 class,但我们也没有结果发射。
注意:对于更详细的代码,here 是聚合 class 的完整代码。
问题已通过使用 setup.py
文件而不是所需软件包的 requirements.txt
文件解决。此外,我们将 python Apache Beam SDK 版本从 2.29.0
更新为 2.32.0
,现在可以正确聚合和发送结果。
我们在 Logging GCP 模块中发现了这个错误:
Error syncing pod ... ("pipeline_name") skipping: [failed to
"StartContainer" for "sdk0" with CrashLoopBackOff: "back-off 5m0s
restarting failed container=sdk0 pod=pipeline_name)"
然后,我们找到这个 answer. We use this snippet 来创建我们自己的 setup.py
。
我们有一个从 Pub/Sub GCP 主题读取数据的管道。我们想根据所述数据创建 15 分钟的聚合(积分和均值)。为此,我们创建了一个 FixedWindow,后跟一个 creation/groupby/deletion 的虚拟键,这样我们就可以将 window 中的所有消息都放在一个列表中,然后使用 DoFn 自定义 class 使用 pandas 来执行处理。最后我们将结果写入 InfluxDB 数据库。
(
p
| 'read_telemetry_from_pubsub' >> ReadFromPubSub(topic=PUBSUB_TOPIC)
| 'window_telemetry' >> beam.WindowInto(beam.window.FixedWindows(WINDOW_SIZE))
| 'format_telemetry_for_influx' >> beam.ParDo(FormatInfluxDB())
| "add_dummy_key" >> beam.Map(lambda elem: (None, elem))
| "groupby_dummy_key" >> beam.GroupByKey()
| "delete_dummy_key" >> beam.MapTuple(lambda _, val: val)
| "aggregate" >> beam.ParDo(Aggregator())
| "write_processed_messages_to_influx" >> beam.ParDo(WriteToInfluxDB())
)
这是 class 聚合包含 15 分钟内所有消息的列表 window:
class Aggregator(beam.DoFn):
def process(self, elements):
# parsing the message list into a pandas DataFrame
# some preprocessing and agregation steps
# returns a list with json messages
return [aggregated_values]
我们使用 GCP Pub/Sub 模拟器在本地测试这段代码,它工作得很好。但是,当我们部署到 GCP Dataflow 时,它不会发出任何结果,也不会在日志中发现任何错误。此外,我们看到数据新鲜度无限增长。
我们认为我们缺少一些触发功能,但我们不确定这是否是进行此类聚合的正确方法,因为它在本地发出结果,但在部署时不会发出结果。当我们使用与默认触发器不同的触发器时,本地我们没有发射。
我们已经尝试了一些触发选项(重复、AfterProcessingTime、AccumulationMode.DISCARDING、AfterWatermark)和另一种使用 Combine 自定义的方法 class,但我们也没有结果发射。
注意:对于更详细的代码,here 是聚合 class 的完整代码。
问题已通过使用 setup.py
文件而不是所需软件包的 requirements.txt
文件解决。此外,我们将 python Apache Beam SDK 版本从 2.29.0
更新为 2.32.0
,现在可以正确聚合和发送结果。
我们在 Logging GCP 模块中发现了这个错误:
Error syncing pod ... ("pipeline_name") skipping: [failed to "StartContainer" for "sdk0" with CrashLoopBackOff: "back-off 5m0s restarting failed container=sdk0 pod=pipeline_name)"
然后,我们找到这个 answer. We use this snippet 来创建我们自己的 setup.py
。