Google 数据流 CombineFn add_input 失败
Google Dataflow CombineFn add_input failure
使用 Google Dataflow 的 python SDK
我已覆盖 CombineFn 以进行多字段聚合而不是一次一个字段。根据文档,add_input 的接口是分组后的中间聚合和输入值。例如:
假设
id1, 100
id1, 200
id1, 300
id2, 100
id2, 300
id2, 400
我对 add_input 函数的第二次调用应该接收 (100,) 和 (200,) 作为当前聚合和要添加到累加器的新值。此功能在本地运行器场景中完美运行。
当我转到 google 数据流引擎时,我看到一组具有上述条件的日志,其中它完成了 id1 和 id2 的聚合。然后我突然看到另一个调用 add_input 的 intermediate_value 被分配了输入值 (200,) 并且 input_value 在之前的计算中被分配了聚合值。
当第一遍完成所有计算时,我无法理解 add_input 第二组调用的原因。任何见解都会有所帮助吗?
总结一下:
经进一步分析,似乎正在调用 add_input 并使用同一键的两组中间聚合。根据接口,在此过程中它应该是中间聚合 + 输入值。
On further analysis, It looks like add_input is being called with two sets of intermediate aggregates for the same key. As per the interface it should be intermediate aggregate + input value in this pass.
听起来您正在从 merge_accumulators
函数中调用 add_input
。
查看您的 CombineFn
代码会有所帮助。
更重要的是,您 CombineFn
步骤的最终输出是否正确?
这是我的一个管道中的 CombineFn
以防它有助于查看另一个示例。
class MergeDictCombineFn(beam.core.CombineFn):
"""
CombineFn function that merges all of the dictionaries from the previous step
{'a': 1}, {'a': 2, 'b':1}, {'c': 1} -> {'a': 3, 'b':1, 'c': 1}
"""
def _sum_up(self, elements, accumulator=None):
accumulator = accumulator or self.create_accumulator()
for obj in elements:
for k, v in obj.iteritems():
if k not in accumulator:
accumulator[k] = 0
accumulator[k] += v
return accumulator
def create_accumulator(self):
return {}
def add_input(self, accumulator, element, *args, **kwargs):
return self._sum_up(elements=[element], accumulator=accumulator)
def add_inputs(self, accumulator, elements, *args, **kwargs):
return self._sum_up(elements=elements, accumulator=accumulator)
def merge_accumulators(self, accumulators, *args, **kwargs):
return self._sum_up(elements=accumulators)
def extract_output(self, accumulator, *args, **kwargs):
return accumulator
使用 Google Dataflow 的 python SDK 我已覆盖 CombineFn 以进行多字段聚合而不是一次一个字段。根据文档,add_input 的接口是分组后的中间聚合和输入值。例如: 假设 id1, 100 id1, 200 id1, 300 id2, 100 id2, 300 id2, 400
我对 add_input 函数的第二次调用应该接收 (100,) 和 (200,) 作为当前聚合和要添加到累加器的新值。此功能在本地运行器场景中完美运行。
当我转到 google 数据流引擎时,我看到一组具有上述条件的日志,其中它完成了 id1 和 id2 的聚合。然后我突然看到另一个调用 add_input 的 intermediate_value 被分配了输入值 (200,) 并且 input_value 在之前的计算中被分配了聚合值。
当第一遍完成所有计算时,我无法理解 add_input 第二组调用的原因。任何见解都会有所帮助吗?
总结一下:
经进一步分析,似乎正在调用 add_input 并使用同一键的两组中间聚合。根据接口,在此过程中它应该是中间聚合 + 输入值。
On further analysis, It looks like add_input is being called with two sets of intermediate aggregates for the same key. As per the interface it should be intermediate aggregate + input value in this pass.
听起来您正在从 merge_accumulators
函数中调用 add_input
。
查看您的 CombineFn
代码会有所帮助。
更重要的是,您 CombineFn
步骤的最终输出是否正确?
这是我的一个管道中的 CombineFn
以防它有助于查看另一个示例。
class MergeDictCombineFn(beam.core.CombineFn):
"""
CombineFn function that merges all of the dictionaries from the previous step
{'a': 1}, {'a': 2, 'b':1}, {'c': 1} -> {'a': 3, 'b':1, 'c': 1}
"""
def _sum_up(self, elements, accumulator=None):
accumulator = accumulator or self.create_accumulator()
for obj in elements:
for k, v in obj.iteritems():
if k not in accumulator:
accumulator[k] = 0
accumulator[k] += v
return accumulator
def create_accumulator(self):
return {}
def add_input(self, accumulator, element, *args, **kwargs):
return self._sum_up(elements=[element], accumulator=accumulator)
def add_inputs(self, accumulator, elements, *args, **kwargs):
return self._sum_up(elements=elements, accumulator=accumulator)
def merge_accumulators(self, accumulators, *args, **kwargs):
return self._sum_up(elements=accumulators)
def extract_output(self, accumulator, *args, **kwargs):
return accumulator