数据流作业不产生任何输出

Dataflow job does not any produce output

我遇到一个问题,数据流作业实际上运行良好,但在手动排空作业之前不会产生任何输出。

使用以下代码,我假设它会产生 windowed 输出,在每个 window.

之后有效触发
    lines = (
        p
        | "read" >> source
        | "decode" >> beam.Map(decode_message)
        | "Parse" >> beam.Map(parse_json)
        | beam.WindowInto(
               beam.window.FixedWindows(5*60),
               trigger=beam.trigger.Repeatedly(beam.trigger.AfterProcessingTime(5*60)),
               accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)
        | "write" >> sink 
    )

我想要的是,如果它在 window 中接收到事件,它应该在任何情况下在 window 之后产生输出。来源是 Cloud PubSub,大约有 100 events/minute.

这是我用来启动作业的参数:

    python main.py \
        --region $(REGION) \
        --service_account_email $(SERVICE_ACCOUNT_EMAIL_TEST) \
        --staging_location gs://$(BUCKET_NAME_TEST)/beam_stage/ \
        --project $(TEST_PROJECT_ID) \
        --inputTopic $(TOPIC_TEST) \
        --outputLocation gs://$(BUCKET_NAME_TEST)/beam_output/ \
        --streaming \
        --runner DataflowRunner \
        --temp_location gs://$(BUCKET_NAME_TEST)/beam_temp/ \
        --experiments=allow_non_updatable_job \
        --disk_size_gb=200 \
        --machine_type=n1-standard-2 \
        --job_name $(DATAFLOW_JOB_NAME)

关于如何解决这个问题有什么想法吗?我正在使用 apache-beam 2.22 SDK,python 3.7

请问您指的是2.22,因为"apache-beam 1.22"好像有点老了?特别是当您使用 Python 3.7 时,您可能想尝试更新的 SDK 版本,例如 2.22.0.

What I want is that if it has received events in a window, it should produce output after the window in any case. The source is a Cloud PubSub, with approximately 100 events/minute.

如果您只需要每 window 触发一个窗格并每 5 分钟固定 windows,您可以简单地使用

beam.WindowInto(beam.window.FixedWindows(5*60))

如果你想自定义触发器,可以看看这个文档streaming-102

这是一个流式传输示例,其中包含 windowed 输出的可视化。

from apache_beam.runners.interactive import interactive_beam as ib

ib.options.capture_duration = timedelta(seconds=30)
ib.evict_captured_data()

pstreaming = beam.Pipeline(InteractiveRunner(), options=options)

words = (pstreaming
        | 'Read' >> beam.io.ReadFromPubSub(topic=topic)
        | 'Window' >> beam.WindowInto(beam.window.FixedWindows(5)))


ib.show(words, visualize_data=True, include_window_info=True)

如果您 运行 在 jupyterlab 等笔记本环境中编写这些代码,则可以使用 this 等输出调试流式传输管道。注意 windows 是可视化的,在 30 秒的时间内,我们得到 6 windows,因为固定的 window 设置为 5 秒。您可以按 windows 对数据进行分类,以查看哪些数据来自 window.

您可以设置自己的笔记本 运行时间 instructions; 或者您可以使用 Google Dataflow Notebooks.

提供的托管解决方案