通过相关管道处理 Dataflow/Apache Beam 中的拒绝
Handling rejects in Dataflow/Apache Beam through dependent pipelines
我有一个从 BigQuery 获取数据并将其写入 GCS 的管道,但是,如果我发现任何拒绝,我想将它们纠正到 Bigquery table。我正在将拒绝收集到一个全局列表变量中,然后将该列表加载到 BigQuery table 中。当我 运行 它在本地时,这个过程工作正常,因为管道 运行 以正确的顺序排列。当我 运行 它使用数据流 运行ner 时,它不能保证顺序(我希望管道 1 到 运行 在管道 2 之前。有没有办法在数据流中使用 python? 或者如果可以用更好的方法解决这个问题,请提出建议。在此先感谢。
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
data = (pipeline1
| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
| 'combine output to list' >> beam.combiners.ToList()
| 'tranform' >> beam.Map(lambda x: somefunction) # Collecting rejects in the except block of this method to a global list variable
....etc
| 'to gcs' >> beam.io.WriteToText(output)
)
# Loading the rejects gathered in the above pipeline to Biquery
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
rejects = (pipeline2
| 'create pipeline' >> beam.Create(reject_list)
| 'to json format' >> beam.Map(lambda data: {.....})
| 'to bq' >> beam.io.WriteToBigQuery(....)
)
你可以做类似的事情,但只有 1 个管道,并在转换中添加一些额外的代码。
beam.Map(lambda x: somefunction)
应该有两个输出:写入 GCS 的输出,以及最终将写入 BigQuery 的被拒绝元素。
为此,您的转换函数必须 return a TaggedOutput
.
Beam 编程指南中有一个示例:https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn
第二个 PCollection
,然后您可以写入 BigQuery。
您不需要在管道的第二个分支中有 Create
。
管道将是这样的:
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
data = (pipeline1
| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
| 'combine output to list' >> beam.combiners.ToList()
| 'tranform' >> beam.Map(transform) # Tagged output produced here
pcoll_to_gcs = data.gcs_output
pcoll_to_bq = data.rejected
pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
pcoll_to_bq | "to bq" >> beam.io.WriteToBigQuery(....)
那么transform
函数就是这样的
def transform(element):
if something_is_wrong_with_element:
yield pvalue.TaggedOutput('rejected', element)
transformed_element = ....
yield pvalue.TaggedOutput('gcs_output', transformed_element)
我有一个从 BigQuery 获取数据并将其写入 GCS 的管道,但是,如果我发现任何拒绝,我想将它们纠正到 Bigquery table。我正在将拒绝收集到一个全局列表变量中,然后将该列表加载到 BigQuery table 中。当我 运行 它在本地时,这个过程工作正常,因为管道 运行 以正确的顺序排列。当我 运行 它使用数据流 运行ner 时,它不能保证顺序(我希望管道 1 到 运行 在管道 2 之前。有没有办法在数据流中使用 python? 或者如果可以用更好的方法解决这个问题,请提出建议。在此先感谢。
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
data = (pipeline1
| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
| 'combine output to list' >> beam.combiners.ToList()
| 'tranform' >> beam.Map(lambda x: somefunction) # Collecting rejects in the except block of this method to a global list variable
....etc
| 'to gcs' >> beam.io.WriteToText(output)
)
# Loading the rejects gathered in the above pipeline to Biquery
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
rejects = (pipeline2
| 'create pipeline' >> beam.Create(reject_list)
| 'to json format' >> beam.Map(lambda data: {.....})
| 'to bq' >> beam.io.WriteToBigQuery(....)
)
你可以做类似的事情,但只有 1 个管道,并在转换中添加一些额外的代码。
beam.Map(lambda x: somefunction)
应该有两个输出:写入 GCS 的输出,以及最终将写入 BigQuery 的被拒绝元素。
为此,您的转换函数必须 return a TaggedOutput
.
Beam 编程指南中有一个示例:https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn
第二个 PCollection
,然后您可以写入 BigQuery。
您不需要在管道的第二个分支中有 Create
。
管道将是这样的:
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
data = (pipeline1
| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
| 'combine output to list' >> beam.combiners.ToList()
| 'tranform' >> beam.Map(transform) # Tagged output produced here
pcoll_to_gcs = data.gcs_output
pcoll_to_bq = data.rejected
pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
pcoll_to_bq | "to bq" >> beam.io.WriteToBigQuery(....)
那么transform
函数就是这样的
def transform(element):
if something_is_wrong_with_element:
yield pvalue.TaggedOutput('rejected', element)
transformed_element = ....
yield pvalue.TaggedOutput('gcs_output', transformed_element)