如何使用 Python 处理 Dataflow 管道中的 BigQuery 插入错误?
How to handle BigQuery insert errors in a Dataflow pipeline using Python?
我正在尝试使用 Dataflow 创建流式管道,从 PubSub 主题读取消息,最终将它们写入 BigQuery table。我不想使用任何数据流模板。
目前我只想在从 Google VM 实例执行的 Python3 脚本中创建一个管道,以执行从 Pubsub 到达的每条消息的加载和转换过程(解析它包含的记录并添加一个新字段)以最终将结果写入 BigQuery table.
简化一下,我的代码是:
#!/usr/bin/env python
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1,
import apache_beam as beam
import apache_beam.io.gcp.bigquery
import logging
import argparse
import sys
import json
from datetime import datetime, timedelta
def load_pubsub(message):
try:
data = json.loads(message)
records = data["messages"]
return records
except:
raise ImportError("Something went wrong reading data from the Pub/Sub topic")
class ParseTransformPubSub(beam.DoFn):
def __init__(self):
self.water_mark = (datetime.now() + timedelta(hours = 1)).strftime("%Y-%m-%d %H:%M:%S.%f")
def process(self, records):
for record in records:
record["E"] = self.water_mark
yield record
def main():
table_schema = apache_beam.io.gcp.bigquery.parse_table_schema_from_json(open("TableSchema.json"))
parser = argparse.ArgumentParser()
parser.add_argument('--input_topic')
parser.add_argument('--output_table')
known_args, pipeline_args = parser.parse_known_args(sys.argv)
with beam.Pipeline(argv = pipeline_args) as p:
pipe = ( p | 'ReadDataFromPubSub' >> beam.io.ReadStringsFromPubSub(known_args.input_topic)
| 'LoadJSON' >> beam.Map(load_pubsub)
| 'ParseTransform' >> beam.ParDo(ParseTransformPubSub())
| 'WriteToAvailabilityTable' >> beam.io.WriteToBigQuery(
table = known_args.output_table,
schema = table_schema,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
(例如)发布在 PubSub 主题中的消息使用如下:
'{"messages":[{"A":"Alpha", "B":"V1", "C":3, "D":12},{"A":"Alpha", "B":"V1", "C":5, "D":14},{"A":"Alpha", "B":"V1", "C":3, "D":22}]}'
如果记录中添加字段"E",那么记录的结构(Python中的字典)和字段的数据类型就是BigQuerytable期待。
我想处理的问题是:
如果某些消息带有意外结构,我想分叉管道展平并将它们写入另一个 BigQuery table。
如果某些消息带有意外数据类型的字段,那么在管道的最后一级当它们应该写入table时将发生错误。我想通过将记录转移到第三个 table.
来管理此类错误
我阅读了以下页面上的文档,但一无所获:
https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
https://cloud.google.com/dataflow/docs/guides/common-errors
顺便说一下,如果我选择通过从 PubSubSubscription 读取并写入 BigQuery 的模板来配置管道的选项,我会得到以下架构,结果与我正在寻找的架构相同:
Template: Cloud Pub/Sub Subscription to BigQuery
您无法捕捉到 BigQuery 的接收器中发生的错误。你写到bigquery的消息一定是好的。
最好的模式是执行一个检查消息结构和字段类型的转换。如果出现错误,您创建一个错误流程并将此问题流程写入一个文件(例如,或者在没有架构的 table 中,您以纯文本形式写入您的消息)
我正在尝试使用 Dataflow 创建流式管道,从 PubSub 主题读取消息,最终将它们写入 BigQuery table。我不想使用任何数据流模板。
目前我只想在从 Google VM 实例执行的 Python3 脚本中创建一个管道,以执行从 Pubsub 到达的每条消息的加载和转换过程(解析它包含的记录并添加一个新字段)以最终将结果写入 BigQuery table.
简化一下,我的代码是:
#!/usr/bin/env python
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1,
import apache_beam as beam
import apache_beam.io.gcp.bigquery
import logging
import argparse
import sys
import json
from datetime import datetime, timedelta
def load_pubsub(message):
try:
data = json.loads(message)
records = data["messages"]
return records
except:
raise ImportError("Something went wrong reading data from the Pub/Sub topic")
class ParseTransformPubSub(beam.DoFn):
def __init__(self):
self.water_mark = (datetime.now() + timedelta(hours = 1)).strftime("%Y-%m-%d %H:%M:%S.%f")
def process(self, records):
for record in records:
record["E"] = self.water_mark
yield record
def main():
table_schema = apache_beam.io.gcp.bigquery.parse_table_schema_from_json(open("TableSchema.json"))
parser = argparse.ArgumentParser()
parser.add_argument('--input_topic')
parser.add_argument('--output_table')
known_args, pipeline_args = parser.parse_known_args(sys.argv)
with beam.Pipeline(argv = pipeline_args) as p:
pipe = ( p | 'ReadDataFromPubSub' >> beam.io.ReadStringsFromPubSub(known_args.input_topic)
| 'LoadJSON' >> beam.Map(load_pubsub)
| 'ParseTransform' >> beam.ParDo(ParseTransformPubSub())
| 'WriteToAvailabilityTable' >> beam.io.WriteToBigQuery(
table = known_args.output_table,
schema = table_schema,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
(例如)发布在 PubSub 主题中的消息使用如下:
'{"messages":[{"A":"Alpha", "B":"V1", "C":3, "D":12},{"A":"Alpha", "B":"V1", "C":5, "D":14},{"A":"Alpha", "B":"V1", "C":3, "D":22}]}'
如果记录中添加字段"E",那么记录的结构(Python中的字典)和字段的数据类型就是BigQuerytable期待。
我想处理的问题是:
如果某些消息带有意外结构,我想分叉管道展平并将它们写入另一个 BigQuery table。
如果某些消息带有意外数据类型的字段,那么在管道的最后一级当它们应该写入table时将发生错误。我想通过将记录转移到第三个 table.
来管理此类错误
我阅读了以下页面上的文档,但一无所获: https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline https://cloud.google.com/dataflow/docs/guides/common-errors
顺便说一下,如果我选择通过从 PubSubSubscription 读取并写入 BigQuery 的模板来配置管道的选项,我会得到以下架构,结果与我正在寻找的架构相同:
Template: Cloud Pub/Sub Subscription to BigQuery
您无法捕捉到 BigQuery 的接收器中发生的错误。你写到bigquery的消息一定是好的。
最好的模式是执行一个检查消息结构和字段类型的转换。如果出现错误,您创建一个错误流程并将此问题流程写入一个文件(例如,或者在没有架构的 table 中,您以纯文本形式写入您的消息)