将 Pub/Sub 连接到数据流 Python 管道

Connect Pub/Sub to Dataflow Python pipeline

我正在编写处理电子邮件的 Dataflow 流式传输管道(in Python)。 这个想法是,当一封电子邮件到达时,将发布一条 Pub/Sub 消息,触发检索电子邮件并处理它的管道。 Pub/Sub 消息的内容没有用,因为我只是用它来触发管道。

我在最后一部分遇到了一些麻烦。我设法部署了管道并将其连接到 Pub/Sub 主题,但是当我尝试对其进行测试(发布消息)时,没有任何反应。

我想我必须设置一个 window 那 "collects" 消息并在某个时候发出它们,但我应该怎么做? 有没有办法说"start the pipeline everytime a new Pub/Sub message is received, ignoring its content"?

提前致谢!

你能分享更多关于你的管道和电子邮件存储位置的信息吗?

我建议您查看 Beam 中可用的一些示例管道。

如果您分享有关您的管道/代码的更多信息,我可以尝试与您一起迭代。

我终于设法解决了我的问题。问题是由于从我为此目的定义的 class 导入了自定义管道选项。此导入阻止了管道被触发。删除它我终于设法触发了管道。

对于那些可能需要它的人,有罪的进口是

from engine.user_options import UserOptions

导入的 class 是

import apache_beam as beam


class UserOptions(beam.options.pipeline_options.PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--env', type=str)