聚合步骤在本地发出结果,但在 GCP 数据流中部署时不会发出结果

Aggregation step emitting results locally but not when deployed in GCP Dataflow

我们有一个从 Pub/Sub GCP 主题读取数据的管道。我们想根据所述数据创建 15 分钟的聚合(积分和均值)。为此,我们创建了一个 FixedWindow,后跟一个 creation/groupby/deletion 的虚拟键,这样我们就可以将 window 中的所有消息都放在一个列表中,然后使用 DoFn 自定义 class 使用 pandas 来执行处理。最后我们将结果写入 InfluxDB 数据库。

(
    p
    | 'read_telemetry_from_pubsub' >> ReadFromPubSub(topic=PUBSUB_TOPIC)
    | 'window_telemetry' >> beam.WindowInto(beam.window.FixedWindows(WINDOW_SIZE))
    | 'format_telemetry_for_influx' >> beam.ParDo(FormatInfluxDB())
    | "add_dummy_key" >> beam.Map(lambda elem: (None, elem))
    | "groupby_dummy_key" >> beam.GroupByKey()
    | "delete_dummy_key" >> beam.MapTuple(lambda _, val: val)
    | "aggregate" >> beam.ParDo(Aggregator())
    | "write_processed_messages_to_influx" >> beam.ParDo(WriteToInfluxDB())
)

这是 class 聚合包含 15 分钟内所有消息的列表 window:

class Aggregator(beam.DoFn):
    def process(self, elements):
        # parsing the message list into a pandas DataFrame
        # some preprocessing and agregation steps
        # returns a list with json messages
        return [aggregated_values]

我们使用 GCP Pub/Sub 模拟器在本地测试这段代码,它工作得很好。但是,当我们部署到 GCP Dataflow 时,它不会发出任何结果,也不会在日志中发现任何错误。此外,我们看到数据新鲜度无限增长。

我们认为我们缺少一些触发功能,但我们不确定这是否是进行此类聚合的正确方法,因为它在本地发出结果,但在部署时不会发出结果。当我们使用与默认触发器不同的触发器时,本地我们没有发射。

我们已经尝试了一些触发选项(重复、AfterProcessingTime、AccumulationMode.DISCARDING、AfterWatermark)和另一种使用 Combine 自定义的方法 class,但我们也没有结果发射。

注意:对于更详细的代码,here 是聚合 class 的完整代码。

问题已通过使用 setup.py 文件而不是所需软件包的 requirements.txt 文件解决。此外,我们将 python Apache Beam SDK 版本从 2.29.0 更新为 2.32.0,现在可以正确聚合和发送结果。

我们在 Logging GCP 模块中发现了这个错误:

Error syncing pod ... ("pipeline_name") skipping: [failed to "StartContainer" for "sdk0" with CrashLoopBackOff: "back-off 5m0s restarting failed container=sdk0 pod=pipeline_name)"

然后,我们找到这个 answer. We use this snippet 来创建我们自己的 setup.py