Wait.on() equivalent in the Apache Beam Python SDK
I'm using Apache Beam with Python and would like to know what the Python SDK equivalent of Apache Beam Java's Wait.on() is.
Currently I'm having a problem with the following code snippet:
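```python
import apache_beam as beam

# Merge all output PCollections into one (or take the only one).
if len(output_pcoll) > 1:
    merged = (tuple(output_pcoll)
              | 'MergePCollections1' >> beam.Flatten())
else:
    merged = output_pcoll[0]

# Build the outlier lookup that is passed to the next step as a side input.
outlier_side_input = self.construct_outlier_side_input(merged)

# Remove outliers using the side input, then write one CSV shard.
(merged
 | "RemoveOutlier" >> beam.ParDo(utils.Remove_Outliers(),
                                 beam.pvalue.AsDict(outlier_side_input))
 | "WriteToCSV" >> beam.io.WriteToText(
     '../../ML-DATA/{0}.{1}'.format(self.BUCKET, self.OUTPUT),
     num_shards=1))
```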
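Apache Beam does not seem to wait until the code in self.construct_outlier_side_input has finished executing, which results in an empty side input when "RemoveOutlier" executes in the next step of the pipeline. In the Java version you can use Wait.on() to wait for construct_outlier_side_input to finish executing, but I can't find an equivalent method in the Python SDK.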
--Edit--
What I want to achieve is almost the same as in this link:
https://rmannibucau.metawerx.net/post/apache-beam-initialization-destruction-task
You can do this with Beam's additional outputs feature.
A sample snippet:
```python
results = (words
           | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
           .with_outputs('above_cutoff_lengths', 'marked strings',
                         main='below_cutoff_strings'))
below = results.below_cutoff_strings
above = results.above_cutoff_lengths
marked = results['marked strings']  # indexing works as well
```
Once you run the snippet above, you get multiple PCollections, such as below, above and marked. You can then use side inputs to further filter or join the results.
Hope this helps.
Update
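Following up on the comments, I'd like to mention that Apache Beam supports stateful processing with the help of ValueState (ReadModifyWriteState in the Python SDK) and BagState. If the requirement is to read through a PCollection and then make a decision based on whether a prior value exists, BagState can handle it, as shown below: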
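```python
import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

# State and timer specs used below (names and coders are placeholders).
BUFFER_STATE_1 = BagStateSpec('buffer_1', StrUtf8Coder())
BUFFER_STATE_2 = BagStateSpec('buffer_2', StrUtf8Coder())
WATERMARK_TIMER = TimerSpec('watermark_timer', TimeDomain.WATERMARK)


class StatefulDoFn(beam.DoFn):

    @staticmethod
    def _is_clear_buffer_1_required(values):
        # Placeholder condition: replace with your own logic.
        return False

    @staticmethod
    def _all_condition_met():
        # Placeholder condition: replace with your own logic.
        return False

    def process(self,
                element,
                timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam,
                buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
                buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2),
                watermark_timer=beam.DoFn.TimerParam(WATERMARK_TIMER)):
        # Do your processing here; stateful DoFns need (key, value) input.
        key, value = element
        # Read all the data from buffer 1.
        all_values_in_buffer_1 = list(buffer_1.read())
        if StatefulDoFn._is_clear_buffer_1_required(all_values_in_buffer_1):
            # Clear the buffer if the required conditions are met.
            buffer_1.clear()
        # Add the value to buffer 2.
        buffer_2.add(value)
        # Schedule the callback for the end of the window.
        watermark_timer.set(window.max_timestamp())
        if StatefulDoFn._all_condition_met():
            # Clear the timer if certain conditions are met and you don't
            # want the callback method to be triggered.
            watermark_timer.clear()
        yield element

    @on_timer(WATERMARK_TIMER)
    def on_expiry_1(self,
                    timestamp=beam.DoFn.TimestampParam,
                    window=beam.DoFn.WindowParam,
                    key=beam.DoFn.KeyParam,
                    buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
                    buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2)):
        # Window and key parameters are especially useful for debugging.
        yield 'expired1'
```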