从自定义 DoFn 中产生 `finish_bundle` 的结果
yield results in `finish_bundle` from a custom DoFn
我的管道的一个步骤涉及从外部数据源获取,我想分块进行(顺序无关紧要)。我找不到任何类似的 class,所以我创建了以下内容:
class FixedSizeBatchSplitter(beam.DoFn):
def __init__(self, size):
self.size = size
def start_bundle(self):
self.current_batch = []
def finish_bundle(self):
if self.current_batch:
yield self.current_batch
def process(self, element):
self.current_batch.append(element)
if len(self.current_batch) >= self.size:
yield self.current_batch
self.current_batch = []
但是,当我 运行 这个管道时,我得到一个 RuntimeError: Finish Bundle should only output WindowedValue type
错误:
with beam.Pipeline() as p:
res = (p
| beam.Create(range(10))
| beam.ParDo(FixedSizeBatchSplitter(3))
)
这是为什么?为什么我可以在 process
中产生输出但在 finish_bundle
中却不能?顺便说一下,如果我删除 finish_bundle
管道可以工作,但显然会丢弃剩菜。
一个DoFn
可能正在处理来自多个不同windows的元素。当您在 process()
中时,"current window" 是明确的 - 它是正在处理的元素的 window。当您在 finish_bundle
中时,它是不明确的,您需要明确指定 window。您需要产生 yield WindowedValue(something, timestamp, [window])
.
形式的内容
如果您的所有数据都在全局 window 中,那就更容易了:window
将只是 GlobalWindow()
。如果您使用多个 windows,那么每个 window 需要 1 个缓冲区;捕获 process()
中的 window 以便添加到适当的缓冲区;并在 finish_bundle
中分别在 window.
中发出它们中的每一个
我的管道的一个步骤涉及从外部数据源获取,我想分块进行(顺序无关紧要)。我找不到任何类似的 class,所以我创建了以下内容:
class FixedSizeBatchSplitter(beam.DoFn):
def __init__(self, size):
self.size = size
def start_bundle(self):
self.current_batch = []
def finish_bundle(self):
if self.current_batch:
yield self.current_batch
def process(self, element):
self.current_batch.append(element)
if len(self.current_batch) >= self.size:
yield self.current_batch
self.current_batch = []
但是,当我 运行 这个管道时,我得到一个 RuntimeError: Finish Bundle should only output WindowedValue type
错误:
with beam.Pipeline() as p:
res = (p
| beam.Create(range(10))
| beam.ParDo(FixedSizeBatchSplitter(3))
)
这是为什么?为什么我可以在 process
中产生输出但在 finish_bundle
中却不能?顺便说一下,如果我删除 finish_bundle
管道可以工作,但显然会丢弃剩菜。
一个DoFn
可能正在处理来自多个不同windows的元素。当您在 process()
中时,"current window" 是明确的 - 它是正在处理的元素的 window。当您在 finish_bundle
中时,它是不明确的,您需要明确指定 window。您需要产生 yield WindowedValue(something, timestamp, [window])
.
如果您的所有数据都在全局 window 中,那就更容易了:window
将只是 GlobalWindow()
。如果您使用多个 windows,那么每个 window 需要 1 个缓冲区;捕获 process()
中的 window 以便添加到适当的缓冲区;并在 finish_bundle
中分别在 window.