如何在 python 中(单元)测试 apache-beam 中的流管道?

How to (unit) test a stream pipeline in apache-beam in python?

我写了一些流媒体管道(从 Pub/Sub 开始),我想给它添加一些窗口机制。我现在想以某种适当的方式对其进行测试,那么如何创建一些 "dummy" 流?

我的代码:

pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=pipeline_options, runner=DirectRunner())
xmls_beam = beam.Create(xmls)
x = p | xmls_beam | beam.FlatMap(process_xmls) | beam.ParDo(FilterTI()) | beam.WindowInto(window.FixedWindows(200)) | beam.GroupByKey()
result = p.run()
result.wait_until_finish()

您可以使用 TimestampedValue 的 PCollection 来模拟 "dummy stream"。

例如,如果您的输入是:

    l = [window.TimestampedValue('a', 100), window.TimestampedValue('b', 300)]
    pc = p | beam.Create(l) | ...

在你的情况下(固定 window 宽度 200)你可以预期输出元素 'a' 属于第一个 window 和 'b' 第二个。