使用 Apache Beam Python `WriteToFiles` 转换,每个 window 只写一个文件

Write just one file per window with Apache Beam Python `WriteToFiles` transform

需要一些帮助。我有一些琐碎的任务从 Pub/Sub 读取并写入 GCS 中的批处理文件,但在 fileio.WriteToFiles

中遇到了一些困难
with beam.Pipeline(options=pipeline_options) as p:
  input = (p | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
             | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
             | 'Parse' >> beam.Map(parse_json)
             | ' data w' >> beam.WindowInto(
                 FixedWindows(60),
                 accumulation_mode=AccumulationMode.DISCARDING
             ))

  event_data = (input
             | 'filter events' >> beam.Filter(lambda x: x['t'] == 'event')
             | 'encode et' >> beam.Map(lambda x: json.dumps(x))
             | 'write events to file' >> fileio.WriteToFiles(
                    path='gs://extention/ga_analytics/events/', shards=0))

我的 window 触发后我需要一个文件,但是文件的数量等于来自 Pubsub 的消息数量,有人可以帮我吗? current output files 但我只需要一个文件。

您可以传递 WriteToFiles(shards=1, ...) 以将每个 window 和触发的输出限制为单个分片。

我最近 运行 研究了这个问题并深入研究了源代码:

fileio.WriteToFiles 将尝试将包中的每个元素作为单独的文件输出。如果元素数量超过编写器数量,WireToFiles 只会回退到写入分片文件。

要强制为 window 中的所有元素创建单个文件,请将 max_writers_per_bundle 设置为 0

WriteToFiles(shards=1, max_writers_per_bundle=0)