GCP Dataflow Apache Beam code logic not working as expected
I am trying to implement CDC (change data capture) in Apache Beam, deployed on Google Cloud Dataflow.
I have unloaded the master data and the new data, which is expected to arrive daily.
The join is not working as expected; something is missing.
master_data = (
    p
    | 'Read base from BigQuery ' >> beam.io.Read(beam.io.BigQuerySource(query=master_data, use_standard_sql=True))
    | 'Map id in master' >> beam.Map(lambda master: (master['id'], master)))

new_data = (
    p
    | 'Read Delta from BigQuery ' >> beam.io.Read(beam.io.BigQuerySource(query=new_data, use_standard_sql=True))
    | 'Map id in new' >> beam.Map(lambda new: (new['id'], new)))

joined_dicts = (
    {'master_data': master_data, 'new_data': new_data}
    | beam.CoGroupByKey()
    | beam.FlatMap(join_lists)
    | 'mergeddicts' >> beam.Map(lambda masterdict, newdict: newdict.update(masterdict))
)

def join_lists(k, v):
    itertools.product(v['master_data'], v['new_data'])
Observations (with sample data):

Data from master:
1, 'A', 3232
2, 'B', 234

New data:
1, 'A', 44
4, 'D', 45

Expected result in the master table, after the code runs:
1, 'A', 44
2, 'B', 234
4, 'D', 45

However, what I actually get in the master table is:
1, 'A', 44
4, 'D', 45

Am I missing a step? Can anyone help me correct the mistake?
You do not need to flatten after the group-by, because that separates the elements again.
Here is sample code:
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

def join_lists(e):
    (k, v) = e
    return (k, v['new_data']) if v['new_data'] != v['master_data'] else (k, None)

with beam.Pipeline(options=PipelineOptions()) as p:
    master_data = (
        p | 'Read base from BigQuery ' >> beam.Create([('A', [3232]), ('B', [234])])
    )
    new_data = (
        p | 'Read Delta from BigQuery ' >> beam.Create([('A', [44]), ('D', [45])])
    )
    joined_dicts = (
        {'master_data': master_data, 'new_data': new_data}
        | beam.CoGroupByKey()
        | 'mergeddicts' >> beam.Map(join_lists)
    )
    result = p.run()
    result.wait_until_finish()
    print("Pipeline finished.")
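For reference, the upsert the question describes (new record wins, keys only in the master are kept) can be sketched without a running Beam cluster. This is a plain-Python illustration that mimics what `beam.CoGroupByKey()` produces, namely, for each key, a dict of value lists from each input; `co_group` and `upsert` are illustrative names, not Beam APIs, and in a real pipeline `upsert` would be the function passed to `beam.Map` after `CoGroupByKey`.

```python
def co_group(master, new):
    # Mimic beam.CoGroupByKey(): for every key seen in either input,
    # collect a list of matching values from each tagged collection.
    keys = set(master) | set(new)
    return {k: {'master_data': [master[k]] if k in master else [],
                'new_data': [new[k]] if k in new else []}
            for k in keys}

def upsert(grouped):
    # New record wins; keys present only in the master are kept unchanged.
    merged = {}
    for k, v in grouped.items():
        merged[k] = v['new_data'][0] if v['new_data'] else v['master_data'][0]
    return merged

master = {1: ('A', 3232), 2: ('B', 234)}
new = {1: ('A', 44), 4: ('D', 45)}
result = upsert(co_group(master, new))
# result == {1: ('A', 44), 2: ('B', 234), 4: ('D', 45)}
```

Applied to the sample data above, this yields exactly the expected master table: id 1 is overwritten by the delta, id 2 is carried over, and id 4 is inserted.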