从自定义 DoFn 中产生 `finish_bundle` 的结果

yield results in `finish_bundle` from a custom DoFn

我的管道的一个步骤涉及从外部数据源获取,我想分块进行(顺序无关紧要)。我找不到任何类似的 class,所以我创建了以下内容:

class FixedSizeBatchSplitter(beam.DoFn):
  def __init__(self, size):
    self.size = size

  def start_bundle(self):
    self.current_batch = []

  def finish_bundle(self):
    if self.current_batch: 
      yield self.current_batch

  def process(self, element):
    self.current_batch.append(element)
    if len(self.current_batch) >= self.size:
      yield self.current_batch
      self.current_batch = []

但是,当我 运行 这个管道时,我得到一个 RuntimeError: Finish Bundle should only output WindowedValue type 错误:

with beam.Pipeline() as p:
  res = (p
         | beam.Create(range(10))
         | beam.ParDo(FixedSizeBatchSplitter(3))
        )

这是为什么?为什么我可以在 process 中产生输出但在 finish_bundle 中却不能?顺便说一下,如果我删除 finish_bundle 管道可以工作,但显然会丢弃剩菜。

一个DoFn可能正在处理来自多个不同windows的元素。当您在 process() 中时,"current window" 是明确的 - 它是正在处理的元素的 window。当您在 finish_bundle 中时,它是不明确的,您需要明确指定 window。您需要产生 yield WindowedValue(something, timestamp, [window]).

形式的内容

如果您的所有数据都在全局 window 中,那就更容易了:window 将只是 GlobalWindow()。如果您使用多个 windows,那么每个 window 需要 1 个缓冲区;捕获 process() 中的 window 以便添加到适当的缓冲区;并在 finish_bundle 中分别在 window.

中发出它们中的每一个