Google 数据流 - Apache Beam GroupByKey():Duplicating/Slow
Google Dataflow - Apache Beam GroupByKey(): Duplicating/Slow
我遇到了 beam.GroupByKey() 的情况,我加载了一个行数为 42.854 的文件。
由于业务规则,我需要执行一个GroupByKey();然而,在完成它的执行后,我注意到我得到了几乎两倍的行数。如下所示:
GroupByKey() 之前的步骤:
为什么我会有这种行为?
我没有在我的管道中做任何特别的事情:
with beam.Pipeline(runner, options=opts) as p:
#LOAD FILE
elements = p | 'Load File' >> beam.Create(fileRNS.values)
#PREPARE VALUES (BULK INSERT)
Script_Values = elements | 'Prepare Bulk Insert' >> beam.ParDo(Prepare_Bulk_Insert())
Grouped_Values = Script_Values | 'Grouping values' >> beam.GroupByKey()
#BULK INSERT INTO POSTGRESQL
Grouped_Values | 'Insert PostgreSQL' >> beam.ParDo(ExecuteInsert)
2021-02-09
当我调试时,Prepare_Bulk_Insert() 有以下内容:
如您所见,元素的数量是正确的,我不明白为什么 GroupByKey() 在我发送正确数量的情况下输入的元素数量更多。
Grouped_Values | 'Insert PostgreSQL' >> beam.ParDo(funcaoMap) 输入如下:
双倍金额。 =(
亲切的问候,
朱利安诺·梅代罗斯
这些屏幕截图表明“准备批量插入”DoFn 正在为每个输入元素输出一个以上的元素。您的第一个屏幕截图显示了 GBK 的输入 PCollection(由 DoFn 生成),第二个屏幕截图是 DoFn 的输入,因此差异必须由 DoFn 生成。
我遇到了 beam.GroupByKey() 的情况,我加载了一个行数为 42.854 的文件。
由于业务规则,我需要执行一个GroupByKey();然而,在完成它的执行后,我注意到我得到了几乎两倍的行数。如下所示:
GroupByKey() 之前的步骤:
为什么我会有这种行为?
我没有在我的管道中做任何特别的事情:
with beam.Pipeline(runner, options=opts) as p:
#LOAD FILE
elements = p | 'Load File' >> beam.Create(fileRNS.values)
#PREPARE VALUES (BULK INSERT)
Script_Values = elements | 'Prepare Bulk Insert' >> beam.ParDo(Prepare_Bulk_Insert())
Grouped_Values = Script_Values | 'Grouping values' >> beam.GroupByKey()
#BULK INSERT INTO POSTGRESQL
Grouped_Values | 'Insert PostgreSQL' >> beam.ParDo(ExecuteInsert)
2021-02-09
当我调试时,Prepare_Bulk_Insert() 有以下内容:
如您所见,元素的数量是正确的,我不明白为什么 GroupByKey() 在我发送正确数量的情况下输入的元素数量更多。
Grouped_Values | 'Insert PostgreSQL' >> beam.ParDo(funcaoMap) 输入如下:
双倍金额。 =(
亲切的问候, 朱利安诺·梅代罗斯
这些屏幕截图表明“准备批量插入”DoFn 正在为每个输入元素输出一个以上的元素。您的第一个屏幕截图显示了 GBK 的输入 PCollection(由 DoFn 生成),第二个屏幕截图是 DoFn 的输入,因此差异必须由 DoFn 生成。