数据流作业不产生任何输出
Dataflow job does not any produce output
我遇到一个问题,数据流作业实际上运行良好,但在手动排空作业之前不会产生任何输出。
使用以下代码,我假设它会产生 windowed 输出,在每个 window.
之后有效触发
lines = (
p
| "read" >> source
| "decode" >> beam.Map(decode_message)
| "Parse" >> beam.Map(parse_json)
| beam.WindowInto(
beam.window.FixedWindows(5*60),
trigger=beam.trigger.Repeatedly(beam.trigger.AfterProcessingTime(5*60)),
accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)
| "write" >> sink
)
我想要的是,如果它在 window 中接收到事件,它应该在任何情况下在 window 之后产生输出。来源是 Cloud PubSub,大约有 100 events/minute.
这是我用来启动作业的参数:
python main.py \
--region $(REGION) \
--service_account_email $(SERVICE_ACCOUNT_EMAIL_TEST) \
--staging_location gs://$(BUCKET_NAME_TEST)/beam_stage/ \
--project $(TEST_PROJECT_ID) \
--inputTopic $(TOPIC_TEST) \
--outputLocation gs://$(BUCKET_NAME_TEST)/beam_output/ \
--streaming \
--runner DataflowRunner \
--temp_location gs://$(BUCKET_NAME_TEST)/beam_temp/ \
--experiments=allow_non_updatable_job \
--disk_size_gb=200 \
--machine_type=n1-standard-2 \
--job_name $(DATAFLOW_JOB_NAME)
关于如何解决这个问题有什么想法吗?我正在使用 apache-beam 2.22 SDK,python 3.7
请问您指的是2.22,因为"apache-beam 1.22"好像有点老了?特别是当您使用 Python 3.7 时,您可能想尝试更新的 SDK 版本,例如 2.22.0.
What I want is that if it has received events in a window, it should produce output after the window in any case. The source is a Cloud PubSub, with approximately 100 events/minute.
如果您只需要每 window 触发一个窗格并每 5 分钟固定 windows,您可以简单地使用
beam.WindowInto(beam.window.FixedWindows(5*60))
如果你想自定义触发器,可以看看这个文档streaming-102。
这是一个流式传输示例,其中包含 windowed 输出的可视化。
from apache_beam.runners.interactive import interactive_beam as ib
ib.options.capture_duration = timedelta(seconds=30)
ib.evict_captured_data()
pstreaming = beam.Pipeline(InteractiveRunner(), options=options)
words = (pstreaming
| 'Read' >> beam.io.ReadFromPubSub(topic=topic)
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5)))
ib.show(words, visualize_data=True, include_window_info=True)
如果您 运行 在 jupyterlab 等笔记本环境中编写这些代码,则可以使用 this 等输出调试流式传输管道。注意 windows 是可视化的,在 30 秒的时间内,我们得到 6 windows,因为固定的 window 设置为 5 秒。您可以按 windows 对数据进行分类,以查看哪些数据来自 window.
您可以设置自己的笔记本 运行时间 instructions;
或者您可以使用 Google Dataflow Notebooks.
提供的托管解决方案
我遇到一个问题,数据流作业实际上运行良好,但在手动排空作业之前不会产生任何输出。
使用以下代码,我假设它会产生 windowed 输出,在每个 window.
之后有效触发 lines = (
p
| "read" >> source
| "decode" >> beam.Map(decode_message)
| "Parse" >> beam.Map(parse_json)
| beam.WindowInto(
beam.window.FixedWindows(5*60),
trigger=beam.trigger.Repeatedly(beam.trigger.AfterProcessingTime(5*60)),
accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)
| "write" >> sink
)
我想要的是,如果它在 window 中接收到事件,它应该在任何情况下在 window 之后产生输出。来源是 Cloud PubSub,大约有 100 events/minute.
这是我用来启动作业的参数:
python main.py \
--region $(REGION) \
--service_account_email $(SERVICE_ACCOUNT_EMAIL_TEST) \
--staging_location gs://$(BUCKET_NAME_TEST)/beam_stage/ \
--project $(TEST_PROJECT_ID) \
--inputTopic $(TOPIC_TEST) \
--outputLocation gs://$(BUCKET_NAME_TEST)/beam_output/ \
--streaming \
--runner DataflowRunner \
--temp_location gs://$(BUCKET_NAME_TEST)/beam_temp/ \
--experiments=allow_non_updatable_job \
--disk_size_gb=200 \
--machine_type=n1-standard-2 \
--job_name $(DATAFLOW_JOB_NAME)
关于如何解决这个问题有什么想法吗?我正在使用 apache-beam 2.22 SDK,python 3.7
请问您指的是2.22,因为"apache-beam 1.22"好像有点老了?特别是当您使用 Python 3.7 时,您可能想尝试更新的 SDK 版本,例如 2.22.0.
What I want is that if it has received events in a window, it should produce output after the window in any case. The source is a Cloud PubSub, with approximately 100 events/minute.
如果您只需要每 window 触发一个窗格并每 5 分钟固定 windows,您可以简单地使用
beam.WindowInto(beam.window.FixedWindows(5*60))
如果你想自定义触发器,可以看看这个文档streaming-102。
这是一个流式传输示例,其中包含 windowed 输出的可视化。
from apache_beam.runners.interactive import interactive_beam as ib
ib.options.capture_duration = timedelta(seconds=30)
ib.evict_captured_data()
pstreaming = beam.Pipeline(InteractiveRunner(), options=options)
words = (pstreaming
| 'Read' >> beam.io.ReadFromPubSub(topic=topic)
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5)))
ib.show(words, visualize_data=True, include_window_info=True)
如果您 运行 在 jupyterlab 等笔记本环境中编写这些代码,则可以使用 this 等输出调试流式传输管道。注意 windows 是可视化的,在 30 秒的时间内,我们得到 6 windows,因为固定的 window 设置为 5 秒。您可以按 windows 对数据进行分类,以查看哪些数据来自 window.
您可以设置自己的笔记本 运行时间 instructions; 或者您可以使用 Google Dataflow Notebooks.
提供的托管解决方案